http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java new file mode 100644 index 0000000..c9303b7 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common; + +import org.apache.rocketmq.common.constant.LoggerName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + + +/** + * @author shijia.wxr + */ +public abstract class ConfigManager { + private static final Logger PLOG = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); + + + public abstract String encode(); + + public boolean load() { + String fileName = null; + try { + fileName = this.configFilePath(); + String jsonString = MixAll.file2String(fileName); + + if (null == jsonString || jsonString.length() == 0) { + return this.loadBak(); + } else { + this.decode(jsonString); + PLOG.info("load {} OK", fileName); + return true; + } + } catch (Exception e) { + PLOG.error("load " + fileName + " Failed, and try to load backup file", e); + return this.loadBak(); + } + } + + public abstract String configFilePath(); + + private boolean loadBak() { + String fileName = null; + try { + fileName = this.configFilePath(); + String jsonString = MixAll.file2String(fileName + ".bak"); + if (jsonString != null && jsonString.length() > 0) { + this.decode(jsonString); + PLOG.info("load " + fileName + " OK"); + return true; + } + } catch (Exception e) { + PLOG.error("load " + fileName + " Failed", e); + return false; + } + + return true; + } + + public abstract void decode(final String jsonString); + + public synchronized void persist() { + String jsonString = this.encode(true); + if (jsonString != null) { + String fileName = this.configFilePath(); + try { + MixAll.string2File(jsonString, fileName); + } catch (IOException e) { + PLOG.error("persist file Exception, " + fileName, e); + } + } + } + + public abstract String encode(final boolean prettyFormat); +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/Configuration.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/Configuration.java b/common/src/main/java/org/apache/rocketmq/common/Configuration.java new file mode 100644 index 0000000..0ab7c0d --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/Configuration.java @@ -0,0 +1,310 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common; + +import org.slf4j.Logger; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * @author xigu.lx + */ +public class Configuration { + + private final Logger log; + + private List<Object> configObjectList = new ArrayList<Object>(4); + private String storePath; + private boolean storePathFromConfig = false; + private Object storePathObject; + private Field storePathField; + private DataVersion dataVersion = new DataVersion(); + private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + /** + * All properties include configs in object and extend properties. + */ + private Properties allConfigs = new Properties(); + + public Configuration(Logger log) { + this.log = log; + } + + public Configuration(Logger log, Object... configObjects) { + this.log = log; + if (configObjects == null || configObjects.length == 0) { + return; + } + for (Object configObject : configObjects) { + registerConfig(configObject); + } + } + + public Configuration(Logger log, String storePath, Object... configObjects) { + this(log, configObjects); + this.storePath = storePath; + } + + /** + * register config object + * + * @param configObject + * @return the current Configuration object + */ + public Configuration registerConfig(Object configObject) { + try { + readWriteLock.writeLock().lockInterruptibly(); + + try { + + Properties registerProps = MixAll.object2Properties(configObject); + + merge(registerProps, this.allConfigs); + + configObjectList.add(configObject); + } finally { + readWriteLock.writeLock().unlock(); + } + } catch (InterruptedException e) { + log.error("registerConfig lock error"); + } + return this; + } + + /** + * register config properties + * + * @param extProperties + * @return the current Configuration object + */ + public Configuration registerConfig(Properties extProperties) { + if (extProperties == null) { + return this; + } + + try { + readWriteLock.writeLock().lockInterruptibly(); + + try { + merge(extProperties, this.allConfigs); + } finally { + readWriteLock.writeLock().unlock(); + } + } catch (InterruptedException e) { + log.error("register lock error. {}" + extProperties); + } + + return this; + } + + /** + * The store path will be gotten from the field of object. + * + * @param object + * @param fieldName + * + * @throws java.lang.RuntimeException if the field of object is not exist. + */ + public void setStorePathFromConfig(Object object, String fieldName) { + assert object != null; + + try { + readWriteLock.writeLock().lockInterruptibly(); + + try { + this.storePathFromConfig = true; + this.storePathObject = object; + // check + this.storePathField = object.getClass().getDeclaredField(fieldName); + assert this.storePathField != null + && !Modifier.isStatic(this.storePathField.getModifiers()); + this.storePathField.setAccessible(true); + } catch (NoSuchFieldException e) { + throw new RuntimeException(e); + } finally { + readWriteLock.writeLock().unlock(); + } + } catch (InterruptedException e) { + log.error("setStorePathFromConfig lock error"); + } + } + + private String getStorePath() { + String realStorePath = null; + try { + readWriteLock.readLock().lockInterruptibly(); + + try { + realStorePath = this.storePath; + + if (this.storePathFromConfig) { + try { + realStorePath = (String) storePathField.get(this.storePathObject); + } catch (IllegalAccessException e) { + log.error("getStorePath error, ", e); + } + } + } finally { + readWriteLock.readLock().unlock(); + } + } catch (InterruptedException e) { + log.error("getStorePath lock error"); + } + + return realStorePath; + } + + public void update(Properties properties) { + try { + readWriteLock.writeLock().lockInterruptibly(); + + try { + // the property must be exist when update + mergeIfExist(properties, this.allConfigs); + + for (Object configObject : configObjectList) { + // not allConfigs to update... + MixAll.properties2Object(properties, configObject); + } + + this.dataVersion.nextVersion(); + + } finally { + readWriteLock.writeLock().unlock(); + } + } catch (InterruptedException e) { + log.error("update lock error, {}", properties); + return; + } + + persist(); + } + + public void persist() { + try { + readWriteLock.readLock().lockInterruptibly(); + + try { + String allConfigs = getAllConfigsInternal(); + + MixAll.string2File(allConfigs, getStorePath()); + } catch (IOException e) { + log.error("persist string2File error, ", e); + } finally { + readWriteLock.readLock().unlock(); + } + } catch (InterruptedException e) { + log.error("persist lock error"); + } + } + + public String getAllConfigsFormatString() { + try { + readWriteLock.readLock().lockInterruptibly(); + + try { + + return getAllConfigsInternal(); + + } finally { + readWriteLock.readLock().unlock(); + } + } catch (InterruptedException e) { + log.error("getAllConfigsFormatString lock error"); + } + + return null; + } + + public String getDataVersionJson() { + return this.dataVersion.toJson(); + } + + public Properties getAllConfigs() { + try { + readWriteLock.readLock().lockInterruptibly(); + + try { + + return this.allConfigs; + + } finally { + readWriteLock.readLock().unlock(); + } + } catch (InterruptedException e) { + log.error("getAllConfigs lock error"); + } + + return null; + } + + private String getAllConfigsInternal() { + StringBuilder stringBuilder = new StringBuilder(); + + // reload from config object ? + for (Object configObject : this.configObjectList) { + Properties properties = MixAll.object2Properties(configObject); + if (properties != null) { + merge(properties, this.allConfigs); + } else { + log.warn("getAllConfigsInternal object2Properties is null, {}", configObject.getClass()); + } + } + + { + stringBuilder.append(MixAll.properties2String(this.allConfigs)); + } + + return stringBuilder.toString(); + } + + public void setStorePath(final String storePath) { + this.storePath = storePath; + } + + private void merge(Properties from, Properties to) { + for (Object key : from.keySet()) { + Object fromObj = from.get(key), toObj = to.get(key); + if (toObj != null && !toObj.equals(fromObj)) { + log.info("Replace, key: {}, value: {} -> {}", key, toObj, fromObj); + } + to.put(key, fromObj); + } + } + + private void mergeIfExist(Properties from, Properties to) { + for (Object key : from.keySet()) { + if (!to.containsKey(key)) { + continue; + } + + Object fromObj = from.get(key), toObj = to.get(key); + if (toObj != null && !toObj.equals(fromObj)) { + log.info("Replace, key: {}, value: {} -> {}", key, toObj, fromObj); + } + to.put(key, fromObj); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/CountDownLatch.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/CountDownLatch.java b/common/src/main/java/org/apache/rocketmq/common/CountDownLatch.java new file mode 100644 index 0000000..971c0c7 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/CountDownLatch.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.AbstractQueuedSynchronizer; + +/** + * Add reset feature for @see java.util.concurrent.CountDownLatch + * + * @author xinyuzhou.zxy + */ +public class CountDownLatch { + /** + * Synchronization control For CountDownLatch. + * Uses AQS state to represent count. + */ + private static final class Sync extends AbstractQueuedSynchronizer { + private static final long serialVersionUID = 4982264981922014374L; + + private final int startCount; + + Sync(int count) { + this.startCount = count; + setState(count); + } + + int getCount() { + return getState(); + } + + protected int tryAcquireShared(int acquires) { + return (getState() == 0) ? 1 : -1; + } + + protected boolean tryReleaseShared(int releases) { + // Decrement count; signal when transition to zero + for (;;) { + int c = getState(); + if (c == 0) + return false; + int nextc = c - 1; + if (compareAndSetState(c, nextc)) + return nextc == 0; + } + } + + protected void reset() { + setState(startCount); + } + } + + private final Sync sync; + + /** + * Constructs a {@code CountDownLatch} initialized with the given count. + * + * @param count + * the number of times {@link #countDown} must be invoked + * before threads can pass through {@link #await} + * + * @throws IllegalArgumentException + * if {@code count} is negative + */ + public CountDownLatch(int count) { + if (count < 0) throw new IllegalArgumentException("count < 0"); + this.sync = new Sync(count); + } + + /** + * Causes the current thread to wait until the latch has counted down to + * zero, unless the thread is {@linkplain Thread#interrupt interrupted}. + * + * <p>If the current count is zero then this method returns immediately. + * + * <p>If the current count is greater than zero then the current + * thread becomes disabled for thread scheduling purposes and lies + * dormant until one of two things happen: + * <ul> + * <li>The count reaches zero due to invocations of the + * {@link #countDown} method; or + * <li>Some other thread {@linkplain Thread#interrupt interrupts} + * the current thread. + * </ul> + * + * <p>If the current thread: + * <ul> + * <li>has its interrupted status set on entry to this method; or + * <li>is {@linkplain Thread#interrupt interrupted} while waiting, + * </ul> + * then {@link InterruptedException} is thrown and the current thread's + * interrupted status is cleared. + * + * @throws InterruptedException + * if the current thread is interrupted + * while waiting + */ + public void await() throws InterruptedException { + sync.acquireSharedInterruptibly(1); + } + + /** + * Causes the current thread to wait until the latch has counted down to + * zero, unless the thread is {@linkplain Thread#interrupt interrupted}, + * or the specified waiting time elapses. + * + * <p>If the current count is zero then this method returns immediately + * with the value {@code true}. + * + * <p>If the current count is greater than zero then the current + * thread becomes disabled for thread scheduling purposes and lies + * dormant until one of three things happen: + * <ul> + * <li>The count reaches zero due to invocations of the + * {@link #countDown} method; or + * <li>Some other thread {@linkplain Thread#interrupt interrupts} + * the current thread; or + * <li>The specified waiting time elapses. + * </ul> + * + * <p>If the count reaches zero then the method returns with the + * value {@code true}. + * + * <p>If the current thread: + * <ul> + * <li>has its interrupted status set on entry to this method; or + * <li>is {@linkplain Thread#interrupt interrupted} while waiting, + * </ul> + * then {@link InterruptedException} is thrown and the current thread's + * interrupted status is cleared. + * + * <p>If the specified waiting time elapses then the value {@code false} + * is returned. If the time is less than or equal to zero, the method + * will not wait at all. + * + * @param timeout + * the maximum time to wait + * @param unit + * the time unit of the {@code timeout} argument + * + * @return {@code true} if the count reached zero and {@code false} + * if the waiting time elapsed before the count reached zero + * + * @throws InterruptedException + * if the current thread is interrupted + * while waiting + */ + public boolean await(long timeout, TimeUnit unit) + throws InterruptedException { + return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); + } + + /** + * Decrements the count of the latch, releasing all waiting threads if + * the count reaches zero. + * + * <p>If the current count is greater than zero then it is decremented. + * If the new count is zero then all waiting threads are re-enabled for + * thread scheduling purposes. + * + * <p>If the current count equals zero then nothing happens. + */ + public void countDown() { + sync.releaseShared(1); + } + + /** + * Returns the current count. + * + * <p>This method is typically used for debugging and testing purposes. + * + * @return the current count + */ + public long getCount() { + return sync.getCount(); + } + + public void reset() { + sync.reset(); + } + + /** + * Returns a string identifying this latch, as well as its state. + * The state, in brackets, includes the String {@code "Count ="} + * followed by the current count. + * + * @return a string identifying this latch, as well as its state + */ + public String toString() { + return super.toString() + "[Count = " + sync.getCount() + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/DataVersion.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/DataVersion.java b/common/src/main/java/org/apache/rocketmq/common/DataVersion.java new file mode 100644 index 0000000..94fd90b --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/DataVersion.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common; + +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.concurrent.atomic.AtomicLong; + + +/** + * @author shijia.wxr + */ +public class DataVersion extends RemotingSerializable { + private long timestatmp = System.currentTimeMillis(); + private AtomicLong counter = new AtomicLong(0); + + + public void assignNewOne(final DataVersion dataVersion) { + this.timestatmp = dataVersion.timestatmp; + this.counter.set(dataVersion.counter.get()); + } + + + public void nextVersion() { + this.timestatmp = System.currentTimeMillis(); + this.counter.incrementAndGet(); + } + + + public long getTimestatmp() { + return timestatmp; + } + + + public void setTimestatmp(long timestatmp) { + this.timestatmp = timestatmp; + } + + + public AtomicLong getCounter() { + return counter; + } + + + public void setCounter(AtomicLong counter) { + this.counter = counter; + } + + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + final DataVersion that = (DataVersion) o; + + if (timestatmp != that.timestatmp) return false; + return counter != null ? counter.equals(that.counter) : that.counter == null; + + } + + @Override + public int hashCode() { + int result = (int) (timestatmp ^ (timestatmp >>> 32)); + result = 31 * result + (counter != null ? counter.hashCode() : 0); + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/MQVersion.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/MQVersion.java b/common/src/main/java/org/apache/rocketmq/common/MQVersion.java new file mode 100644 index 0000000..f53fc27 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/MQVersion.java @@ -0,0 +1,362 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common; + +/** + * @author shijia.wxr + */ +public class MQVersion { + + public static final int CURRENT_VERSION = Version.V4_0_0_SNAPSHOT.ordinal(); + + + public static String getVersionDesc(int value) { + try { + Version v = Version.values()[value]; + return v.name(); + } catch (Exception e) { + } + + return "HigherVersion"; + } + + + public static Version value2Version(int value) { + return Version.values()[value]; + } + + public enum Version { + V3_0_0_SNAPSHOT, + V3_0_0_ALPHA1, + V3_0_0_BETA1, + V3_0_0_BETA2, + V3_0_0_BETA3, + V3_0_0_BETA4, + V3_0_0_BETA5, + V3_0_0_BETA6_SNAPSHOT, + V3_0_0_BETA6, + V3_0_0_BETA7_SNAPSHOT, + V3_0_0_BETA7, + V3_0_0_BETA8_SNAPSHOT, + V3_0_0_BETA8, + V3_0_0_BETA9_SNAPSHOT, + V3_0_0_BETA9, + V3_0_0_FINAL, + V3_0_1_SNAPSHOT, + V3_0_1, + V3_0_2_SNAPSHOT, + V3_0_2, + V3_0_3_SNAPSHOT, + V3_0_3, + V3_0_4_SNAPSHOT, + V3_0_4, + V3_0_5_SNAPSHOT, + V3_0_5, + V3_0_6_SNAPSHOT, + V3_0_6, + V3_0_7_SNAPSHOT, + V3_0_7, + V3_0_8_SNAPSHOT, + V3_0_8, + V3_0_9_SNAPSHOT, + V3_0_9, + + V3_0_10_SNAPSHOT, + V3_0_10, + + V3_0_11_SNAPSHOT, + V3_0_11, + + V3_0_12_SNAPSHOT, + V3_0_12, + + V3_0_13_SNAPSHOT, + V3_0_13, + + V3_0_14_SNAPSHOT, + V3_0_14, + + V3_0_15_SNAPSHOT, + V3_0_15, + + V3_1_0_SNAPSHOT, + V3_1_0, + + V3_1_1_SNAPSHOT, + V3_1_1, + + V3_1_2_SNAPSHOT, + V3_1_2, + + V3_1_3_SNAPSHOT, + V3_1_3, + + V3_1_4_SNAPSHOT, + V3_1_4, + + V3_1_5_SNAPSHOT, + V3_1_5, + + V3_1_6_SNAPSHOT, + V3_1_6, + + V3_1_7_SNAPSHOT, + V3_1_7, + + V3_1_8_SNAPSHOT, + V3_1_8, + + V3_1_9_SNAPSHOT, + V3_1_9, + + V3_2_0_SNAPSHOT, + V3_2_0, + + V3_2_1_SNAPSHOT, + V3_2_1, + + V3_2_2_SNAPSHOT, + V3_2_2, + + V3_2_3_SNAPSHOT, + V3_2_3, + + V3_2_4_SNAPSHOT, + V3_2_4, + + V3_2_5_SNAPSHOT, + V3_2_5, + + V3_2_6_SNAPSHOT, + V3_2_6, + + V3_2_7_SNAPSHOT, + V3_2_7, + + V3_2_8_SNAPSHOT, + V3_2_8, + + V3_2_9_SNAPSHOT, + V3_2_9, + + V3_3_1_SNAPSHOT, + V3_3_1, + + V3_3_2_SNAPSHOT, + V3_3_2, + + V3_3_3_SNAPSHOT, + V3_3_3, + + V3_3_4_SNAPSHOT, + V3_3_4, + + V3_3_5_SNAPSHOT, + V3_3_5, + + V3_3_6_SNAPSHOT, + V3_3_6, + + V3_3_7_SNAPSHOT, + V3_3_7, + + V3_3_8_SNAPSHOT, + V3_3_8, + + V3_3_9_SNAPSHOT, + V3_3_9, + + V3_4_1_SNAPSHOT, + V3_4_1, + + V3_4_2_SNAPSHOT, + V3_4_2, + + V3_4_3_SNAPSHOT, + V3_4_3, + + V3_4_4_SNAPSHOT, + V3_4_4, + + V3_4_5_SNAPSHOT, + V3_4_5, + + V3_4_6_SNAPSHOT, + V3_4_6, + + V3_4_7_SNAPSHOT, + V3_4_7, + + V3_4_8_SNAPSHOT, + V3_4_8, + + V3_4_9_SNAPSHOT, + V3_4_9, + V3_5_1_SNAPSHOT, + V3_5_1, + + V3_5_2_SNAPSHOT, + V3_5_2, + + V3_5_3_SNAPSHOT, + V3_5_3, + + V3_5_4_SNAPSHOT, + V3_5_4, + + V3_5_5_SNAPSHOT, + V3_5_5, + + V3_5_6_SNAPSHOT, + V3_5_6, + + V3_5_7_SNAPSHOT, + V3_5_7, + + V3_5_8_SNAPSHOT, + V3_5_8, + + V3_5_9_SNAPSHOT, + V3_5_9, + + V3_6_1_SNAPSHOT, + V3_6_1, + + V3_6_2_SNAPSHOT, + V3_6_2, + + V3_6_3_SNAPSHOT, + V3_6_3, + + V3_6_4_SNAPSHOT, + V3_6_4, + + V3_6_5_SNAPSHOT, + V3_6_5, + + V3_6_6_SNAPSHOT, + V3_6_6, + + V3_6_7_SNAPSHOT, + V3_6_7, + + V3_6_8_SNAPSHOT, + V3_6_8, + + V3_6_9_SNAPSHOT, + V3_6_9, + + V3_7_1_SNAPSHOT, + V3_7_1, + + V3_7_2_SNAPSHOT, + V3_7_2, + + V3_7_3_SNAPSHOT, + V3_7_3, + + V3_7_4_SNAPSHOT, + V3_7_4, + + V3_7_5_SNAPSHOT, + V3_7_5, + + V3_7_6_SNAPSHOT, + V3_7_6, + + V3_7_7_SNAPSHOT, + V3_7_7, + + V3_7_8_SNAPSHOT, + V3_7_8, + + V3_7_9_SNAPSHOT, + V3_7_9, + + V3_8_1_SNAPSHOT, + V3_8_1, + + V3_8_2_SNAPSHOT, + V3_8_2, + + V3_8_3_SNAPSHOT, + V3_8_3, + + V3_8_4_SNAPSHOT, + V3_8_4, + + V3_8_5_SNAPSHOT, + V3_8_5, + + V3_8_6_SNAPSHOT, + V3_8_6, + + V3_8_7_SNAPSHOT, + V3_8_7, + + V3_8_8_SNAPSHOT, + V3_8_8, + + V3_8_9_SNAPSHOT, + V3_8_9, + + V3_9_1_SNAPSHOT, + V3_9_1, + + V3_9_2_SNAPSHOT, + V3_9_2, + + V3_9_3_SNAPSHOT, + V3_9_3, + + V3_9_4_SNAPSHOT, + V3_9_4, + + V3_9_5_SNAPSHOT, + V3_9_5, + + V3_9_6_SNAPSHOT, + V3_9_6, + + V3_9_7_SNAPSHOT, + V3_9_7, + + V3_9_8_SNAPSHOT, + V3_9_8, + + V3_9_9_SNAPSHOT, + V3_9_9, + + V4_0_0_SNAPSHOT, + V4_0_0, + + V4_1_0_SNAPSHOT, + V4_1_0, + + V4_2_0_SNAPSHOT, + V4_2_0, + + V4_3_0_SNAPSHOT, + V4_3_0, + + V4_4_0_SNAPSHOT, + V4_4_0, + + V4_5_0_SNAPSHOT, + V4_5_0, + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/MixAll.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java new file mode 100644 index 0000000..12fb65a --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -0,0 +1,486 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common; + +import org.apache.rocketmq.common.annotation.ImportantField; +import org.apache.rocketmq.common.help.FAQUrl; +import org.slf4j.Logger; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.UnsupportedEncodingException; +import java.lang.annotation.Annotation; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.net.URL; +import java.net.URLConnection; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + + +/** + * @author shijia.wxr + */ +public class MixAll { + public static final String ROCKETMQ_HOME_ENV = "ROCKETMQ_HOME"; + public static final String ROCKETMQ_HOME_PROPERTY = "rocketmq.home.dir"; + public static final String NAMESRV_ADDR_ENV = "NAMESRV_ADDR"; + public static final String NAMESRV_ADDR_PROPERTY = "rocketmq.namesrv.addr"; + public static final String MESSAGE_COMPRESS_LEVEL = "rocketmq.message.compressLevel"; + public static final String WS_DOMAIN_NAME = System.getProperty("rocketmq.namesrv.domain", "jmenv.tbsite.net"); + public static final String WS_DOMAIN_SUBGROUP = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr"); + // http://jmenv.tbsite.net:8080/rocketmq/nsaddr + public static final String WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP; + public static final String DEFAULT_TOPIC = "TBW102"; + public static final String BENCHMARK_TOPIC = "BenchmarkTest"; + public static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER"; + public static final String DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER"; + public static final String TOOLS_CONSUMER_GROUP = "TOOLS_CONSUMER"; + public static final String FILTERSRV_CONSUMER_GROUP = "FILTERSRV_CONSUMER"; + public static final String MONITOR_CONSUMER_GROUP = "__MONITOR_CONSUMER"; + public static final String CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER"; + public static final String SELF_TEST_PRODUCER_GROUP = "SELF_TEST_P_GROUP"; + public static final String SELF_TEST_CONSUMER_GROUP = "SELF_TEST_C_GROUP"; + public static final String SELF_TEST_TOPIC = "SELF_TEST_TOPIC"; + public static final String OFFSET_MOVED_EVENT = "OFFSET_MOVED_EVENT"; + public static final String ONS_HTTP_PROXY_GROUP = "CID_ONS-HTTP-PROXY"; + public static final String CID_ONSAPI_PERMISSION_GROUP = "CID_ONSAPI_PERMISSION"; + public static final String CID_ONSAPI_OWNER_GROUP = "CID_ONSAPI_OWNER"; + public static final String CID_ONSAPI_PULL_GROUP = "CID_ONSAPI_PULL"; + public static final String CID_RMQ_SYS_PREFIX = "CID_RMQ_SYS_"; + + public static final List<String> LOCAL_INET_ADDRESS = getLocalInetAddress(); + public static final String LOCALHOST = localhost(); + public static final String DEFAULT_CHARSET = "UTF-8"; + public static final long MASTER_ID = 0L; + public static final long CURRENT_JVM_PID = getPID(); + + public static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%"; + + public static final String DLQ_GROUP_TOPIC_PREFIX = "%DLQ%"; + public static final String SYSTEM_TOPIC_PREFIX = "rmq_sys_"; + public static final String UNIQUE_MSG_QUERY_FLAG = "_UNIQUE_KEY_QUERY"; + public static final String DEFAULT_TRACE_REGION_ID = "DefaultRegion"; + public static final String CONSUME_CONTEXT_TYPE = "ConsumeContextType"; + + public static String getRetryTopic(final String consumerGroup) { + return RETRY_GROUP_TOPIC_PREFIX + consumerGroup; + } + + + public static boolean isSysConsumerGroup(final String consumerGroup) { + return consumerGroup.startsWith(CID_RMQ_SYS_PREFIX); + } + + public static boolean isSystemTopic(final String topic) { + return topic.startsWith(SYSTEM_TOPIC_PREFIX); + } + + public static String getDLQTopic(final String consumerGroup) { + return DLQ_GROUP_TOPIC_PREFIX + consumerGroup; + } + + + public static String brokerVIPChannel(final boolean isChange, final String brokerAddr) { + if (isChange) { + String[] ipAndPort = brokerAddr.split(":"); + String brokerAddrNew = ipAndPort[0] + ":" + (Integer.parseInt(ipAndPort[1]) - 2); + return brokerAddrNew; + } else { + return brokerAddr; + } + } + + + public static long getPID() { + String processName = java.lang.management.ManagementFactory.getRuntimeMXBean().getName(); + if (processName != null && processName.length() > 0) { + try { + return Long.parseLong(processName.split("@")[0]); + } catch (Exception e) { + return 0; + } + } + + return 0; + } + + + public static long createBrokerId(final String ip, final int port) { + InetSocketAddress isa = new InetSocketAddress(ip, port); + byte[] ipArray = isa.getAddress().getAddress(); + ByteBuffer bb = ByteBuffer.allocate(8); + bb.put(ipArray); + bb.putInt(port); + long value = bb.getLong(0); + return Math.abs(value); + } + + public static final void string2File(final String str, final String fileName) throws IOException { + + String tmpFile = fileName + ".tmp"; + string2FileNotSafe(str, tmpFile); + + + String bakFile = fileName + ".bak"; + String prevContent = file2String(fileName); + if (prevContent != null) { + string2FileNotSafe(prevContent, bakFile); + } + + + File file = new File(fileName); + file.delete(); + + + file = new File(tmpFile); + file.renameTo(new File(fileName)); + } + + + public static final void string2FileNotSafe(final String str, final String fileName) throws IOException { + File file = new File(fileName); + File fileParent = file.getParentFile(); + if (fileParent != null) { + fileParent.mkdirs(); + } + FileWriter fileWriter = null; + + try { + fileWriter = new FileWriter(file); + fileWriter.write(str); + } catch (IOException e) { + throw e; + } finally { + if (fileWriter != null) { + try { + fileWriter.close(); + } catch (IOException e) { + throw e; + } + } + } + } + + + public static final String file2String(final String fileName) { + File file = new File(fileName); + return file2String(file); + } + + public static final String file2String(final File file) { + if (file.exists()) { + char[] data = new char[(int) file.length()]; + boolean result = false; + + FileReader fileReader = null; + try { + fileReader = new FileReader(file); + int len = fileReader.read(data); + result = len == data.length; + } catch (IOException e) { + // e.printStackTrace(); + } finally { + if (fileReader != null) { + try { + fileReader.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + if (result) { + return new String(data); + } + } + return null; + } + + public static final String file2String(final URL url) { + InputStream in = null; + try { + URLConnection urlConnection = url.openConnection(); + urlConnection.setUseCaches(false); + in = urlConnection.getInputStream(); + int len = in.available(); + byte[] data = new byte[len]; + in.read(data, 0, len); + return new String(data, "UTF-8"); + } catch (Exception e) { + } finally { + if (null != in) { + try { + in.close(); + } catch (IOException e) { + } + } + } + + return null; + } + + public static String findClassPath(Class<?> c) { + URL url = c.getProtectionDomain().getCodeSource().getLocation(); + return url.getPath(); + } + + + public static void printObjectProperties(final Logger log, final Object object) { + printObjectProperties(log, object, false); + } + + + public static void printObjectProperties(final Logger log, final Object object, final boolean onlyImportantField) { + Field[] fields = object.getClass().getDeclaredFields(); + for (Field field : fields) { + if (!Modifier.isStatic(field.getModifiers())) { + String name = field.getName(); + if (!name.startsWith("this")) { + Object value = null; + try { + field.setAccessible(true); + value = field.get(object); + if (null == value) { + value = ""; + } + } catch (IllegalArgumentException e) { + e.printStackTrace(); + } catch (IllegalAccessException e) { + e.printStackTrace(); + } + + if (onlyImportantField) { + Annotation annotation = field.getAnnotation(ImportantField.class); + if (null == annotation) { + continue; + } + } + + if (log != null) { + log.info(name + "=" + value); + } else { + } + } + } + } + } + + + public static String properties2String(final Properties properties) { + StringBuilder sb = new StringBuilder(); + for (Map.Entry<Object, Object> entry : properties.entrySet()) { + if (entry.getValue() != null) { + sb.append(entry.getKey().toString() + "=" + entry.getValue().toString() + "\n"); + } + } + return sb.toString(); + } + + public static Properties string2Properties(final String str) { + Properties properties = new Properties(); + try { + InputStream in = new ByteArrayInputStream(str.getBytes(DEFAULT_CHARSET)); + properties.load(in); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + return null; + } catch (IOException e) { + e.printStackTrace(); + return null; + } + + return properties; + } + + public static Properties object2Properties(final Object object) { + Properties properties = new Properties(); + + Field[] fields = object.getClass().getDeclaredFields(); + for (Field field : fields) { + if (!Modifier.isStatic(field.getModifiers())) { + String name = field.getName(); + if (!name.startsWith("this")) { + Object value = null; + try { + field.setAccessible(true); + value = field.get(object); + } catch (IllegalArgumentException e) { + e.printStackTrace(); + } catch (IllegalAccessException e) { + e.printStackTrace(); + } + + if (value != null) { + properties.setProperty(name, value.toString()); + } + } + } + } + + return properties; + } + + public static void properties2Object(final Properties p, final Object object) { + Method[] methods = object.getClass().getMethods(); + for (Method method : methods) { + String mn = method.getName(); + if (mn.startsWith("set")) { + try { + String tmp = mn.substring(4); + String first = mn.substring(3, 4); + + String key = first.toLowerCase() + tmp; + String property = p.getProperty(key); + if (property != null) { + Class<?>[] pt = method.getParameterTypes(); + if (pt != null && pt.length > 0) { + String cn = pt[0].getSimpleName(); + Object arg = null; + if (cn.equals("int") || cn.equals("Integer")) { + arg = Integer.parseInt(property); + } else if (cn.equals("long") || cn.equals("Long")) { + arg = Long.parseLong(property); + } else if (cn.equals("double") || cn.equals("Double")) { + arg = Double.parseDouble(property); + } else if (cn.equals("boolean") || cn.equals("Boolean")) { + arg = Boolean.parseBoolean(property); + } else if (cn.equals("float") || cn.equals("Float")) { + arg = Float.parseFloat(property); + } else if (cn.equals("String")) { + arg = property; + } else { + continue; + } + method.invoke(object, new Object[]{arg}); + } + } + } catch (Throwable e) { + } + } + } + } + + + public static boolean isPropertiesEqual(final Properties p1, final Properties p2) { + return p1.equals(p2); + } + + + public static List<String> getLocalInetAddress() { + List<String> inetAddressList = new ArrayList<String>(); + try { + Enumeration<NetworkInterface> enumeration = NetworkInterface.getNetworkInterfaces(); + while (enumeration.hasMoreElements()) { + NetworkInterface networkInterface = enumeration.nextElement(); + Enumeration<InetAddress> addrs = networkInterface.getInetAddresses(); + while (addrs.hasMoreElements()) { + inetAddressList.add(addrs.nextElement().getHostAddress()); + } + } + } catch (SocketException e) { + throw new RuntimeException("get local inet address fail", e); + } + + return inetAddressList; + } + + + public static boolean isLocalAddr(String address) { + for (String addr : LOCAL_INET_ADDRESS) { + if (address.contains(addr)) + return true; + } + return false; + } + + + private static String localhost() { + try { + InetAddress addr = InetAddress.getLocalHost(); + return addr.getHostAddress(); + } catch (Throwable e) { + throw new RuntimeException("InetAddress java.net.InetAddress.getLocalHost() throws UnknownHostException" + + FAQUrl.suggestTodo(FAQUrl.UNKNOWN_HOST_EXCEPTION), + e); + } + } + + + public static boolean compareAndIncreaseOnly(final AtomicLong target, final long value) { + long prev = target.get(); + while (value > prev) { + boolean updated = target.compareAndSet(prev, value); + if (updated) + return true; + + prev = target.get(); + } + + return false; + } + + public static String localhostName() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (Throwable e) { + throw new RuntimeException("InetAddress java.net.InetAddress.getLocalHost() throws UnknownHostException" + + FAQUrl.suggestTodo(FAQUrl.UNKNOWN_HOST_EXCEPTION), + e); + } + } + + public Set<String> list2Set(List<String> values) { + Set<String> result = new HashSet<String>(); + for (String v : values) { + result.add(v); + } + return result; + } + + public List<String> set2List(Set<String> values) { + List<String> result = new ArrayList<String>(); + for (String v : values) { + result.add(v); + } + return result; + } + + public static String humanReadableByteCount(long bytes, boolean si) { + int unit = si ? 1000 : 1024; + if (bytes < unit) return bytes + " B"; + int exp = (int) (Math.log(bytes) / Math.log(unit)); + String pre = (si ? "kMGTPE" : "KMGTPE").charAt(exp - 1) + (si ? "" : "i"); + return String.format("%.1f %sB", bytes / Math.pow(unit, exp), pre); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/Pair.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/Pair.java b/common/src/main/java/org/apache/rocketmq/common/Pair.java new file mode 100644 index 0000000..ed6c246 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/Pair.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common; + +/** + * @author shijia.wxr + */ +public class Pair<T1, T2> { + private T1 object1; + private T2 object2; + + + public Pair(T1 object1, T2 object2) { + this.object1 = object1; + this.object2 = object2; + } + + + public T1 getObject1() { + return object1; + } + + + public void setObject1(T1 object1) { + this.object1 = object1; + } + + + public T2 getObject2() { + return object2; + } + + + public void setObject2(T2 object2) { + this.object2 = object2; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/ServiceState.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/ServiceState.java b/common/src/main/java/org/apache/rocketmq/common/ServiceState.java new file mode 100644 index 0000000..97f5b90 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/ServiceState.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common; + +/** + * @author shijia.wxr + */ +public enum ServiceState { + /** + * Service just created,not start + */ + CREATE_JUST, + /** + * Service Running + */ + RUNNING, + /** + * Service shutdown + */ + SHUTDOWN_ALREADY, + /** + * Service Start failure + */ + START_FAILED; +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java b/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java new file mode 100644 index 0000000..4fd5154 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common; + +import org.apache.rocketmq.common.constant.LoggerName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author shijia.wxr + * @author xinyuzhou.zxy + */ +public abstract class ServiceThread implements Runnable { + private static final Logger STLOG = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); + private static final long JOIN_TIME = 90 * 1000; + + protected final Thread thread; + + protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false); + + protected volatile boolean stopped = false; + + protected final CountDownLatch waitPoint = new CountDownLatch(1); + + + public ServiceThread() { + this.thread = new Thread(this, this.getServiceName()); + } + + + public abstract String getServiceName(); + + + public void start() { + this.thread.start(); + } + + + public void shutdown() { + this.shutdown(false); + } + + public void shutdown(final boolean interrupt) { + this.stopped = true; + STLOG.info("shutdown thread " + this.getServiceName() + " interrupt " + interrupt); + + if (hasNotified.compareAndSet(false, true)) { + waitPoint.countDown(); // notify + } + + try { + if (interrupt) { + this.thread.interrupt(); + } + + long beginTime = System.currentTimeMillis(); + if (!this.thread.isDaemon()) { + this.thread.join(this.getJointime()); + } + long eclipseTime = System.currentTimeMillis() - beginTime; + STLOG.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " " + + this.getJointime()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public long getJointime() { + return JOIN_TIME; + } + + public void stop() { + this.stop(false); + } + + public void stop(final boolean interrupt) { + this.stopped = true; + STLOG.info("stop thread " + this.getServiceName() + " interrupt " + interrupt); + + if (hasNotified.compareAndSet(false, true)) { + waitPoint.countDown(); // notify + } + + if (interrupt) { + this.thread.interrupt(); + } + } + + public void makeStop() { + this.stopped = true; + STLOG.info("makestop thread " + this.getServiceName()); + } + + public void wakeup() { + if (hasNotified.compareAndSet(false, true)) { + waitPoint.countDown(); // notify + } + } + + protected void waitForRunning(long interval) { + if (hasNotified.compareAndSet(true, false)) { + this.onWaitEnd(); + return; + } + + //entry to wait + waitPoint.reset(); + + try { + waitPoint.await(interval, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + hasNotified.set(false); + this.onWaitEnd(); + } + } + + protected void onWaitEnd() { + } + + public boolean isStopped() { + return stopped; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/SystemClock.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/SystemClock.java b/common/src/main/java/org/apache/rocketmq/common/SystemClock.java new file mode 100644 index 0000000..f86a4f5 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/SystemClock.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common; + +/** + * @author vintage.wang + */ +public class SystemClock { + public long now() { + return System.currentTimeMillis(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java b/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java new file mode 100644 index 0000000..43ab2f2 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; + + +public class ThreadFactoryImpl implements ThreadFactory { + private final AtomicLong threadIndex = new AtomicLong(0); + private final String threadNamePrefix; + + + public ThreadFactoryImpl(final String threadNamePrefix) { + this.threadNamePrefix = threadNamePrefix; + } + + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, threadNamePrefix + this.threadIndex.incrementAndGet()); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java new file mode 100644 index 0000000..1aef5e7 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common; + +import org.apache.rocketmq.common.constant.PermName; + + +/** + * @author shijia.wxr + */ +public class TopicConfig { + private static final String SEPARATOR = " "; + public static int defaultReadQueueNums = 16; + public static int defaultWriteQueueNums = 16; + private String topicName; + private int readQueueNums = defaultReadQueueNums; + private int writeQueueNums = defaultWriteQueueNums; + private int perm = PermName.PERM_READ | PermName.PERM_WRITE; + private TopicFilterType topicFilterType = TopicFilterType.SINGLE_TAG; + private int topicSysFlag = 0; + private boolean order = false; + + + public TopicConfig() { + } + + + public TopicConfig(String topicName) { + this.topicName = topicName; + } + + + public TopicConfig(String topicName, int readQueueNums, int writeQueueNums, int perm) { + this.topicName = topicName; + this.readQueueNums = readQueueNums; + this.writeQueueNums = writeQueueNums; + this.perm = perm; + } + + + public String encode() { + StringBuilder sb = new StringBuilder(); + + // 1 + sb.append(this.topicName); + sb.append(SEPARATOR); + + // 2 + sb.append(this.readQueueNums); + sb.append(SEPARATOR); + + // 3 + sb.append(this.writeQueueNums); + sb.append(SEPARATOR); + + // 4 + sb.append(this.perm); + sb.append(SEPARATOR); + + // 5 + sb.append(this.topicFilterType); + + return sb.toString(); + } + + + public boolean decode(final String in) { + String[] strs = in.split(SEPARATOR); + if (strs != null && strs.length == 5) { + this.topicName = strs[0]; + + this.readQueueNums = Integer.parseInt(strs[1]); + + this.writeQueueNums = Integer.parseInt(strs[2]); + + this.perm = Integer.parseInt(strs[3]); + + this.topicFilterType = TopicFilterType.valueOf(strs[4]); + + return true; + } + + return false; + } + + + public String getTopicName() { + return topicName; + } + + + public void setTopicName(String topicName) { + this.topicName = topicName; + } + + + public int getReadQueueNums() { + return readQueueNums; + } + + + public void setReadQueueNums(int readQueueNums) { + this.readQueueNums = readQueueNums; + } + + + public int getWriteQueueNums() { + return writeQueueNums; + } + + + public void setWriteQueueNums(int writeQueueNums) { + this.writeQueueNums = writeQueueNums; + } + + + public int getPerm() { + return perm; + } + + + public void setPerm(int perm) { + this.perm = perm; + } + + + public TopicFilterType getTopicFilterType() { + return topicFilterType; + } + + + public void setTopicFilterType(TopicFilterType topicFilterType) { + this.topicFilterType = topicFilterType; + } + + + public int getTopicSysFlag() { + return topicSysFlag; + } + + + public void setTopicSysFlag(int topicSysFlag) { + this.topicSysFlag = topicSysFlag; + } + + + public boolean isOrder() { + return order; + } + + + public void setOrder(boolean isOrder) { + this.order = isOrder; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + final TopicConfig that = (TopicConfig) o; + + if (readQueueNums != that.readQueueNums) return false; + if (writeQueueNums != that.writeQueueNums) return false; + if (perm != that.perm) return false; + if (topicSysFlag != that.topicSysFlag) return false; + if (order != that.order) return false; + if (topicName != null ? !topicName.equals(that.topicName) : that.topicName != null) return false; + return topicFilterType == that.topicFilterType; + + } + + @Override + public int hashCode() { + int result = topicName != null ? topicName.hashCode() : 0; + result = 31 * result + readQueueNums; + result = 31 * result + writeQueueNums; + result = 31 * result + perm; + result = 31 * result + (topicFilterType != null ? topicFilterType.hashCode() : 0); + result = 31 * result + topicSysFlag; + result = 31 * result + (order ? 1 : 0); + return result; + } + + @Override + public String toString() { + return "TopicConfig [topicName=" + topicName + ", readQueueNums=" + readQueueNums + + ", writeQueueNums=" + writeQueueNums + ", perm=" + PermName.perm2String(perm) + + ", topicFilterType=" + topicFilterType + ", topicSysFlag=" + topicSysFlag + ", order=" + + order + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java b/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java new file mode 100644 index 0000000..771fcaf --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common; + +/** + * @author shijia.wxr + */ +public enum TopicFilterType { + SINGLE_TAG, + MULTI_TAG +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/UtilAll.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java new file mode 100644 index 0000000..2f9b72e --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java @@ -0,0 +1,525 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common; + +import org.apache.rocketmq.remoting.common.RemotingHelper; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.RuntimeMXBean; +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.text.NumberFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.zip.CRC32; +import java.util.zip.DeflaterOutputStream; +import java.util.zip.InflaterInputStream; + + +/** + * @author shijia.wxr + */ +public class UtilAll { + public static final String YYYY_MM_DD_HH_MM_SS = "yyyy-MM-dd HH:mm:ss"; + public static final String YYYY_MM_DD_HH_MM_SS_SSS = "yyyy-MM-dd#HH:mm:ss:SSS"; + public static final String YYYY_MMDD_HHMMSS = "yyyyMMddHHmmss"; + + + public static int getPid() { + RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean(); + String name = runtime.getName(); // format: "pid@hostname" + try { + return Integer.parseInt(name.substring(0, name.indexOf('@'))); + } catch (Exception e) { + return -1; + } + } + + public static String currentStackTrace() { + StringBuilder sb = new StringBuilder(); + StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); + for (StackTraceElement ste : stackTrace) { + sb.append("\n\t"); + sb.append(ste.toString()); + } + + return sb.toString(); + } + + public static String offset2FileName(final long offset) { + final NumberFormat nf = NumberFormat.getInstance(); + nf.setMinimumIntegerDigits(20); + nf.setMaximumFractionDigits(0); + nf.setGroupingUsed(false); + return nf.format(offset); + } + + public static long computeEclipseTimeMilliseconds(final long beginTime) { + return System.currentTimeMillis() - beginTime; + } + + + public static boolean isItTimeToDo(final String when) { + String[] whiles = when.split(";"); + if (whiles != null && whiles.length > 0) { + Calendar now = Calendar.getInstance(); + for (String w : whiles) { + int nowHour = Integer.parseInt(w); + if (nowHour == now.get(Calendar.HOUR_OF_DAY)) { + return true; + } + } + } + + return false; + } + + + public static String timeMillisToHumanString() { + return timeMillisToHumanString(System.currentTimeMillis()); + } + + + public static String timeMillisToHumanString(final long t) { + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(t); + return String.format("%04d%02d%02d%02d%02d%02d%03d", cal.get(Calendar.YEAR), cal.get(Calendar.MONTH) + 1, + cal.get(Calendar.DAY_OF_MONTH), cal.get(Calendar.HOUR_OF_DAY), cal.get(Calendar.MINUTE), cal.get(Calendar.SECOND), + cal.get(Calendar.MILLISECOND)); + } + + + public static long computNextMorningTimeMillis() { + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(System.currentTimeMillis()); + cal.add(Calendar.DAY_OF_MONTH, 1); + cal.set(Calendar.HOUR_OF_DAY, 0); + cal.set(Calendar.MINUTE, 0); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MILLISECOND, 0); + + return cal.getTimeInMillis(); + } + + + public static long computNextMinutesTimeMillis() { + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(System.currentTimeMillis()); + cal.add(Calendar.DAY_OF_MONTH, 0); + cal.add(Calendar.HOUR_OF_DAY, 0); + cal.add(Calendar.MINUTE, 1); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MILLISECOND, 0); + + return cal.getTimeInMillis(); + } + + + public static long computNextHourTimeMillis() { + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(System.currentTimeMillis()); + cal.add(Calendar.DAY_OF_MONTH, 0); + cal.add(Calendar.HOUR_OF_DAY, 1); + cal.set(Calendar.MINUTE, 0); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MILLISECOND, 0); + + return cal.getTimeInMillis(); + } + + + public static long computNextHalfHourTimeMillis() { + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(System.currentTimeMillis()); + cal.add(Calendar.DAY_OF_MONTH, 0); + cal.add(Calendar.HOUR_OF_DAY, 1); + cal.set(Calendar.MINUTE, 30); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MILLISECOND, 0); + + return cal.getTimeInMillis(); + } + + + public static String timeMillisToHumanString2(final long t) { + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(t); + return String.format("%04d-%02d-%02d %02d:%02d:%02d,%03d", + cal.get(Calendar.YEAR), + cal.get(Calendar.MONTH) + 1, + cal.get(Calendar.DAY_OF_MONTH), + cal.get(Calendar.HOUR_OF_DAY), + cal.get(Calendar.MINUTE), + cal.get(Calendar.SECOND), + cal.get(Calendar.MILLISECOND)); + } + + + public static String timeMillisToHumanString3(final long t) { + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(t); + return String.format("%04d%02d%02d%02d%02d%02d", + cal.get(Calendar.YEAR), + cal.get(Calendar.MONTH) + 1, + cal.get(Calendar.DAY_OF_MONTH), + cal.get(Calendar.HOUR_OF_DAY), + cal.get(Calendar.MINUTE), + cal.get(Calendar.SECOND)); + } + + + public static double getDiskPartitionSpaceUsedPercent(final String path) { + if (null == path || path.isEmpty()) + return -1; + + try { + File file = new File(path); + if (!file.exists()) { + boolean result = file.mkdirs(); + if (!result) { + } + } + + long totalSpace = file.getTotalSpace(); + long freeSpace = file.getFreeSpace(); + long usedSpace = totalSpace - freeSpace; + if (totalSpace > 0) { + return usedSpace / (double) totalSpace; + } + } catch (Exception e) { + return -1; + } + + return -1; + } + + + public static final int crc32(byte[] array) { + if (array != null) { + return crc32(array, 0, array.length); + } + + return 0; + } + + + public static final int crc32(byte[] array, int offset, int length) { + CRC32 crc32 = new CRC32(); + crc32.update(array, offset, length); + return (int) (crc32.getValue() & 0x7FFFFFFF); + } + + final static char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray(); + + public static String bytes2string(byte[] src) { + char[] hexChars = new char[src.length * 2]; + for (int j = 0; j < src.length; j++) { + int v = src[j] & 0xFF; + hexChars[j * 2] = HEX_ARRAY[v >>> 4]; + hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F]; + } + return new String(hexChars); + } + + public static byte[] string2bytes(String hexString) { + if (hexString == null || hexString.equals("")) { + return null; + } + hexString = hexString.toUpperCase(); + int length = hexString.length() / 2; + char[] hexChars = hexString.toCharArray(); + byte[] d = new byte[length]; + for (int i = 0; i < length; i++) { + int pos = i * 2; + d[i] = (byte) (charToByte(hexChars[pos]) << 4 | charToByte(hexChars[pos + 1])); + } + return d; + } + + + private static byte charToByte(char c) { + return (byte) "0123456789ABCDEF".indexOf(c); + } + + + public static byte[] uncompress(final byte[] src) throws IOException { + byte[] result = src; + byte[] uncompressData = new byte[src.length]; + ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(src); + InflaterInputStream inflaterInputStream = new InflaterInputStream(byteArrayInputStream); + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(src.length); + + try { + while (true) { + int len = inflaterInputStream.read(uncompressData, 0, uncompressData.length); + if (len <= 0) { + break; + } + byteArrayOutputStream.write(uncompressData, 0, len); + } + byteArrayOutputStream.flush(); + result = byteArrayOutputStream.toByteArray(); + } catch (IOException e) { + throw e; + } finally { + try { + byteArrayInputStream.close(); + } catch (IOException e) { + } + try { + inflaterInputStream.close(); + } catch (IOException e) { + } + try { + byteArrayOutputStream.close(); + } catch (IOException e) { + } + } + + return result; + } + + + public static byte[] compress(final byte[] src, final int level) throws IOException { + byte[] result = src; + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(src.length); + java.util.zip.Deflater defeater = new java.util.zip.Deflater(level); + DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(byteArrayOutputStream, defeater); + try { + deflaterOutputStream.write(src); + deflaterOutputStream.finish(); + deflaterOutputStream.close(); + result = byteArrayOutputStream.toByteArray(); + } catch (IOException e) { + defeater.end(); + throw e; + } finally { + try { + byteArrayOutputStream.close(); + } catch (IOException ignored) { + } + + defeater.end(); + } + + return result; + } + + + public static int asInt(String str, int defaultValue) { + try { + return Integer.parseInt(str); + } catch (Exception e) { + return defaultValue; + } + } + + + public static long asLong(String str, long defaultValue) { + try { + return Long.parseLong(str); + } catch (Exception e) { + return defaultValue; + } + } + + + public static String formatDate(Date date, String pattern) { + SimpleDateFormat df = new SimpleDateFormat(pattern); + return df.format(date); + } + + + public static Date parseDate(String date, String pattern) { + SimpleDateFormat df = new SimpleDateFormat(pattern); + try { + return df.parse(date); + } catch (ParseException e) { + return null; + } + } + + + public static String responseCode2String(final int code) { + return Integer.toString(code); + } + + + public static String frontStringAtLeast(final String str, final int size) { + if (str != null) { + if (str.length() > size) { + return str.substring(0, size); + } + } + + return str; + } + + + public static boolean isBlank(String str) { + int strLen; + if (str == null || (strLen = str.length()) == 0) { + return true; + } + for (int i = 0; i < strLen; i++) { + if (!Character.isWhitespace(str.charAt(i))) { + return false; + } + } + return true; + } + + + public static String jstack() { + return jstack(Thread.getAllStackTraces()); + } + + + public static String jstack(Map<Thread, StackTraceElement[]> map) { + StringBuilder result = new StringBuilder(); + try { + Iterator<Map.Entry<Thread, StackTraceElement[]>> ite = map.entrySet().iterator(); + while (ite.hasNext()) { + Map.Entry<Thread, StackTraceElement[]> entry = ite.next(); + StackTraceElement[] elements = entry.getValue(); + Thread thread = entry.getKey(); + if (elements != null && elements.length > 0) { + String threadName = entry.getKey().getName(); + result.append(String.format("%-40sTID: %d STATE: %s%n", threadName, thread.getId(), thread.getState())); + for (StackTraceElement el : elements) { + result.append(String.format("%-40s%s%n", threadName, el.toString())); + } + result.append("\n"); + } + } + } catch (Throwable e) { + result.append(RemotingHelper.exceptionSimpleDesc(e)); + } + + return result.toString(); + } + + public static boolean isInternalIP(byte[] ip) { + if (ip.length != 4) { + throw new RuntimeException("illegal ipv4 bytes"); + } + + + //10.0.0.0~10.255.255.255 + //172.16.0.0~172.31.255.255 + //192.168.0.0~192.168.255.255 + if (ip[0] == (byte) 10) { + + return true; + } else if (ip[0] == (byte) 172) { + if (ip[1] >= (byte) 16 && ip[1] <= (byte) 31) { + return true; + } + } else if (ip[0] == (byte) 192) { + if (ip[1] == (byte) 168) { + return true; + } + } + return false; + } + + private static boolean ipCheck(byte[] ip) { + if (ip.length != 4) { + throw new RuntimeException("illegal ipv4 bytes"); + } + +// if (ip[0] == (byte)30 && ip[1] == (byte)10 && ip[2] == (byte)163 && ip[3] == (byte)120) { +// } + + + if (ip[0] >= (byte) 1 && ip[0] <= (byte) 126) { + if (ip[1] == (byte) 1 && ip[2] == (byte) 1 && ip[3] == (byte) 1) { + return false; + } + if (ip[1] == (byte) 0 && ip[2] == (byte) 0 && ip[3] == (byte) 0) { + return false; + } + return true; + } else if (ip[0] >= (byte) 128 && ip[0] <= (byte) 191) { + if (ip[2] == (byte) 1 && ip[3] == (byte) 1) { + return false; + } + if (ip[2] == (byte) 0 && ip[3] == (byte) 0) { + return false; + } + return true; + } else if (ip[0] >= (byte) 192 && ip[0] <= (byte) 223) { + if (ip[3] == (byte) 1) { + return false; + } + if (ip[3] == (byte) 0) { + return false; + } + return true; + } + return false; + } + + public static String ipToIPv4Str(byte[] ip) { + if (ip.length != 4) { + return null; + } + return new StringBuilder().append(ip[0] & 0xFF).append(".").append( + ip[1] & 0xFF).append(".").append(ip[2] & 0xFF) + .append(".").append(ip[3] & 0xFF).toString(); + } + + public static byte[] getIP() { + try { + Enumeration allNetInterfaces = NetworkInterface.getNetworkInterfaces(); + InetAddress ip = null; + byte[] internalIP = null; + while (allNetInterfaces.hasMoreElements()) { + NetworkInterface netInterface = (NetworkInterface) allNetInterfaces.nextElement(); + Enumeration addresses = netInterface.getInetAddresses(); + while (addresses.hasMoreElements()) { + ip = (InetAddress) addresses.nextElement(); + if (ip != null && ip instanceof Inet4Address) { + byte[] ipByte = ip.getAddress(); + if (ipByte.length == 4) { + if (ipCheck(ipByte)) { + if (!isInternalIP(ipByte)) { + return ipByte; + } else if (internalIP == null) { + internalIP = ipByte; + } + } + } + } + } + } + if (internalIP != null) { + return internalIP; + } else { + throw new RuntimeException("Can not get local ip"); + } + } catch (Exception e) { + throw new RuntimeException("Can not get local ip", e); + } + } +}