http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/UtilAll.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/UtilAll.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/UtilAll.java new file mode 100644 index 0000000..4429e3d --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/UtilAll.java @@ -0,0 +1,525 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.common; + +import com.alibaba.rocketmq.remoting.common.RemotingHelper; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.RuntimeMXBean; +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.text.NumberFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.zip.CRC32; +import java.util.zip.DeflaterOutputStream; +import java.util.zip.InflaterInputStream; + + +/** + * @author shijia.wxr + */ +public class UtilAll { + public static final String YYYY_MM_DD_HH_MM_SS = "yyyy-MM-dd HH:mm:ss"; + public static final String YYYY_MM_DD_HH_MM_SS_SSS = "yyyy-MM-dd#HH:mm:ss:SSS"; + public static final String YYYY_MMDD_HHMMSS = "yyyyMMddHHmmss"; + + + public static int getPid() { + RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean(); + String name = runtime.getName(); // format: "pid@hostname" + try { + return Integer.parseInt(name.substring(0, name.indexOf('@'))); + } catch (Exception e) { + return -1; + } + } + + public static String currentStackTrace() { + StringBuilder sb = new StringBuilder(); + StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); + for (StackTraceElement ste : stackTrace) { + sb.append("\n\t"); + sb.append(ste.toString()); + } + + return sb.toString(); + } + + public static String offset2FileName(final long offset) { + final NumberFormat nf = NumberFormat.getInstance(); + nf.setMinimumIntegerDigits(20); + nf.setMaximumFractionDigits(0); + nf.setGroupingUsed(false); + return nf.format(offset); + } + + public static long computeEclipseTimeMilliseconds(final long beginTime) { + return System.currentTimeMillis() - beginTime; + } + + + public static boolean isItTimeToDo(final String when) { + String[] whiles = when.split(";"); + if 
(whiles != null && whiles.length > 0) { + Calendar now = Calendar.getInstance(); + for (String w : whiles) { + int nowHour = Integer.parseInt(w); + if (nowHour == now.get(Calendar.HOUR_OF_DAY)) { + return true; + } + } + } + + return false; + } + + + public static String timeMillisToHumanString() { + return timeMillisToHumanString(System.currentTimeMillis()); + } + + + public static String timeMillisToHumanString(final long t) { + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(t); + return String.format("%04d%02d%02d%02d%02d%02d%03d", cal.get(Calendar.YEAR), cal.get(Calendar.MONTH) + 1, + cal.get(Calendar.DAY_OF_MONTH), cal.get(Calendar.HOUR_OF_DAY), cal.get(Calendar.MINUTE), cal.get(Calendar.SECOND), + cal.get(Calendar.MILLISECOND)); + } + + + public static long computNextMorningTimeMillis() { + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(System.currentTimeMillis()); + cal.add(Calendar.DAY_OF_MONTH, 1); + cal.set(Calendar.HOUR_OF_DAY, 0); + cal.set(Calendar.MINUTE, 0); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MILLISECOND, 0); + + return cal.getTimeInMillis(); + } + + + public static long computNextMinutesTimeMillis() { + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(System.currentTimeMillis()); + cal.add(Calendar.DAY_OF_MONTH, 0); + cal.add(Calendar.HOUR_OF_DAY, 0); + cal.add(Calendar.MINUTE, 1); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MILLISECOND, 0); + + return cal.getTimeInMillis(); + } + + + public static long computNextHourTimeMillis() { + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(System.currentTimeMillis()); + cal.add(Calendar.DAY_OF_MONTH, 0); + cal.add(Calendar.HOUR_OF_DAY, 1); + cal.set(Calendar.MINUTE, 0); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MILLISECOND, 0); + + return cal.getTimeInMillis(); + } + + + public static long computNextHalfHourTimeMillis() { + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(System.currentTimeMillis()); + 
cal.add(Calendar.DAY_OF_MONTH, 0); + cal.add(Calendar.HOUR_OF_DAY, 1); + cal.set(Calendar.MINUTE, 30); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MILLISECOND, 0); + + return cal.getTimeInMillis(); + } + + + public static String timeMillisToHumanString2(final long t) { + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(t); + return String.format("%04d-%02d-%02d %02d:%02d:%02d,%03d", + cal.get(Calendar.YEAR), + cal.get(Calendar.MONTH) + 1, + cal.get(Calendar.DAY_OF_MONTH), + cal.get(Calendar.HOUR_OF_DAY), + cal.get(Calendar.MINUTE), + cal.get(Calendar.SECOND), + cal.get(Calendar.MILLISECOND)); + } + + + public static String timeMillisToHumanString3(final long t) { + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(t); + return String.format("%04d%02d%02d%02d%02d%02d", + cal.get(Calendar.YEAR), + cal.get(Calendar.MONTH) + 1, + cal.get(Calendar.DAY_OF_MONTH), + cal.get(Calendar.HOUR_OF_DAY), + cal.get(Calendar.MINUTE), + cal.get(Calendar.SECOND)); + } + + + public static double getDiskPartitionSpaceUsedPercent(final String path) { + if (null == path || path.isEmpty()) + return -1; + + try { + File file = new File(path); + if (!file.exists()) { + boolean result = file.mkdirs(); + if (!result) { + } + } + + long totalSpace = file.getTotalSpace(); + long freeSpace = file.getFreeSpace(); + long usedSpace = totalSpace - freeSpace; + if (totalSpace > 0) { + return usedSpace / (double) totalSpace; + } + } catch (Exception e) { + return -1; + } + + return -1; + } + + + public static final int crc32(byte[] array) { + if (array != null) { + return crc32(array, 0, array.length); + } + + return 0; + } + + + public static final int crc32(byte[] array, int offset, int length) { + CRC32 crc32 = new CRC32(); + crc32.update(array, offset, length); + return (int) (crc32.getValue() & 0x7FFFFFFF); + } + + final static char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray(); + + public static String bytes2string(byte[] src) { + char[] hexChars = new 
char[src.length * 2]; + for (int j = 0; j < src.length; j++) { + int v = src[j] & 0xFF; + hexChars[j * 2] = HEX_ARRAY[v >>> 4]; + hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F]; + } + return new String(hexChars); + } + + public static byte[] string2bytes(String hexString) { + if (hexString == null || hexString.equals("")) { + return null; + } + hexString = hexString.toUpperCase(); + int length = hexString.length() / 2; + char[] hexChars = hexString.toCharArray(); + byte[] d = new byte[length]; + for (int i = 0; i < length; i++) { + int pos = i * 2; + d[i] = (byte) (charToByte(hexChars[pos]) << 4 | charToByte(hexChars[pos + 1])); + } + return d; + } + + + private static byte charToByte(char c) { + return (byte) "0123456789ABCDEF".indexOf(c); + } + + + public static byte[] uncompress(final byte[] src) throws IOException { + byte[] result = src; + byte[] uncompressData = new byte[src.length]; + ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(src); + InflaterInputStream inflaterInputStream = new InflaterInputStream(byteArrayInputStream); + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(src.length); + + try { + while (true) { + int len = inflaterInputStream.read(uncompressData, 0, uncompressData.length); + if (len <= 0) { + break; + } + byteArrayOutputStream.write(uncompressData, 0, len); + } + byteArrayOutputStream.flush(); + result = byteArrayOutputStream.toByteArray(); + } catch (IOException e) { + throw e; + } finally { + try { + byteArrayInputStream.close(); + } catch (IOException e) { + } + try { + inflaterInputStream.close(); + } catch (IOException e) { + } + try { + byteArrayOutputStream.close(); + } catch (IOException e) { + } + } + + return result; + } + + + public static byte[] compress(final byte[] src, final int level) throws IOException { + byte[] result = src; + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(src.length); + java.util.zip.Deflater defeater = new java.util.zip.Deflater(level); 
+ DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(byteArrayOutputStream, defeater); + try { + deflaterOutputStream.write(src); + deflaterOutputStream.finish(); + deflaterOutputStream.close(); + result = byteArrayOutputStream.toByteArray(); + } catch (IOException e) { + defeater.end(); + throw e; + } finally { + try { + byteArrayOutputStream.close(); + } catch (IOException ignored) { + } + + defeater.end(); + } + + return result; + } + + + public static int asInt(String str, int defaultValue) { + try { + return Integer.parseInt(str); + } catch (Exception e) { + return defaultValue; + } + } + + + public static long asLong(String str, long defaultValue) { + try { + return Long.parseLong(str); + } catch (Exception e) { + return defaultValue; + } + } + + + public static String formatDate(Date date, String pattern) { + SimpleDateFormat df = new SimpleDateFormat(pattern); + return df.format(date); + } + + + public static Date parseDate(String date, String pattern) { + SimpleDateFormat df = new SimpleDateFormat(pattern); + try { + return df.parse(date); + } catch (ParseException e) { + return null; + } + } + + + public static String responseCode2String(final int code) { + return Integer.toString(code); + } + + + public static String frontStringAtLeast(final String str, final int size) { + if (str != null) { + if (str.length() > size) { + return str.substring(0, size); + } + } + + return str; + } + + + public static boolean isBlank(String str) { + int strLen; + if (str == null || (strLen = str.length()) == 0) { + return true; + } + for (int i = 0; i < strLen; i++) { + if (!Character.isWhitespace(str.charAt(i))) { + return false; + } + } + return true; + } + + + public static String jstack() { + return jstack(Thread.getAllStackTraces()); + } + + + public static String jstack(Map<Thread, StackTraceElement[]> map) { + StringBuilder result = new StringBuilder(); + try { + Iterator<Map.Entry<Thread, StackTraceElement[]>> ite = map.entrySet().iterator(); + 
while (ite.hasNext()) { + Map.Entry<Thread, StackTraceElement[]> entry = ite.next(); + StackTraceElement[] elements = entry.getValue(); + Thread thread = entry.getKey(); + if (elements != null && elements.length > 0) { + String threadName = entry.getKey().getName(); + result.append(String.format("%-40sTID: %d STATE: %s%n", threadName, thread.getId(), thread.getState())); + for (StackTraceElement el : elements) { + result.append(String.format("%-40s%s%n", threadName, el.toString())); + } + result.append("\n"); + } + } + } catch (Throwable e) { + result.append(RemotingHelper.exceptionSimpleDesc(e)); + } + + return result.toString(); + } + + public static boolean isInternalIP(byte[] ip) { + if (ip.length != 4) { + throw new RuntimeException("illegal ipv4 bytes"); + } + + + //10.0.0.0~10.255.255.255 + //172.16.0.0~172.31.255.255 + //192.168.0.0~192.168.255.255 + if (ip[0] == (byte) 10) { + + return true; + } else if (ip[0] == (byte) 172) { + if (ip[1] >= (byte) 16 && ip[1] <= (byte) 31) { + return true; + } + } else if (ip[0] == (byte) 192) { + if (ip[1] == (byte) 168) { + return true; + } + } + return false; + } + + private static boolean ipCheck(byte[] ip) { + if (ip.length != 4) { + throw new RuntimeException("illegal ipv4 bytes"); + } + +// if (ip[0] == (byte)30 && ip[1] == (byte)10 && ip[2] == (byte)163 && ip[3] == (byte)120) { +// } + + + if (ip[0] >= (byte) 1 && ip[0] <= (byte) 126) { + if (ip[1] == (byte) 1 && ip[2] == (byte) 1 && ip[3] == (byte) 1) { + return false; + } + if (ip[1] == (byte) 0 && ip[2] == (byte) 0 && ip[3] == (byte) 0) { + return false; + } + return true; + } else if (ip[0] >= (byte) 128 && ip[0] <= (byte) 191) { + if (ip[2] == (byte) 1 && ip[3] == (byte) 1) { + return false; + } + if (ip[2] == (byte) 0 && ip[3] == (byte) 0) { + return false; + } + return true; + } else if (ip[0] >= (byte) 192 && ip[0] <= (byte) 223) { + if (ip[3] == (byte) 1) { + return false; + } + if (ip[3] == (byte) 0) { + return false; + } + return true; + } + return 
false; + } + + public static String ipToIPv4Str(byte[] ip) { + if (ip.length != 4) { + return null; + } + return new StringBuilder().append(ip[0] & 0xFF).append(".").append( + ip[1] & 0xFF).append(".").append(ip[2] & 0xFF) + .append(".").append(ip[3] & 0xFF).toString(); + } + + public static byte[] getIP() { + try { + Enumeration allNetInterfaces = NetworkInterface.getNetworkInterfaces(); + InetAddress ip = null; + byte[] internalIP = null; + while (allNetInterfaces.hasMoreElements()) { + NetworkInterface netInterface = (NetworkInterface) allNetInterfaces.nextElement(); + Enumeration addresses = netInterface.getInetAddresses(); + while (addresses.hasMoreElements()) { + ip = (InetAddress) addresses.nextElement(); + if (ip != null && ip instanceof Inet4Address) { + byte[] ipByte = ip.getAddress(); + if (ipByte.length == 4) { + if (ipCheck(ipByte)) { + if (!isInternalIP(ipByte)) { + return ipByte; + } else if (internalIP == null) { + internalIP = ipByte; + } + } + } + } + } + } + if (internalIP != null) { + return internalIP; + } else { + throw new RuntimeException("Can not get local ip"); + } + } catch (Exception e) { + throw new RuntimeException("Can not get local ip", e); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/ConsumeStats.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/ConsumeStats.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/ConsumeStats.java new file mode 100644 index 0000000..d8c9311 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/ConsumeStats.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.common.admin; + +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map.Entry; + + +/** + * + * @author shijia.wxr + * + */ +public class ConsumeStats extends RemotingSerializable { + private HashMap<MessageQueue, OffsetWrapper> offsetTable = new HashMap<MessageQueue, OffsetWrapper>(); + private double consumeTps = 0; + + + public long computeTotalDiff() { + long diffTotal = 0L; + + Iterator<Entry<MessageQueue, OffsetWrapper>> it = this.offsetTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<MessageQueue, OffsetWrapper> next = it.next(); + long diff = next.getValue().getBrokerOffset() - next.getValue().getConsumerOffset(); + diffTotal += diff; + } + + return diffTotal; + } + + + public HashMap<MessageQueue, OffsetWrapper> getOffsetTable() { + return offsetTable; + } + + + public void setOffsetTable(HashMap<MessageQueue, OffsetWrapper> offsetTable) { + this.offsetTable = offsetTable; + } + + public double getConsumeTps() { + return consumeTps; + } + + public void setConsumeTps(double consumeTps) { + this.consumeTps = consumeTps; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/OffsetWrapper.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/OffsetWrapper.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/OffsetWrapper.java new file mode 100644 index 0000000..07785c2 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/OffsetWrapper.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
/**
 * Plain value holder pairing a broker offset with a consumer offset and a
 * timestamp.
 *
 * @author shijia.wxr
 */
public class OffsetWrapper {
    /** Offset reported for the broker side. */
    private long brokerOffset;
    /** Offset reported for the consumer side. */
    private long consumerOffset;
    /** Timestamp value associated with this pair. */
    private long lastTimestamp;


    /** @return the broker offset */
    public long getBrokerOffset() {
        return this.brokerOffset;
    }


    /** @param brokerOffset the broker offset */
    public void setBrokerOffset(long brokerOffset) {
        this.brokerOffset = brokerOffset;
    }


    /** @return the consumer offset */
    public long getConsumerOffset() {
        return this.consumerOffset;
    }


    /** @param consumerOffset the consumer offset */
    public void setConsumerOffset(long consumerOffset) {
        this.consumerOffset = consumerOffset;
    }


    /** @return the last timestamp */
    public long getLastTimestamp() {
        return this.lastTimestamp;
    }


    /** @param lastTimestamp the last timestamp */
    public void setLastTimestamp(long lastTimestamp) {
        this.lastTimestamp = lastTimestamp;
    }
}
/**
 * Plain value holder describing a rollback on one queue: broker name, queue
 * id and the broker, consumer, timestamp and rollback offsets.
 *
 * @author manhong.yqd
 */
public class RollbackStats {
    /** Name of the broker. */
    private String brokerName;
    /** Id of the queue. */
    private long queueId;
    /** Broker offset. */
    private long brokerOffset;
    /** Consumer offset. */
    private long consumerOffset;
    /** Timestamp offset. */
    private long timestampOffset;
    /** Rollback offset. */
    private long rollbackOffset;


    /** @return the broker name, {@code null} until set */
    public String getBrokerName() {
        return this.brokerName;
    }


    /** @param brokerName the broker name */
    public void setBrokerName(String brokerName) {
        this.brokerName = brokerName;
    }


    /** @return the queue id */
    public long getQueueId() {
        return this.queueId;
    }


    /** @param queueId the queue id */
    public void setQueueId(long queueId) {
        this.queueId = queueId;
    }


    /** @return the broker offset */
    public long getBrokerOffset() {
        return this.brokerOffset;
    }


    /** @param brokerOffset the broker offset */
    public void setBrokerOffset(long brokerOffset) {
        this.brokerOffset = brokerOffset;
    }


    /** @return the consumer offset */
    public long getConsumerOffset() {
        return this.consumerOffset;
    }


    /** @param consumerOffset the consumer offset */
    public void setConsumerOffset(long consumerOffset) {
        this.consumerOffset = consumerOffset;
    }


    /** @return the timestamp offset */
    public long getTimestampOffset() {
        return this.timestampOffset;
    }


    /** @param timestampOffset the timestamp offset */
    public void setTimestampOffset(long timestampOffset) {
        this.timestampOffset = timestampOffset;
    }


    /** @return the rollback offset */
    public long getRollbackOffset() {
        return this.rollbackOffset;
    }


    /** @param rollbackOffset the rollback offset */
    public void setRollbackOffset(long rollbackOffset) {
        this.rollbackOffset = rollbackOffset;
    }
}
/**
 * Plain value holder for a minimum offset, a maximum offset and a
 * last-update timestamp.
 *
 * @author shijia.wxr
 */
public class TopicOffset {
    /** Minimum offset. */
    private long minOffset;
    /** Maximum offset. */
    private long maxOffset;
    /** Timestamp of the last update. */
    private long lastUpdateTimestamp;


    /** @return the minimum offset */
    public long getMinOffset() {
        return this.minOffset;
    }


    /** @param minOffset the minimum offset */
    public void setMinOffset(long minOffset) {
        this.minOffset = minOffset;
    }


    /** @return the maximum offset */
    public long getMaxOffset() {
        return this.maxOffset;
    }


    /** @param maxOffset the maximum offset */
    public void setMaxOffset(long maxOffset) {
        this.maxOffset = maxOffset;
    }


    /** @return the last-update timestamp */
    public long getLastUpdateTimestamp() {
        return this.lastUpdateTimestamp;
    }


    /** @param lastUpdateTimestamp the last-update timestamp */
    public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
        this.lastUpdateTimestamp = lastUpdateTimestamp;
    }
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.common.admin; + +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.HashMap; + + +/** + * + * @author shijia.wxr + * + */ +public class TopicStatsTable extends RemotingSerializable { + private HashMap<MessageQueue, TopicOffset> offsetTable = new HashMap<MessageQueue, TopicOffset>(); + + + public HashMap<MessageQueue, TopicOffset> getOffsetTable() { + return offsetTable; + } + + + public void setOffsetTable(HashMap<MessageQueue, TopicOffset> offsetTable) { + this.offsetTable = offsetTable; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/annotation/ImportantField.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/annotation/ImportantField.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/annotation/ImportantField.java new file mode 100644 index 0000000..fe0cb12 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/annotation/ImportantField.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * Marker annotation without members. It is retained at run time, appears in
 * generated Javadoc, and may be placed on fields, methods, parameters and
 * local variables.
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({
    ElementType.FIELD,
    ElementType.METHOD,
    ElementType.PARAMETER,
    ElementType.LOCAL_VARIABLE
})
public @interface ImportantField {
}
/**
 * Size limits for DB messages.
 */
public class DBMsgConstants {
    /**
     * Maximum body size in bytes: 64 MiB (67,108,864).
     *
     * <p>The last factor used to be the transposed literal {@code 1204},
     * which produced 78,905,344 bytes, and the accompanying comment claimed
     * "64KB", which matched neither value.
     */
    public static final int MAX_BODY_SIZE = 64 * 1024 * 1024; // 64 MiB
}
/**
 * Central list of the logger names used by the project. Every constant is
 * the literal name of one logger.
 *
 * @author shijia.wxr
 */
public class LoggerName {
    // Per-component loggers.
    public static final String FILTERSRV_LOGGER_NAME = "RocketmqFiltersrv";
    public static final String NAMESRV_LOGGER_NAME = "RocketmqNamesrv";
    public static final String BROKER_LOGGER_NAME = "RocketmqBroker";
    public static final String CLIENT_LOGGER_NAME = "RocketmqClient";
    public static final String TOOLS_LOGGER_NAME = "RocketmqTools";
    public static final String COMMON_LOGGER_NAME = "RocketmqCommon";

    // Store loggers.
    public static final String STORE_LOGGER_NAME = "RocketmqStore";
    public static final String STORE_ERROR_LOGGER_NAME = "RocketmqStoreError";

    // Topic-specific loggers.
    public static final String TRANSACTION_LOGGER_NAME = "RocketmqTransaction";
    public static final String REBALANCE_LOCK_LOGGER_NAME = "RocketmqRebalanceLock";
    public static final String ROCKETMQ_STATS_LOGGER_NAME = "RocketmqStats";
    public static final String COMMERCIAL_LOGGER_NAME = "RocketmqCommercial";
    public static final String FLOW_CONTROL_LOGGER_NAME = "RocketmqFlowControl";
    public static final String ROCKETMQ_AUTHORIZE_LOGGER_NAME = "RocketmqAuthorize";
    public static final String DUPLICATION_LOGGER_NAME = "RocketmqDuplication";
    public static final String PROTECTION_LOGGER_NAME = "RocketmqProtection";
    public static final String WATER_MARK_LOGGER_NAME = "RocketmqWaterMark";
}
/**
 * Bit flags describing permissions, plus helpers to test and render them.
 *
 * @author shijia.wxr
 */
public class PermName {
    public static final int PERM_PRIORITY = 1 << 3;
    public static final int PERM_READ = 1 << 2;
    public static final int PERM_WRITE = 1 << 1;
    public static final int PERM_INHERIT = 1 << 0;

    /**
     * Renders a permission mask as a three-character string: {@code R} when the
     * read bit is set, {@code W} when the write bit is set and {@code X} when
     * the inherit bit is set; an unset bit is shown as {@code -}. The priority
     * bit is not rendered.
     *
     * @param perm permission bit mask
     * @return e.g. {@code "RW-"}
     */
    public static String perm2String(final int perm) {
        final char[] flags = {'-', '-', '-'};
        if (isReadable(perm)) {
            flags[0] = 'R';
        }
        if (isWriteable(perm)) {
            flags[1] = 'W';
        }
        if (isInherited(perm)) {
            flags[2] = 'X';
        }
        return new String(flags);
    }

    /** @return {@code true} when {@link #PERM_READ} is set in {@code perm} */
    public static boolean isReadable(final int perm) {
        // Each mask is a single bit, so "non-zero" is the same as "equals mask".
        return (perm & PERM_READ) != 0;
    }

    /** @return {@code true} when {@link #PERM_WRITE} is set in {@code perm} */
    public static boolean isWriteable(final int perm) {
        return (perm & PERM_WRITE) != 0;
    }

    /** @return {@code true} when {@link #PERM_INHERIT} is set in {@code perm} */
    public static boolean isInherited(final int perm) {
        return (perm & PERM_INHERIT) != 0;
    }
}
a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/consumer/ConsumeFromWhere.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/consumer/ConsumeFromWhere.java new file mode 100644 index 0000000..ededc90 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/consumer/ConsumeFromWhere.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Starting positions a consumer can be configured with. The meaning of each
 * constant is applied by the consumer implementation, which is not part of
 * this file.
 *
 * <p>The declaration order is kept stable because ordinals may be observed by
 * callers.
 *
 * @author shijia.wxr
 */
public enum ConsumeFromWhere {
    CONSUME_FROM_LAST_OFFSET,

    @Deprecated
    CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,

    @Deprecated
    CONSUME_FROM_MIN_OFFSET,

    @Deprecated
    CONSUME_FROM_MAX_OFFSET,

    CONSUME_FROM_FIRST_OFFSET,

    CONSUME_FROM_TIMESTAMP;
}
+ */ +package com.alibaba.rocketmq.common.filter; + +import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; + +import java.net.URL; + + +/** + * @author shijia.wxr + * + */ +public class FilterAPI { + public static URL classFile(final String className) { + final String javaSource = simpleClassName(className) + ".java"; + URL url = FilterAPI.class.getClassLoader().getResource(javaSource); + return url; + } + + public static String simpleClassName(final String className) { + String simple = className; + int index = className.lastIndexOf("."); + if (index >= 0) { + simple = className.substring(index + 1); + } + + return simple; + } + + public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic, + String subString) throws Exception { + SubscriptionData subscriptionData = new SubscriptionData(); + subscriptionData.setTopic(topic); + subscriptionData.setSubString(subString); + + if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) { + subscriptionData.setSubString(SubscriptionData.SUB_ALL); + } else { + String[] tags = subString.split("\\|\\|"); + if (tags != null && tags.length > 0) { + for (String tag : tags) { + if (tag.length() > 0) { + String trimString = tag.trim(); + if (trimString.length() > 0) { + subscriptionData.getTagsSet().add(trimString); + subscriptionData.getCodeSet().add(trimString.hashCode()); + } + } + } + } else { + throw new Exception("subString split error"); + } + } + + return subscriptionData; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/FilterContext.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/FilterContext.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/FilterContext.java new file mode 100644 index 0000000..50cc5fc --- 
/**
 * Context object handed to a message filter; it carries the consumer group
 * name and nothing else.
 */
public class FilterContext {
    /** Consumer group name; {@code null} until set. */
    private String consumerGroup;

    /** @return the consumer group name, or {@code null} when none was set */
    public String getConsumerGroup() {
        return this.consumerGroup;
    }

    /** @param consumerGroup the consumer group name to store */
    public void setConsumerGroup(String consumerGroup) {
        this.consumerGroup = consumerGroup;
    }
}
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.common.filter; + +import com.alibaba.rocketmq.common.message.MessageExt; + + +public interface MessageFilter { + boolean match(final MessageExt msg, final FilterContext context); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Op.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Op.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Op.java new file mode 100644 index 0000000..f83a5f5 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Op.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * Base type for every token of a filter expression; it wraps the token's
 * textual symbol.
 */
public abstract class Op {

    /** Text of the token, exactly as passed to the constructor. */
    private final String symbol;

    /**
     * @param symbol text of the token
     */
    protected Op(String symbol) {
        this.symbol = symbol;
    }

    /** @return the token text */
    public String getSymbol() {
        return this.symbol;
    }

    /** @return the token text, same value as {@link #getSymbol()} stores */
    @Override
    public String toString() {
        return this.symbol;
    }
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.common.filter.impl; + +public class Operand extends Op { + + public Operand(String symbol) { + super(symbol); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Operator.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Operator.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Operator.java new file mode 100644 index 0000000..c906d72 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Operator.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.rocketmq.common.filter.impl; + +public class Operator extends Op { + + public static final Operator LEFTPARENTHESIS = new Operator("(", 30, false); + public static final Operator RIGHTPARENTHESIS = new Operator(")", 30, false); + public static final Operator AND = new Operator("&&", 20, true); + public static final Operator OR = new Operator("||", 15, true); + + private int priority; + private boolean compareable; + + + private Operator(String symbol, int priority, boolean compareable) { + super(symbol); + this.priority = priority; + this.compareable = compareable; + } + + public static Operator createOperator(String operator) { + if (LEFTPARENTHESIS.getSymbol().equals(operator)) + return LEFTPARENTHESIS; + else if (RIGHTPARENTHESIS.getSymbol().equals(operator)) + return RIGHTPARENTHESIS; + else if (AND.getSymbol().equals(operator)) + return AND; + else if (OR.getSymbol().equals(operator)) + return OR; + else + throw new IllegalArgumentException("unsupport operator " + operator); + } + + public int getPriority() { + return priority; + } + + public boolean isCompareable() { + return compareable; + } + + + public int compare(Operator operator) { + if (this.priority > operator.priority) + return 1; + else if (this.priority == operator.priority) + return 0; + else + return -1; + } + + public boolean isSpecifiedOp(String operator) { + return this.getSymbol().equals(operator); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/PolishExpr.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/PolishExpr.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/PolishExpr.java new file mode 100644 index 0000000..518c45e --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/PolishExpr.java @@ 
-0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.common.filter.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.Stack; + +import static com.alibaba.rocketmq.common.filter.impl.Operator.LEFTPARENTHESIS; +import static com.alibaba.rocketmq.common.filter.impl.Operator.RIGHTPARENTHESIS; +import static com.alibaba.rocketmq.common.filter.impl.Operator.createOperator; + +public class PolishExpr { + + public static List<Op> reversePolish(String expression) { + return reversePolish(participle(expression)); + } + + /** + * Shunting-yard algorithm <br/> + * http://en.wikipedia.org/wiki/Shunting_yard_algorithm + * + * @param tokens + * @return the compute result of Shunting-yard algorithm + */ + public static List<Op> reversePolish(List<Op> tokens) { + List<Op> segments = new ArrayList<Op>(); + Stack<Operator> operatorStack = new Stack<Operator>(); + + for (int i = 0; i < tokens.size(); i++) { + Op token = tokens.get(i); + if (isOperand(token)) { + + segments.add(token); + } else if (isLeftParenthesis(token)) { + + operatorStack.push((Operator) token); + } else if (isRightParenthesis(token)) { + + Operator opNew = null; + while (!operatorStack.empty() 
&& LEFTPARENTHESIS != (opNew = operatorStack.pop())) { + segments.add(opNew); + } + if (null == opNew || LEFTPARENTHESIS != opNew) + throw new IllegalArgumentException("mismatched parentheses"); + } else if (isOperator(token)) { + + Operator opNew = (Operator) token; + if (!operatorStack.empty()) { + Operator opOld = operatorStack.peek(); + if (opOld.isCompareable() && opNew.compare(opOld) != 1) { + segments.add(operatorStack.pop()); + } + } + operatorStack.push(opNew); + } else + throw new IllegalArgumentException("illegal token " + token); + } + + while (!operatorStack.empty()) { + Operator operator = operatorStack.pop(); + if (LEFTPARENTHESIS == operator || RIGHTPARENTHESIS == operator) + throw new IllegalArgumentException("mismatched parentheses " + operator); + segments.add(operator); + } + + return segments; + } + + /** + * + * @param expression + * + * @return + * + * @throws Exception + */ + private static List<Op> participle(String expression) { + List<Op> segments = new ArrayList<Op>(); + + int size = expression.length(); + int wordStartIndex = -1; + int wordLen = 0; + Type preType = Type.NULL; + + for (int i = 0; i < size; i++) { + int chValue = (int) expression.charAt(i); + + if ((97 <= chValue && chValue <= 122) || (65 <= chValue && chValue <= 90) + || (49 <= chValue && chValue <= 57) || 95 == chValue) { + + + if (Type.OPERATOR == preType || Type.SEPAERATOR == preType || Type.NULL == preType + || Type.PARENTHESIS == preType) { + if (Type.OPERATOR == preType) { + segments.add(createOperator(expression.substring(wordStartIndex, wordStartIndex + + wordLen))); + } + wordStartIndex = i; + wordLen = 0; + } + preType = Type.OPERAND; + wordLen++; + } else if (40 == chValue || 41 == chValue) { + + + if (Type.OPERATOR == preType) { + segments.add(createOperator(expression + .substring(wordStartIndex, wordStartIndex + wordLen))); + wordStartIndex = -1; + wordLen = 0; + } else if (Type.OPERAND == preType) { + segments.add(new 
Operand(expression.substring(wordStartIndex, wordStartIndex + wordLen))); + wordStartIndex = -1; + wordLen = 0; + } + + preType = Type.PARENTHESIS; + segments.add(createOperator((char) chValue + "")); + } else if (38 == chValue || 124 == chValue) { + + if (Type.OPERAND == preType || Type.SEPAERATOR == preType || Type.PARENTHESIS == preType) { + if (Type.OPERAND == preType) { + segments.add(new Operand(expression.substring(wordStartIndex, wordStartIndex + + wordLen))); + } + wordStartIndex = i; + wordLen = 0; + } + preType = Type.OPERATOR; + wordLen++; + } else if (32 == chValue || 9 == chValue) { + + + if (Type.OPERATOR == preType) { + segments.add(createOperator(expression + .substring(wordStartIndex, wordStartIndex + wordLen))); + wordStartIndex = -1; + wordLen = 0; + } else if (Type.OPERAND == preType) { + segments.add(new Operand(expression.substring(wordStartIndex, wordStartIndex + wordLen))); + wordStartIndex = -1; + wordLen = 0; + } + preType = Type.SEPAERATOR; + } else { + + throw new IllegalArgumentException("illegal expression, at index " + i + " " + (char) chValue); + } + + } + + if (wordLen > 0) { + segments.add(new Operand(expression.substring(wordStartIndex, wordStartIndex + wordLen))); + } + return segments; + } + + public static boolean isOperand(Op token) { + return token instanceof Operand; + } + + public static boolean isLeftParenthesis(Op token) { + return token instanceof Operator && LEFTPARENTHESIS == (Operator) token; + } + + public static boolean isRightParenthesis(Op token) { + return token instanceof Operator && RIGHTPARENTHESIS == (Operator) token; + } + + public static boolean isOperator(Op token) { + return token instanceof Operator; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Type.java ---------------------------------------------------------------------- diff --git 
/**
 * Kind of the previously scanned character while tokenizing a filter
 * expression. The declaration order is kept stable because ordinals may be
 * observed by callers.
 */
public enum Type {
    /** Nothing has been scanned yet. */
    NULL,

    /** Part of an operand word. */
    OPERAND,

    /** Part of an operator word. */
    OPERATOR,

    /** An opening or closing parenthesis. */
    PARENTHESIS,

    /** A separator character; the spelling is part of the public name. */
    SEPAERATOR;
}
/**
 * FAQ links attached to error messages, plus helpers that format the hint text.
 *
 * @author shijia.wxr
 */
public class FAQUrl {

    public static final String APPLY_TOPIC_URL = //
            "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&topic_not_exist";

    public static final String NAME_SERVER_ADDR_NOT_EXIST_URL = //
            "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&namesrv_not_exist";

    public static final String GROUP_NAME_DUPLICATE_URL = //
            "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&group_duplicate";

    // The anchor is "&parameter_check_failed". The leading "&para" must stay
    // literal text; it had been collapsed into a pilcrow character by an HTML
    // entity decode, which broke the link.
    public static final String CLIENT_PARAMETER_CHECK_URL = //
            "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&parameter_check_failed";

    public static final String SUBSCRIPTION_GROUP_NOT_EXIST = //
            "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&subGroup_not_exist";

    public static final String CLIENT_SERVICE_NOT_OK = //
            "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&service_not_ok";

    // FAQ: No route info of this topic, TopicABC
    public static final String NO_TOPIC_ROUTE_INFO = //
            "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&topic_not_exist";

    public static final String LOAD_JSON_EXCEPTION = //
            "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&load_json_exception";

    public static final String SAME_GROUP_DIFFERENT_TOPIC = //
            "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&subscription_exception";

    public static final String MQLIST_NOT_EXIST = //
            "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&queue_not_exist";

    public static final String UNEXPECTED_EXCEPTION_URL = //
            "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&unexpected_exception";

    public static final String SEND_MSG_FAILED = //
            "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&send_msg_failed";

    public static final String UNKNOWN_HOST_EXCEPTION = //
            "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&unknown_host";

    private static final String TIP_STRING_BEGIN = "\nSee ";
    private static final String TIP_STRING_END = " for further details.";

    /**
     * Formats the "see this page" hint for a FAQ link.
     *
     * @param url FAQ link to embed
     * @return {@code "\nSee <url> for further details."}
     */
    public static String suggestTodo(final String url) {
        StringBuilder sb = new StringBuilder();
        sb.append(TIP_STRING_BEGIN);
        sb.append(url);
        sb.append(TIP_STRING_END);
        return sb.toString();
    }

    /**
     * Appends the generic FAQ link to an error message that does not already
     * carry a hint produced by {@link #suggestTodo(String)}.
     *
     * @param errorMessage original message, may be {@code null}
     * @return the message with the default link appended; the input unchanged
     *         when it is {@code null} or already contains a hint
     */
    public static String attachDefaultURL(final String errorMessage) {
        if (errorMessage == null || errorMessage.indexOf(TIP_STRING_BEGIN) != -1) {
            return errorMessage;
        }

        StringBuilder sb = new StringBuilder();
        sb.append(errorMessage);
        sb.append("\n");
        sb.append("For more information, please visit the url, ");
        sb.append(UNEXPECTED_EXCEPTION_URL);
        return sb.toString();
    }
}
import java.nio.ByteBuffer;

/**
 * Pluggable check consulted when deciding whether a stored message matches a
 * filter. The call sites are not part of this file.
 *
 * @author manhong.yqd
 */
public interface FilterCheckHook {
    /** @return the name identifying this hook */
    String hookName();

    /**
     * Evaluates the hook against a buffer.
     *
     * @param isUnitMode flag supplied by the caller
     * @param byteBuffer buffer to inspect; presumably the encoded message
     *        (confirm against callers)
     * @return the hook's verdict
     */
    boolean isFilterMatched(final boolean isUnitMode, final ByteBuffer byteBuffer);
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.common.message; + +import java.io.Serializable; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + + +/** + * + * @author shijia.wxr + * + */ +public class Message implements Serializable { + private static final long serialVersionUID = 8445773977080406428L; + + private String topic; + private int flag; + private Map<String, String> properties; + private byte[] body; + + + public Message() { + } + + + public Message(String topic, byte[] body) { + this(topic, "", "", 0, body, true); + } + + + public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) { + this.topic = topic; + this.flag = flag; + this.body = body; + + if (tags != null && tags.length() > 0) + this.setTags(tags); + + if (keys != null && keys.length() > 0) + this.setKeys(keys); + + this.setWaitStoreMsgOK(waitStoreMsgOK); + } + + public void setKeys(String keys) { + this.putProperty(MessageConst.PROPERTY_KEYS, keys); + } + + void putProperty(final String name, final String value) { + if (null == this.properties) { + this.properties = new HashMap<String, String>(); + } + + this.properties.put(name, value); + } + + + public Message(String topic, String tags, byte[] body) { + this(topic, tags, "", 0, body, true); + } + + + public Message(String topic, String tags, String keys, byte[] body) { + this(topic, tags, keys, 0, body, true); + } + + void clearProperty(final String name) { + if (null != this.properties) { + this.properties.remove(name); + } + } + + public void 
putUserProperty(final String name, final String value) { + if (MessageConst.STRING_HASH_SET.contains(name)) { + throw new RuntimeException(String.format( + "The Property<%s> is used by system, input another please", name)); + } + this.putProperty(name, value); + } + + public String getUserProperty(final String name) { + return this.getProperty(name); + } + + public String getProperty(final String name) { + if (null == this.properties) { + this.properties = new HashMap<String, String>(); + } + + return this.properties.get(name); + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getTags() { + return this.getProperty(MessageConst.PROPERTY_TAGS); + } + + public void setTags(String tags) { + this.putProperty(MessageConst.PROPERTY_TAGS, tags); + } + + public String getKeys() { + return this.getProperty(MessageConst.PROPERTY_KEYS); + } + + public void setKeys(Collection<String> keys) { + StringBuffer sb = new StringBuffer(); + for (String k : keys) { + sb.append(k); + sb.append(MessageConst.KEY_SEPARATOR); + } + + this.setKeys(sb.toString().trim()); + } + + + public int getDelayTimeLevel() { + String t = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL); + if (t != null) { + return Integer.parseInt(t); + } + + return 0; + } + + + public void setDelayTimeLevel(int level) { + this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level)); + } + + + public boolean isWaitStoreMsgOK() { + String result = this.getProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK); + if (null == result) + return true; + + return Boolean.parseBoolean(result); + } + + + public void setWaitStoreMsgOK(boolean waitStoreMsgOK) { + this.putProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, Boolean.toString(waitStoreMsgOK)); + } + + + public int getFlag() { + return flag; + } + + + public void setFlag(int flag) { + this.flag = flag; + } + + + public byte[] getBody() { + return body; + } + + + 
public void setBody(byte[] body) { + this.body = body; + } + + + public Map<String, String> getProperties() { + return properties; + } + + + void setProperties(Map<String, String> properties) { + this.properties = properties; + } + + public String getBuyerId() { + return getProperty(MessageConst.PROPERTY_BUYER_ID); + } + + public void setBuyerId(String buyerId) { + putProperty(MessageConst.PROPERTY_BUYER_ID, buyerId); + } + + @Override + public String toString() { + return "Message [topic=" + topic + ", flag=" + flag + ", properties=" + properties + ", body=" + + (body != null ? body.length : 0) + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageAccessor.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageAccessor.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageAccessor.java new file mode 100644 index 0000000..bbbca1a --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageAccessor.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.common.message; + +import java.util.Map; + + +public class MessageAccessor { + + public static void clearProperty(final Message msg, final String name) { + msg.clearProperty(name); + } + + public static void setProperties(final Message msg, Map<String, String> properties) { + msg.setProperties(properties); + } + + public static void setTransferFlag(final Message msg, String unit) { + putProperty(msg, MessageConst.PROPERTY_TRANSFER_FLAG, unit); + } + + public static void putProperty(final Message msg, final String name, final String value) { + msg.putProperty(name, value); + } + + public static String getTransferFlag(final Message msg) { + return msg.getProperty(MessageConst.PROPERTY_TRANSFER_FLAG); + } + + + public static void setCorrectionFlag(final Message msg, String unit) { + putProperty(msg, MessageConst.PROPERTY_CORRECTION_FLAG, unit); + } + + + public static String getCorrectionFlag(final Message msg) { + return msg.getProperty(MessageConst.PROPERTY_CORRECTION_FLAG); + } + + + public static void setOriginMessageId(final Message msg, String originMessageId) { + putProperty(msg, MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, originMessageId); + } + + + public static String getOriginMessageId(final Message msg) { + return msg.getProperty(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID); + } + + + public static void setMQ2Flag(final Message msg, String flag) { + putProperty(msg, MessageConst.PROPERTY_MQ2_FLAG, flag); + } + + + public static String getMQ2Flag(final Message msg) { + return msg.getProperty(MessageConst.PROPERTY_MQ2_FLAG); + } + + + public static void setReconsumeTime(final Message msg, String reconsumeTimes) { + putProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME, reconsumeTimes); + } + + + public static String getReconsumeTime(final Message msg) { + return msg.getProperty(MessageConst.PROPERTY_RECONSUME_TIME); + } + + 
+ public static void setMaxReconsumeTimes(final Message msg, String maxReconsumeTimes) { + putProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES, maxReconsumeTimes); + } + + + public static String getMaxReconsumeTimes(final Message msg) { + return msg.getProperty(MessageConst.PROPERTY_MAX_RECONSUME_TIMES); + } + + public static void setConsumeStartTimeStamp(final Message msg, String propertyConsumeStartTimeStamp) { + putProperty(msg, MessageConst.PROPERTY_CONSUME_START_TIMESTAMP, propertyConsumeStartTimeStamp); + } + + + public static String getConsumeStartTimeStamp(final Message msg) { + return msg.getProperty(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageClientExt.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageClientExt.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageClientExt.java new file mode 100644 index 0000000..0ab372e --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageClientExt.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.common.message; + +public class MessageClientExt extends MessageExt { + + public void setOffsetMsgId(String offsetMsgId) { + super.setMsgId(offsetMsgId); + } + + + public String getOffsetMsgId() { + return super.getMsgId(); + } + + public void setMsgId(String msgId) { + //DO NOTHING + //MessageClientIDSetter.setUniqID(this); + } + + @Override + public String getMsgId() { + String uniqID = MessageClientIDSetter.getUniqID(this); + if (uniqID == null) { + return this.getOffsetMsgId(); + } + else { + return uniqID; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageClientIDSetter.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageClientIDSetter.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageClientIDSetter.java new file mode 100644 index 0000000..82cd3d1 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageClientIDSetter.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.common.message; + +import com.alibaba.rocketmq.common.UtilAll; + +import java.nio.ByteBuffer; +import java.util.Calendar; +import java.util.Date; +import java.util.concurrent.atomic.AtomicInteger; + +public class MessageClientIDSetter { + private static final String TOPIC_KEY_SPLITTER = "#"; + private static final int LEN; + private static final String FIX_STRING; + private static final AtomicInteger COUNTER; + private static long startTime; + private static long nextStartTime; + + static { + LEN = 4 + 2 + 4 + 4 + 2; + ByteBuffer tempBuffer = ByteBuffer.allocate(10); + tempBuffer.position(2); + tempBuffer.putInt(UtilAll.getPid()); + tempBuffer.position(0); + try { + tempBuffer.put(UtilAll.getIP()); + } catch (Exception e) { + tempBuffer.put(createFakeIP()); + } + tempBuffer.position(6); + tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode()); //4 + FIX_STRING = UtilAll.bytes2string(tempBuffer.array()); + setStartTime(System.currentTimeMillis()); + COUNTER = new AtomicInteger(0); + } + + private synchronized static void setStartTime(long millis) { + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(millis); + cal.set(Calendar.DAY_OF_MONTH, 1); + cal.set(Calendar.HOUR, 0); + cal.set(Calendar.MINUTE, 0); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MILLISECOND, 0); + startTime = cal.getTimeInMillis(); + cal.add(Calendar.MONTH, 1); + nextStartTime = cal.getTimeInMillis(); + } + + public static Date getNearlyTimeFromID(String msgID) { + ByteBuffer buf = ByteBuffer.allocate(8); + byte[] bytes = UtilAll.string2bytes(msgID); + buf.put((byte) 0); + buf.put((byte) 0); + buf.put((byte) 0); + buf.put((byte) 0); + buf.put(bytes, 10, 4); + buf.position(0); + long spanMS = buf.getLong(); + Calendar cal = Calendar.getInstance(); + long now = cal.getTimeInMillis(); + cal.set(Calendar.DAY_OF_MONTH, 1); + 
cal.set(Calendar.HOUR, 0); + cal.set(Calendar.MINUTE, 0); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MILLISECOND, 0); + long monStartTime = cal.getTimeInMillis(); + if (monStartTime + spanMS >= now) { + cal.add(Calendar.MONTH, -1); + monStartTime = cal.getTimeInMillis(); + } + cal.setTimeInMillis(monStartTime + spanMS); + return cal.getTime(); + } + + public static String getIPStrFromID(String msgID) { + byte[] ipBytes = getIPFromID(msgID); + return UtilAll.ipToIPv4Str(ipBytes); + } + + public static byte[] getIPFromID(String msgID) { + byte[] result = new byte[4]; + byte[] bytes = UtilAll.string2bytes(msgID); + System.arraycopy(bytes, 0, result, 0, 4); + return result; + } + + public static String createUniqID() { + StringBuilder sb = new StringBuilder(LEN * 2); + sb.append(FIX_STRING); + sb.append(UtilAll.bytes2string(createUniqIDBuffer())); + return sb.toString(); + } + + + private static byte[] createUniqIDBuffer() { + ByteBuffer buffer = ByteBuffer.allocate(4 + 2); + long current = System.currentTimeMillis(); + if (current >= nextStartTime) { + setStartTime(current); + } + buffer.position(0); + buffer.putInt((int) (System.currentTimeMillis() - startTime)); + buffer.putShort((short) COUNTER.getAndIncrement()); + return buffer.array(); + } + + public static void setUniqID(final Message msg) { + if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) { + msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID()); + } + } + + public static String getUniqID(final Message msg) { + return msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); + } + + public static byte[] createFakeIP() { + ByteBuffer bb = ByteBuffer.allocate(8); + bb.putLong(System.currentTimeMillis()); + bb.position(4); + byte[] fakeIP = new byte[4]; + bb.get(fakeIP); + return fakeIP; + } +} +
