http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfigSingleton.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfigSingleton.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfigSingleton.java new file mode 100644 index 0000000..f1bc453 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfigSingleton.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.rocketmq.common; + +import java.util.concurrent.atomic.AtomicBoolean; + +public class BrokerConfigSingleton { + private static AtomicBoolean isInit = new AtomicBoolean(); + private static BrokerConfig brokerConfig; + + public static BrokerConfig getBrokerConfig() { + if (brokerConfig == null) { + throw new IllegalArgumentException("brokerConfig Cannot be null !"); + } + return brokerConfig; + } + + public static void setBrokerConfig(BrokerConfig brokerConfig) { + if (!isInit.compareAndSet(false, true)) { + throw new IllegalArgumentException("broker config have inited !"); + } + BrokerConfigSingleton.brokerConfig = brokerConfig; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/ConfigManager.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/ConfigManager.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/ConfigManager.java new file mode 100644 index 0000000..3191509 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/ConfigManager.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.common; + +import com.alibaba.rocketmq.common.constant.LoggerName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + + +/** + * @author shijia.wxr + */ +public abstract class ConfigManager { + private static final Logger PLOG = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); + + + public abstract String encode(); + + public boolean load() { + String fileName = null; + try { + fileName = this.configFilePath(); + String jsonString = MixAll.file2String(fileName); + + if (null == jsonString || jsonString.length() == 0) { + return this.loadBak(); + } else { + this.decode(jsonString); + PLOG.info("load {} OK", fileName); + return true; + } + } catch (Exception e) { + PLOG.error("load " + fileName + " Failed, and try to load backup file", e); + return this.loadBak(); + } + } + + public abstract String configFilePath(); + + private boolean loadBak() { + String fileName = null; + try { + fileName = this.configFilePath(); + String jsonString = MixAll.file2String(fileName + ".bak"); + if (jsonString != null && jsonString.length() > 0) { + this.decode(jsonString); + PLOG.info("load " + fileName + " OK"); + return true; + } + } catch (Exception e) { + PLOG.error("load " + fileName + " Failed", e); + return false; + } + + return true; + } + + public abstract void decode(final String jsonString); + + public synchronized void persist() { + String jsonString = this.encode(true); + if (jsonString != null) { + String fileName = this.configFilePath(); + try { + MixAll.string2File(jsonString, fileName); + } catch (IOException e) { + PLOG.error("persist file Exception, " + fileName, e); + } + } + } + + public abstract String encode(final boolean prettyFormat); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/Configuration.java ---------------------------------------------------------------------- diff --git 
a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/Configuration.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/Configuration.java new file mode 100644 index 0000000..8b69c1f --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/Configuration.java @@ -0,0 +1,310 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.common; + +import org.slf4j.Logger; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * @author xigu.lx + */ +public class Configuration { + + private final Logger log; + + private List<Object> configObjectList = new ArrayList<Object>(4); + private String storePath; + private boolean storePathFromConfig = false; + private Object storePathObject; + private Field storePathField; + private DataVersion dataVersion = new DataVersion(); + private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + /** + * All properties include configs in object and extend properties. 
+ */ + private Properties allConfigs = new Properties(); + + public Configuration(Logger log) { + this.log = log; + } + + public Configuration(Logger log, Object... configObjects) { + this.log = log; + if (configObjects == null || configObjects.length == 0) { + return; + } + for (Object configObject : configObjects) { + registerConfig(configObject); + } + } + + public Configuration(Logger log, String storePath, Object... configObjects) { + this(log, configObjects); + this.storePath = storePath; + } + + /** + * register config object + * + * @param configObject + * @return the current Configuration object + */ + public Configuration registerConfig(Object configObject) { + try { + readWriteLock.writeLock().lockInterruptibly(); + + try { + + Properties registerProps = MixAll.object2Properties(configObject); + + merge(registerProps, this.allConfigs); + + configObjectList.add(configObject); + } finally { + readWriteLock.writeLock().unlock(); + } + } catch (InterruptedException e) { + log.error("registerConfig lock error"); + } + return this; + } + + /** + * register config properties + * + * @param extProperties + * @return the current Configuration object + */ + public Configuration registerConfig(Properties extProperties) { + if (extProperties == null) { + return this; + } + + try { + readWriteLock.writeLock().lockInterruptibly(); + + try { + merge(extProperties, this.allConfigs); + } finally { + readWriteLock.writeLock().unlock(); + } + } catch (InterruptedException e) { + log.error("register lock error. {}" + extProperties); + } + + return this; + } + + /** + * The store path will be gotten from the field of object. + * + * @param object + * @param fieldName + * + * @throws java.lang.RuntimeException if the field of object is not exist. 
+ */ + public void setStorePathFromConfig(Object object, String fieldName) { + assert object != null; + + try { + readWriteLock.writeLock().lockInterruptibly(); + + try { + this.storePathFromConfig = true; + this.storePathObject = object; + // check + this.storePathField = object.getClass().getDeclaredField(fieldName); + assert this.storePathField != null + && !Modifier.isStatic(this.storePathField.getModifiers()); + this.storePathField.setAccessible(true); + } catch (NoSuchFieldException e) { + throw new RuntimeException(e); + } finally { + readWriteLock.writeLock().unlock(); + } + } catch (InterruptedException e) { + log.error("setStorePathFromConfig lock error"); + } + } + + private String getStorePath() { + String realStorePath = null; + try { + readWriteLock.readLock().lockInterruptibly(); + + try { + realStorePath = this.storePath; + + if (this.storePathFromConfig) { + try { + realStorePath = (String) storePathField.get(this.storePathObject); + } catch (IllegalAccessException e) { + log.error("getStorePath error, ", e); + } + } + } finally { + readWriteLock.readLock().unlock(); + } + } catch (InterruptedException e) { + log.error("getStorePath lock error"); + } + + return realStorePath; + } + + public void update(Properties properties) { + try { + readWriteLock.writeLock().lockInterruptibly(); + + try { + // the property must be exist when update + mergeIfExist(properties, this.allConfigs); + + for (Object configObject : configObjectList) { + // not allConfigs to update... 
+ MixAll.properties2Object(properties, configObject); + } + + this.dataVersion.nextVersion(); + + } finally { + readWriteLock.writeLock().unlock(); + } + } catch (InterruptedException e) { + log.error("update lock error, {}", properties); + return; + } + + persist(); + } + + public void persist() { + try { + readWriteLock.readLock().lockInterruptibly(); + + try { + String allConfigs = getAllConfigsInternal(); + + MixAll.string2File(allConfigs, getStorePath()); + } catch (IOException e) { + log.error("persist string2File error, ", e); + } finally { + readWriteLock.readLock().unlock(); + } + } catch (InterruptedException e) { + log.error("persist lock error"); + } + } + + public String getAllConfigsFormatString() { + try { + readWriteLock.readLock().lockInterruptibly(); + + try { + + return getAllConfigsInternal(); + + } finally { + readWriteLock.readLock().unlock(); + } + } catch (InterruptedException e) { + log.error("getAllConfigsFormatString lock error"); + } + + return null; + } + + public String getDataVersionJson() { + return this.dataVersion.toJson(); + } + + public Properties getAllConfigs() { + try { + readWriteLock.readLock().lockInterruptibly(); + + try { + + return this.allConfigs; + + } finally { + readWriteLock.readLock().unlock(); + } + } catch (InterruptedException e) { + log.error("getAllConfigs lock error"); + } + + return null; + } + + private String getAllConfigsInternal() { + StringBuilder stringBuilder = new StringBuilder(); + + // reload from config object ? 
+ for (Object configObject : this.configObjectList) { + Properties properties = MixAll.object2Properties(configObject); + if (properties != null) { + merge(properties, this.allConfigs); + } else { + log.warn("getAllConfigsInternal object2Properties is null, {}", configObject.getClass()); + } + } + + { + stringBuilder.append(MixAll.properties2String(this.allConfigs)); + } + + return stringBuilder.toString(); + } + + public void setStorePath(final String storePath) { + this.storePath = storePath; + } + + private void merge(Properties from, Properties to) { + for (Object key : from.keySet()) { + Object fromObj = from.get(key), toObj = to.get(key); + if (toObj != null && !toObj.equals(fromObj)) { + log.info("Replace, key: {}, value: {} -> {}", key, toObj, fromObj); + } + to.put(key, fromObj); + } + } + + private void mergeIfExist(Properties from, Properties to) { + for (Object key : from.keySet()) { + if (!to.containsKey(key)) { + continue; + } + + Object fromObj = from.get(key), toObj = to.get(key); + if (toObj != null && !toObj.equals(fromObj)) { + log.info("Replace, key: {}, value: {} -> {}", key, toObj, fromObj); + } + to.put(key, fromObj); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/CountDownLatch.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/CountDownLatch.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/CountDownLatch.java new file mode 100644 index 0000000..a5cc9a1 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/CountDownLatch.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
/**
 * A count-down latch with the same waiting semantics as
 * {@code java.util.concurrent.CountDownLatch}, extended by {@link #reset()}, which
 * puts the count back to the value given at construction.
 *
 * @author xinyuzhou.zxy
 */
public class CountDownLatch {

    /** Synchronizer whose state holds the remaining count. */
    private final Sync sync;

    /**
     * Creates a latch that opens after {@code count} calls to {@link #countDown()}.
     *
     * @param count number of {@link #countDown()} invocations required before
     *              {@link #await()} lets threads through
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public CountDownLatch(int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count < 0");
        }
        this.sync = new Sync(count);
    }

    /**
     * Blocks until the count is zero. Returns at once if it already is.
     *
     * @throws InterruptedException if the calling thread has its interrupt status set on
     *                              entry or is interrupted while waiting
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * Blocks until the count is zero or the timeout elapses, whichever comes first.
     * A timeout that is zero or negative means no waiting at all.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of {@code timeout}
     * @return {@code true} if the count reached zero, {@code false} if the timeout
     *         elapsed first
     * @throws InterruptedException if the calling thread has its interrupt status set on
     *                              entry or is interrupted while waiting
     */
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        final long nanos = unit.toNanos(timeout);
        return sync.tryAcquireSharedNanos(1, nanos);
    }

    /**
     * Lowers the count by one and releases all waiting threads when it hits zero.
     * Has no effect when the count is already zero.
     */
    public void countDown() {
        sync.releaseShared(1);
    }

    /**
     * Returns the current count; mainly useful for debugging and tests.
     *
     * @return the current count
     */
    public long getCount() {
        return sync.getCount();
    }

    /**
     * Sets the count back to the value this latch was constructed with.
     */
    public void reset() {
        sync.reset();
    }

    /**
     * Returns the default object description followed by {@code "[Count = N]"}, where
     * {@code N} is the current count.
     *
     * @return a string identifying this latch and its state
     */
    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }

    /**
     * Synchronization control for the latch; the AQS state is the remaining count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        /** Count supplied at construction, restored by {@link #reset()}. */
        private final int startCount;

        Sync(int count) {
            this.startCount = count;
            setState(count);
        }

        int getCount() {
            return getState();
        }

        /** Acquisition succeeds only once the count has reached zero. */
        protected int tryAcquireShared(int acquires) {
            if (getState() == 0) {
                return 1;
            }
            return -1;
        }

        /** Decrements the count with CAS retries; reports {@code true} on the step to zero. */
        protected boolean tryReleaseShared(int releases) {
            while (true) {
                final int current = getState();
                if (current == 0) {
                    return false;
                }
                final int decremented = current - 1;
                if (compareAndSetState(current, decremented)) {
                    return decremented == 0;
                }
            }
        }

        protected void reset() {
            setState(startCount);
        }
    }
}
+ */ +package com.alibaba.rocketmq.common; + +import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.concurrent.atomic.AtomicLong; + + +/** + * @author shijia.wxr + */ +public class DataVersion extends RemotingSerializable { + private long timestatmp = System.currentTimeMillis(); + private AtomicLong counter = new AtomicLong(0); + + + public void assignNewOne(final DataVersion dataVersion) { + this.timestatmp = dataVersion.timestatmp; + this.counter.set(dataVersion.counter.get()); + } + + + public void nextVersion() { + this.timestatmp = System.currentTimeMillis(); + this.counter.incrementAndGet(); + } + + + public long getTimestatmp() { + return timestatmp; + } + + + public void setTimestatmp(long timestatmp) { + this.timestatmp = timestatmp; + } + + + public AtomicLong getCounter() { + return counter; + } + + + public void setCounter(AtomicLong counter) { + this.counter = counter; + } + + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + final DataVersion that = (DataVersion) o; + + if (timestatmp != that.timestatmp) return false; + return counter != null ? counter.equals(that.counter) : that.counter == null; + + } + + @Override + public int hashCode() { + int result = (int) (timestatmp ^ (timestatmp >>> 32)); + result = 31 * result + (counter != null ? 
counter.hashCode() : 0); + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/MQVersion.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/MQVersion.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/MQVersion.java new file mode 100644 index 0000000..19afb09 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/MQVersion.java @@ -0,0 +1,362 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Catalogue of known RocketMQ versions. A version is exchanged as the ordinal of its
 * {@link Version} constant, so constants must only ever be appended, never reordered
 * or removed.
 *
 * @author shijia.wxr
 */
public class MQVersion {

    public static final int CURRENT_VERSION = Version.V4_0_0_SNAPSHOT.ordinal();

    /** Cached once: {@code Version.values()} allocates a fresh array on every call. */
    private static final Version[] VERSIONS = Version.values();


    /**
     * Returns the name of the version with the given ordinal.
     *
     * @param value ordinal of a {@link Version} constant
     * @return the constant's name, or {@code "HigherVersion"} when {@code value} does not
     *         correspond to any known constant (negative, or beyond the last one)
     */
    public static String getVersionDesc(int value) {
        // Explicit range check instead of catching the out-of-bounds exception.
        if (value >= 0 && value < VERSIONS.length) {
            return VERSIONS[value].name();
        }

        return "HigherVersion";
    }


    /**
     * Returns the version constant with the given ordinal.
     *
     * @param value ordinal of a {@link Version} constant
     * @return the matching constant
     * @throws ArrayIndexOutOfBoundsException if {@code value} is not a valid ordinal
     */
    public static Version value2Version(int value) {
        return VERSIONS[value];
    }

    /**
     * Known versions in release order. The ordinal of each constant is its numeric id.
     */
    public enum Version {
        V3_0_0_SNAPSHOT,
        V3_0_0_ALPHA1,
        V3_0_0_BETA1,
        V3_0_0_BETA2,
        V3_0_0_BETA3,
        V3_0_0_BETA4,
        V3_0_0_BETA5,
        V3_0_0_BETA6_SNAPSHOT,
        V3_0_0_BETA6,
        V3_0_0_BETA7_SNAPSHOT,
        V3_0_0_BETA7,
        V3_0_0_BETA8_SNAPSHOT,
        V3_0_0_BETA8,
        V3_0_0_BETA9_SNAPSHOT,
        V3_0_0_BETA9,
        V3_0_0_FINAL,
        V3_0_1_SNAPSHOT,
        V3_0_1,
        V3_0_2_SNAPSHOT,
        V3_0_2,
        V3_0_3_SNAPSHOT,
        V3_0_3,
        V3_0_4_SNAPSHOT,
        V3_0_4,
        V3_0_5_SNAPSHOT,
        V3_0_5,
        V3_0_6_SNAPSHOT,
        V3_0_6,
        V3_0_7_SNAPSHOT,
        V3_0_7,
        V3_0_8_SNAPSHOT,
        V3_0_8,
        V3_0_9_SNAPSHOT,
        V3_0_9,

        V3_0_10_SNAPSHOT,
        V3_0_10,

        V3_0_11_SNAPSHOT,
        V3_0_11,

        V3_0_12_SNAPSHOT,
        V3_0_12,

        V3_0_13_SNAPSHOT,
        V3_0_13,

        V3_0_14_SNAPSHOT,
        V3_0_14,

        V3_0_15_SNAPSHOT,
        V3_0_15,

        V3_1_0_SNAPSHOT,
        V3_1_0,

        V3_1_1_SNAPSHOT,
        V3_1_1,

        V3_1_2_SNAPSHOT,
        V3_1_2,

        V3_1_3_SNAPSHOT,
        V3_1_3,

        V3_1_4_SNAPSHOT,
        V3_1_4,

        V3_1_5_SNAPSHOT,
        V3_1_5,

        V3_1_6_SNAPSHOT,
        V3_1_6,

        V3_1_7_SNAPSHOT,
        V3_1_7,

        V3_1_8_SNAPSHOT,
        V3_1_8,

        V3_1_9_SNAPSHOT,
        V3_1_9,

        V3_2_0_SNAPSHOT,
        V3_2_0,

        V3_2_1_SNAPSHOT,
        V3_2_1,

        V3_2_2_SNAPSHOT,
        V3_2_2,

        V3_2_3_SNAPSHOT,
        V3_2_3,

        V3_2_4_SNAPSHOT,
        V3_2_4,

        V3_2_5_SNAPSHOT,
        V3_2_5,

        V3_2_6_SNAPSHOT,
        V3_2_6,

        V3_2_7_SNAPSHOT,
        V3_2_7,

        V3_2_8_SNAPSHOT,
        V3_2_8,

        V3_2_9_SNAPSHOT,
        V3_2_9,

        V3_3_1_SNAPSHOT,
        V3_3_1,

        V3_3_2_SNAPSHOT,
        V3_3_2,

        V3_3_3_SNAPSHOT,
        V3_3_3,

        V3_3_4_SNAPSHOT,
        V3_3_4,

        V3_3_5_SNAPSHOT,
        V3_3_5,

        V3_3_6_SNAPSHOT,
        V3_3_6,

        V3_3_7_SNAPSHOT,
        V3_3_7,

        V3_3_8_SNAPSHOT,
        V3_3_8,

        V3_3_9_SNAPSHOT,
        V3_3_9,

        V3_4_1_SNAPSHOT,
        V3_4_1,

        V3_4_2_SNAPSHOT,
        V3_4_2,

        V3_4_3_SNAPSHOT,
        V3_4_3,

        V3_4_4_SNAPSHOT,
        V3_4_4,

        V3_4_5_SNAPSHOT,
        V3_4_5,

        V3_4_6_SNAPSHOT,
        V3_4_6,

        V3_4_7_SNAPSHOT,
        V3_4_7,

        V3_4_8_SNAPSHOT,
        V3_4_8,

        V3_4_9_SNAPSHOT,
        V3_4_9,
        V3_5_1_SNAPSHOT,
        V3_5_1,

        V3_5_2_SNAPSHOT,
        V3_5_2,

        V3_5_3_SNAPSHOT,
        V3_5_3,

        V3_5_4_SNAPSHOT,
        V3_5_4,

        V3_5_5_SNAPSHOT,
        V3_5_5,

        V3_5_6_SNAPSHOT,
        V3_5_6,

        V3_5_7_SNAPSHOT,
        V3_5_7,

        V3_5_8_SNAPSHOT,
        V3_5_8,

        V3_5_9_SNAPSHOT,
        V3_5_9,

        V3_6_1_SNAPSHOT,
        V3_6_1,

        V3_6_2_SNAPSHOT,
        V3_6_2,

        V3_6_3_SNAPSHOT,
        V3_6_3,

        V3_6_4_SNAPSHOT,
        V3_6_4,

        V3_6_5_SNAPSHOT,
        V3_6_5,

        V3_6_6_SNAPSHOT,
        V3_6_6,

        V3_6_7_SNAPSHOT,
        V3_6_7,

        V3_6_8_SNAPSHOT,
        V3_6_8,

        V3_6_9_SNAPSHOT,
        V3_6_9,

        V3_7_1_SNAPSHOT,
        V3_7_1,

        V3_7_2_SNAPSHOT,
        V3_7_2,

        V3_7_3_SNAPSHOT,
        V3_7_3,

        V3_7_4_SNAPSHOT,
        V3_7_4,

        V3_7_5_SNAPSHOT,
        V3_7_5,

        V3_7_6_SNAPSHOT,
        V3_7_6,

        V3_7_7_SNAPSHOT,
        V3_7_7,

        V3_7_8_SNAPSHOT,
        V3_7_8,

        V3_7_9_SNAPSHOT,
        V3_7_9,

        V3_8_1_SNAPSHOT,
        V3_8_1,

        V3_8_2_SNAPSHOT,
        V3_8_2,

        V3_8_3_SNAPSHOT,
        V3_8_3,

        V3_8_4_SNAPSHOT,
        V3_8_4,

        V3_8_5_SNAPSHOT,
        V3_8_5,

        V3_8_6_SNAPSHOT,
        V3_8_6,

        V3_8_7_SNAPSHOT,
        V3_8_7,

        V3_8_8_SNAPSHOT,
        V3_8_8,

        V3_8_9_SNAPSHOT,
        V3_8_9,

        V3_9_1_SNAPSHOT,
        V3_9_1,

        V3_9_2_SNAPSHOT,
        V3_9_2,

        V3_9_3_SNAPSHOT,
        V3_9_3,

        V3_9_4_SNAPSHOT,
        V3_9_4,

        V3_9_5_SNAPSHOT,
        V3_9_5,

        V3_9_6_SNAPSHOT,
        V3_9_6,

        V3_9_7_SNAPSHOT,
        V3_9_7,

        V3_9_8_SNAPSHOT,
        V3_9_8,

        V3_9_9_SNAPSHOT,
        V3_9_9,

        V4_0_0_SNAPSHOT,
        V4_0_0,

        V4_1_0_SNAPSHOT,
        V4_1_0,

        V4_2_0_SNAPSHOT,
        V4_2_0,

        V4_3_0_SNAPSHOT,
        V4_3_0,

        V4_4_0_SNAPSHOT,
        V4_4_0,

        V4_5_0_SNAPSHOT,
        V4_5_0,
    }
}
+ */ +package com.alibaba.rocketmq.common; + +import com.alibaba.rocketmq.common.annotation.ImportantField; +import com.alibaba.rocketmq.common.help.FAQUrl; +import org.slf4j.Logger; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.UnsupportedEncodingException; +import java.lang.annotation.Annotation; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.net.URL; +import java.net.URLConnection; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + + +/** + * @author shijia.wxr + */ +public class MixAll { + public static final String ROCKETMQ_HOME_ENV = "ROCKETMQ_HOME"; + public static final String ROCKETMQ_HOME_PROPERTY = "rocketmq.home.dir"; + public static final String NAMESRV_ADDR_ENV = "NAMESRV_ADDR"; + public static final String NAMESRV_ADDR_PROPERTY = "rocketmq.namesrv.addr"; + public static final String MESSAGE_COMPRESS_LEVEL = "rocketmq.message.compressLevel"; + public static final String WS_DOMAIN_NAME = System.getProperty("rocketmq.namesrv.domain", "jmenv.tbsite.net"); + public static final String WS_DOMAIN_SUBGROUP = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr"); + // http://jmenv.tbsite.net:8080/rocketmq/nsaddr + public static final String WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP; + public static final String DEFAULT_TOPIC = "TBW102"; + public static final String BENCHMARK_TOPIC = "BenchmarkTest"; + public static final String DEFAULT_PRODUCER_GROUP = 
"DEFAULT_PRODUCER"; + public static final String DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER"; + public static final String TOOLS_CONSUMER_GROUP = "TOOLS_CONSUMER"; + public static final String FILTERSRV_CONSUMER_GROUP = "FILTERSRV_CONSUMER"; + public static final String MONITOR_CONSUMER_GROUP = "__MONITOR_CONSUMER"; + public static final String CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER"; + public static final String SELF_TEST_PRODUCER_GROUP = "SELF_TEST_P_GROUP"; + public static final String SELF_TEST_CONSUMER_GROUP = "SELF_TEST_C_GROUP"; + public static final String SELF_TEST_TOPIC = "SELF_TEST_TOPIC"; + public static final String OFFSET_MOVED_EVENT = "OFFSET_MOVED_EVENT"; + public static final String ONS_HTTP_PROXY_GROUP = "CID_ONS-HTTP-PROXY"; + public static final String CID_ONSAPI_PERMISSION_GROUP = "CID_ONSAPI_PERMISSION"; + public static final String CID_ONSAPI_OWNER_GROUP = "CID_ONSAPI_OWNER"; + public static final String CID_ONSAPI_PULL_GROUP = "CID_ONSAPI_PULL"; + public static final String CID_RMQ_SYS_PREFIX = "CID_RMQ_SYS_"; + + public static final List<String> LOCAL_INET_ADDRESS = getLocalInetAddress(); + public static final String LOCALHOST = localhost(); + public static final String DEFAULT_CHARSET = "UTF-8"; + public static final long MASTER_ID = 0L; + public static final long CURRENT_JVM_PID = getPID(); + + public static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%"; + + public static final String DLQ_GROUP_TOPIC_PREFIX = "%DLQ%"; + public static final String SYSTEM_TOPIC_PREFIX = "rmq_sys_"; + public static final String UNIQUE_MSG_QUERY_FLAG = "_UNIQUE_KEY_QUERY"; + public static final String DEFAULT_TRACE_REGION_ID = "DefaultRegion"; + public static final String CONSUME_CONTEXT_TYPE = "ConsumeContextType"; + + public static String getRetryTopic(final String consumerGroup) { + return RETRY_GROUP_TOPIC_PREFIX + consumerGroup; + } + + + public static boolean isSysConsumerGroup(final String consumerGroup) { + return 
consumerGroup.startsWith(CID_RMQ_SYS_PREFIX); + } + + public static boolean isSystemTopic(final String topic) { + return topic.startsWith(SYSTEM_TOPIC_PREFIX); + } + + public static String getDLQTopic(final String consumerGroup) { + return DLQ_GROUP_TOPIC_PREFIX + consumerGroup; + } + + + public static String brokerVIPChannel(final boolean isChange, final String brokerAddr) { + if (isChange) { + String[] ipAndPort = brokerAddr.split(":"); + String brokerAddrNew = ipAndPort[0] + ":" + (Integer.parseInt(ipAndPort[1]) - 2); + return brokerAddrNew; + } else { + return brokerAddr; + } + } + + + public static long getPID() { + String processName = java.lang.management.ManagementFactory.getRuntimeMXBean().getName(); + if (processName != null && processName.length() > 0) { + try { + return Long.parseLong(processName.split("@")[0]); + } catch (Exception e) { + return 0; + } + } + + return 0; + } + + + public static long createBrokerId(final String ip, final int port) { + InetSocketAddress isa = new InetSocketAddress(ip, port); + byte[] ipArray = isa.getAddress().getAddress(); + ByteBuffer bb = ByteBuffer.allocate(8); + bb.put(ipArray); + bb.putInt(port); + long value = bb.getLong(0); + return Math.abs(value); + } + + public static final void string2File(final String str, final String fileName) throws IOException { + + String tmpFile = fileName + ".tmp"; + string2FileNotSafe(str, tmpFile); + + + String bakFile = fileName + ".bak"; + String prevContent = file2String(fileName); + if (prevContent != null) { + string2FileNotSafe(prevContent, bakFile); + } + + + File file = new File(fileName); + file.delete(); + + + file = new File(tmpFile); + file.renameTo(new File(fileName)); + } + + + public static final void string2FileNotSafe(final String str, final String fileName) throws IOException { + File file = new File(fileName); + File fileParent = file.getParentFile(); + if (fileParent != null) { + fileParent.mkdirs(); + } + FileWriter fileWriter = null; + + try { + fileWriter = new 
FileWriter(file); + fileWriter.write(str); + } catch (IOException e) { + throw e; + } finally { + if (fileWriter != null) { + try { + fileWriter.close(); + } catch (IOException e) { + throw e; + } + } + } + } + + + public static final String file2String(final String fileName) { + File file = new File(fileName); + return file2String(file); + } + + public static final String file2String(final File file) { + if (file.exists()) { + char[] data = new char[(int) file.length()]; + boolean result = false; + + FileReader fileReader = null; + try { + fileReader = new FileReader(file); + int len = fileReader.read(data); + result = len == data.length; + } catch (IOException e) { + // e.printStackTrace(); + } finally { + if (fileReader != null) { + try { + fileReader.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + if (result) { + return new String(data); + } + } + return null; + } + + public static final String file2String(final URL url) { + InputStream in = null; + try { + URLConnection urlConnection = url.openConnection(); + urlConnection.setUseCaches(false); + in = urlConnection.getInputStream(); + int len = in.available(); + byte[] data = new byte[len]; + in.read(data, 0, len); + return new String(data, "UTF-8"); + } catch (Exception e) { + } finally { + if (null != in) { + try { + in.close(); + } catch (IOException e) { + } + } + } + + return null; + } + + public static String findClassPath(Class<?> c) { + URL url = c.getProtectionDomain().getCodeSource().getLocation(); + return url.getPath(); + } + + + public static void printObjectProperties(final Logger log, final Object object) { + printObjectProperties(log, object, false); + } + + + public static void printObjectProperties(final Logger log, final Object object, final boolean onlyImportantField) { + Field[] fields = object.getClass().getDeclaredFields(); + for (Field field : fields) { + if (!Modifier.isStatic(field.getModifiers())) { + String name = field.getName(); + if 
(!name.startsWith("this")) { + Object value = null; + try { + field.setAccessible(true); + value = field.get(object); + if (null == value) { + value = ""; + } + } catch (IllegalArgumentException e) { + e.printStackTrace(); + } catch (IllegalAccessException e) { + e.printStackTrace(); + } + + if (onlyImportantField) { + Annotation annotation = field.getAnnotation(ImportantField.class); + if (null == annotation) { + continue; + } + } + + if (log != null) { + log.info(name + "=" + value); + } else { + } + } + } + } + } + + + public static String properties2String(final Properties properties) { + StringBuilder sb = new StringBuilder(); + for (Map.Entry<Object, Object> entry : properties.entrySet()) { + if (entry.getValue() != null) { + sb.append(entry.getKey().toString() + "=" + entry.getValue().toString() + "\n"); + } + } + return sb.toString(); + } + + public static Properties string2Properties(final String str) { + Properties properties = new Properties(); + try { + InputStream in = new ByteArrayInputStream(str.getBytes(DEFAULT_CHARSET)); + properties.load(in); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + return null; + } catch (IOException e) { + e.printStackTrace(); + return null; + } + + return properties; + } + + public static Properties object2Properties(final Object object) { + Properties properties = new Properties(); + + Field[] fields = object.getClass().getDeclaredFields(); + for (Field field : fields) { + if (!Modifier.isStatic(field.getModifiers())) { + String name = field.getName(); + if (!name.startsWith("this")) { + Object value = null; + try { + field.setAccessible(true); + value = field.get(object); + } catch (IllegalArgumentException e) { + e.printStackTrace(); + } catch (IllegalAccessException e) { + e.printStackTrace(); + } + + if (value != null) { + properties.setProperty(name, value.toString()); + } + } + } + } + + return properties; + } + + public static void properties2Object(final Properties p, final Object object) { 
+ Method[] methods = object.getClass().getMethods(); + for (Method method : methods) { + String mn = method.getName(); + if (mn.startsWith("set")) { + try { + String tmp = mn.substring(4); + String first = mn.substring(3, 4); + + String key = first.toLowerCase() + tmp; + String property = p.getProperty(key); + if (property != null) { + Class<?>[] pt = method.getParameterTypes(); + if (pt != null && pt.length > 0) { + String cn = pt[0].getSimpleName(); + Object arg = null; + if (cn.equals("int") || cn.equals("Integer")) { + arg = Integer.parseInt(property); + } else if (cn.equals("long") || cn.equals("Long")) { + arg = Long.parseLong(property); + } else if (cn.equals("double") || cn.equals("Double")) { + arg = Double.parseDouble(property); + } else if (cn.equals("boolean") || cn.equals("Boolean")) { + arg = Boolean.parseBoolean(property); + } else if (cn.equals("float") || cn.equals("Float")) { + arg = Float.parseFloat(property); + } else if (cn.equals("String")) { + arg = property; + } else { + continue; + } + method.invoke(object, new Object[]{arg}); + } + } + } catch (Throwable e) { + } + } + } + } + + + public static boolean isPropertiesEqual(final Properties p1, final Properties p2) { + return p1.equals(p2); + } + + + public static List<String> getLocalInetAddress() { + List<String> inetAddressList = new ArrayList<String>(); + try { + Enumeration<NetworkInterface> enumeration = NetworkInterface.getNetworkInterfaces(); + while (enumeration.hasMoreElements()) { + NetworkInterface networkInterface = enumeration.nextElement(); + Enumeration<InetAddress> addrs = networkInterface.getInetAddresses(); + while (addrs.hasMoreElements()) { + inetAddressList.add(addrs.nextElement().getHostAddress()); + } + } + } catch (SocketException e) { + throw new RuntimeException("get local inet address fail", e); + } + + return inetAddressList; + } + + + public static boolean isLocalAddr(String address) { + for (String addr : LOCAL_INET_ADDRESS) { + if (address.contains(addr)) + 
return true; + } + return false; + } + + + private static String localhost() { + try { + InetAddress addr = InetAddress.getLocalHost(); + return addr.getHostAddress(); + } catch (Throwable e) { + throw new RuntimeException("InetAddress java.net.InetAddress.getLocalHost() throws UnknownHostException" + + FAQUrl.suggestTodo(FAQUrl.UNKNOWN_HOST_EXCEPTION), + e); + } + } + + + public static boolean compareAndIncreaseOnly(final AtomicLong target, final long value) { + long prev = target.get(); + while (value > prev) { + boolean updated = target.compareAndSet(prev, value); + if (updated) + return true; + + prev = target.get(); + } + + return false; + } + + public static String localhostName() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (Throwable e) { + throw new RuntimeException("InetAddress java.net.InetAddress.getLocalHost() throws UnknownHostException" + + FAQUrl.suggestTodo(FAQUrl.UNKNOWN_HOST_EXCEPTION), + e); + } + } + + public Set<String> list2Set(List<String> values) { + Set<String> result = new HashSet<String>(); + for (String v : values) { + result.add(v); + } + return result; + } + + public List<String> set2List(Set<String> values) { + List<String> result = new ArrayList<String>(); + for (String v : values) { + result.add(v); + } + return result; + } + + public static String humanReadableByteCount(long bytes, boolean si) { + int unit = si ? 1000 : 1024; + if (bytes < unit) return bytes + " B"; + int exp = (int) (Math.log(bytes) / Math.log(unit)); + String pre = (si ? "kMGTPE" : "KMGTPE").charAt(exp - 1) + (si ? 
"" : "i"); + return String.format("%.1f %sB", bytes / Math.pow(unit, exp), pre); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/Pair.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/Pair.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/Pair.java new file mode 100644 index 0000000..ada6144 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/Pair.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Mutable holder for two values of independent types.
 *
 * @param <T1> type of the first value
 * @param <T2> type of the second value
 * @author shijia.wxr
 */
public class Pair<T1, T2> {
    /** First value; may be replaced through {@link #setObject1(Object)}. */
    private T1 object1;
    /** Second value; may be replaced through {@link #setObject2(Object)}. */
    private T2 object2;

    /**
     * Creates a pair holding the two given values.
     *
     * @param object1 first value
     * @param object2 second value
     */
    public Pair(T1 object1, T2 object2) {
        this.object2 = object2;
        this.object1 = object1;
    }

    /** @return the first value */
    public T1 getObject1() {
        return this.object1;
    }

    /** @return the second value */
    public T2 getObject2() {
        return this.object2;
    }

    /** @param object1 new first value */
    public void setObject1(T1 object1) {
        this.object1 = object1;
    }

    /** @param object2 new second value */
    public void setObject2(T2 object2) {
        this.object2 = object2;
    }
}
/**
 * Lifecycle states of a service.
 *
 * @author shijia.wxr
 */
public enum ServiceState {
    /** The service object exists but has not been started. */
    CREATE_JUST,

    /** The service is running. */
    RUNNING,

    /** The service has been shut down. */
    SHUTDOWN_ALREADY,

    /** Starting the service failed. */
    START_FAILED
}
+ */ +package com.alibaba.rocketmq.common; + +import com.alibaba.rocketmq.common.constant.LoggerName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author shijia.wxr + * @author xinyuzhou.zxy + */ +public abstract class ServiceThread implements Runnable { + private static final Logger STLOG = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); + private static final long JOIN_TIME = 90 * 1000; + + protected final Thread thread; + + protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false); + + protected volatile boolean stopped = false; + + protected final CountDownLatch waitPoint = new CountDownLatch(1); + + + public ServiceThread() { + this.thread = new Thread(this, this.getServiceName()); + } + + + public abstract String getServiceName(); + + + public void start() { + this.thread.start(); + } + + + public void shutdown() { + this.shutdown(false); + } + + public void shutdown(final boolean interrupt) { + this.stopped = true; + STLOG.info("shutdown thread " + this.getServiceName() + " interrupt " + interrupt); + + if (hasNotified.compareAndSet(false, true)) { + waitPoint.countDown(); // notify + } + + try { + if (interrupt) { + this.thread.interrupt(); + } + + long beginTime = System.currentTimeMillis(); + if (!this.thread.isDaemon()) { + this.thread.join(this.getJointime()); + } + long eclipseTime = System.currentTimeMillis() - beginTime; + STLOG.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " " + + this.getJointime()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public long getJointime() { + return JOIN_TIME; + } + + public void stop() { + this.stop(false); + } + + public void stop(final boolean interrupt) { + this.stopped = true; + STLOG.info("stop thread " + this.getServiceName() + " interrupt " + interrupt); + + if (hasNotified.compareAndSet(false, true)) { + 
waitPoint.countDown(); // notify + } + + if (interrupt) { + this.thread.interrupt(); + } + } + + public void makeStop() { + this.stopped = true; + STLOG.info("makestop thread " + this.getServiceName()); + } + + public void wakeup() { + if (hasNotified.compareAndSet(false, true)) { + waitPoint.countDown(); // notify + } + } + + protected void waitForRunning(long interval) { + if (hasNotified.compareAndSet(true, false)) { + this.onWaitEnd(); + return; + } + + //entry to wait + waitPoint.reset(); + + try { + waitPoint.await(interval, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + hasNotified.set(false); + this.onWaitEnd(); + } + } + + protected void onWaitEnd() { + } + + public boolean isStopped() { + return stopped; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/SystemClock.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/SystemClock.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/SystemClock.java new file mode 100644 index 0000000..36c0448 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/SystemClock.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * Source of the current wall-clock time.
 *
 * @author vintage.wang
 */
public class SystemClock {
    /**
     * @return the current time in milliseconds, as reported by
     *         {@link System#currentTimeMillis()}
     */
    public long now() {
        final long currentMillis = System.currentTimeMillis();
        return currentMillis;
    }
}
/**
 * {@link ThreadFactory} that names its threads with a fixed prefix followed by a
 * running number starting at 1.
 */
public class ThreadFactoryImpl implements ThreadFactory {
    /** Number of threads handed out so far. */
    private final AtomicLong threadIndex = new AtomicLong(0);
    /** Text placed in front of the running number in every thread name. */
    private final String threadNamePrefix;

    /**
     * @param threadNamePrefix prefix for the names of all threads created by this factory
     */
    public ThreadFactoryImpl(final String threadNamePrefix) {
        this.threadNamePrefix = threadNamePrefix;
    }

    /**
     * Creates a new, not yet started thread for {@code r} named
     * {@code <prefix><running number>}.
     */
    @Override
    public Thread newThread(Runnable r) {
        final long sequence = this.threadIndex.incrementAndGet();
        final String threadName = this.threadNamePrefix + sequence;
        return new Thread(r, threadName);
    }
}
+ */ +package com.alibaba.rocketmq.common; + +import com.alibaba.rocketmq.common.constant.PermName; + + +/** + * @author shijia.wxr + */ +public class TopicConfig { + private static final String SEPARATOR = " "; + public static int defaultReadQueueNums = 16; + public static int defaultWriteQueueNums = 16; + private String topicName; + private int readQueueNums = defaultReadQueueNums; + private int writeQueueNums = defaultWriteQueueNums; + private int perm = PermName.PERM_READ | PermName.PERM_WRITE; + private TopicFilterType topicFilterType = TopicFilterType.SINGLE_TAG; + private int topicSysFlag = 0; + private boolean order = false; + + + public TopicConfig() { + } + + + public TopicConfig(String topicName) { + this.topicName = topicName; + } + + + public TopicConfig(String topicName, int readQueueNums, int writeQueueNums, int perm) { + this.topicName = topicName; + this.readQueueNums = readQueueNums; + this.writeQueueNums = writeQueueNums; + this.perm = perm; + } + + + public String encode() { + StringBuilder sb = new StringBuilder(); + + // 1 + sb.append(this.topicName); + sb.append(SEPARATOR); + + // 2 + sb.append(this.readQueueNums); + sb.append(SEPARATOR); + + // 3 + sb.append(this.writeQueueNums); + sb.append(SEPARATOR); + + // 4 + sb.append(this.perm); + sb.append(SEPARATOR); + + // 5 + sb.append(this.topicFilterType); + + return sb.toString(); + } + + + public boolean decode(final String in) { + String[] strs = in.split(SEPARATOR); + if (strs != null && strs.length == 5) { + this.topicName = strs[0]; + + this.readQueueNums = Integer.parseInt(strs[1]); + + this.writeQueueNums = Integer.parseInt(strs[2]); + + this.perm = Integer.parseInt(strs[3]); + + this.topicFilterType = TopicFilterType.valueOf(strs[4]); + + return true; + } + + return false; + } + + + public String getTopicName() { + return topicName; + } + + + public void setTopicName(String topicName) { + this.topicName = topicName; + } + + + public int getReadQueueNums() { + return readQueueNums; + } + 
+ + public void setReadQueueNums(int readQueueNums) { + this.readQueueNums = readQueueNums; + } + + + public int getWriteQueueNums() { + return writeQueueNums; + } + + + public void setWriteQueueNums(int writeQueueNums) { + this.writeQueueNums = writeQueueNums; + } + + + public int getPerm() { + return perm; + } + + + public void setPerm(int perm) { + this.perm = perm; + } + + + public TopicFilterType getTopicFilterType() { + return topicFilterType; + } + + + public void setTopicFilterType(TopicFilterType topicFilterType) { + this.topicFilterType = topicFilterType; + } + + + public int getTopicSysFlag() { + return topicSysFlag; + } + + + public void setTopicSysFlag(int topicSysFlag) { + this.topicSysFlag = topicSysFlag; + } + + + public boolean isOrder() { + return order; + } + + + public void setOrder(boolean isOrder) { + this.order = isOrder; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + final TopicConfig that = (TopicConfig) o; + + if (readQueueNums != that.readQueueNums) return false; + if (writeQueueNums != that.writeQueueNums) return false; + if (perm != that.perm) return false; + if (topicSysFlag != that.topicSysFlag) return false; + if (order != that.order) return false; + if (topicName != null ? !topicName.equals(that.topicName) : that.topicName != null) return false; + return topicFilterType == that.topicFilterType; + + } + + @Override + public int hashCode() { + int result = topicName != null ? topicName.hashCode() : 0; + result = 31 * result + readQueueNums; + result = 31 * result + writeQueueNums; + result = 31 * result + perm; + result = 31 * result + (topicFilterType != null ? topicFilterType.hashCode() : 0); + result = 31 * result + topicSysFlag; + result = 31 * result + (order ? 
1 : 0); + return result; + } + + @Override + public String toString() { + return "TopicConfig [topicName=" + topicName + ", readQueueNums=" + readQueueNums + + ", writeQueueNums=" + writeQueueNums + ", perm=" + PermName.perm2String(perm) + + ", topicFilterType=" + topicFilterType + ", topicSysFlag=" + topicSysFlag + ", order=" + + order + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/TopicFilterType.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/TopicFilterType.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/TopicFilterType.java new file mode 100644 index 0000000..7a20dc9 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/TopicFilterType.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.common; + +/** + * @author shijia.wxr + */ +public enum TopicFilterType { + SINGLE_TAG, + MULTI_TAG +}
