http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/admin/ConsumeStats.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/admin/ConsumeStats.java b/common/src/main/java/org/apache/rocketmq/common/admin/ConsumeStats.java new file mode 100644 index 0000000..1405299 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/admin/ConsumeStats.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.common.admin; + +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map.Entry; + + +/** + * + * @author shijia.wxr + * + */ +public class ConsumeStats extends RemotingSerializable { + private HashMap<MessageQueue, OffsetWrapper> offsetTable = new HashMap<MessageQueue, OffsetWrapper>(); + private double consumeTps = 0; + + + public long computeTotalDiff() { + long diffTotal = 0L; + + Iterator<Entry<MessageQueue, OffsetWrapper>> it = this.offsetTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<MessageQueue, OffsetWrapper> next = it.next(); + long diff = next.getValue().getBrokerOffset() - next.getValue().getConsumerOffset(); + diffTotal += diff; + } + + return diffTotal; + } + + + public HashMap<MessageQueue, OffsetWrapper> getOffsetTable() { + return offsetTable; + } + + + public void setOffsetTable(HashMap<MessageQueue, OffsetWrapper> offsetTable) { + this.offsetTable = offsetTable; + } + + public double getConsumeTps() { + return consumeTps; + } + + public void setConsumeTps(double consumeTps) { + this.consumeTps = consumeTps; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/admin/OffsetWrapper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/admin/OffsetWrapper.java b/common/src/main/java/org/apache/rocketmq/common/admin/OffsetWrapper.java new file mode 100644 index 0000000..00bab0e --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/admin/OffsetWrapper.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Plain mutable holder for three {@code long} values: a broker offset, a
 * consumer offset and a last timestamp. All fields start at {@code 0}.
 *
 * @author shijia.wxr
 */
public class OffsetWrapper {
    private long brokerOffset;
    private long consumerOffset;
    // NOTE(review): unit and meaning of this timestamp are not visible here — confirm with callers.
    private long lastTimestamp;

    // ---- accessors ----

    /** @return the stored broker offset */
    public long getBrokerOffset() {
        return brokerOffset;
    }

    /** @return the stored consumer offset */
    public long getConsumerOffset() {
        return consumerOffset;
    }

    /** @return the stored last timestamp */
    public long getLastTimestamp() {
        return lastTimestamp;
    }

    // ---- mutators ----

    /** @param brokerOffset the broker offset to store */
    public void setBrokerOffset(long brokerOffset) {
        this.brokerOffset = brokerOffset;
    }

    /** @param consumerOffset the consumer offset to store */
    public void setConsumerOffset(long consumerOffset) {
        this.consumerOffset = consumerOffset;
    }

    /** @param lastTimestamp the last timestamp to store */
    public void setLastTimestamp(long lastTimestamp) {
        this.lastTimestamp = lastTimestamp;
    }
}
/**
 * Plain mutable holder describing one rollback record: a broker name, a queue
 * id and four {@code long} offsets (broker, consumer, timestamp, rollback).
 *
 * @author manhong.yqd
 */
public class RollbackStats {
    private String brokerName;
    private long queueId;
    private long brokerOffset;
    private long consumerOffset;
    private long timestampOffset;
    private long rollbackOffset;

    // ---- accessors ----

    /** @return the stored broker name, or {@code null} if none was set */
    public String getBrokerName() {
        return brokerName;
    }

    /** @return the stored queue id */
    public long getQueueId() {
        return queueId;
    }

    /** @return the stored broker offset */
    public long getBrokerOffset() {
        return brokerOffset;
    }

    /** @return the stored consumer offset */
    public long getConsumerOffset() {
        return consumerOffset;
    }

    /** @return the stored timestamp offset */
    public long getTimestampOffset() {
        return timestampOffset;
    }

    /** @return the stored rollback offset */
    public long getRollbackOffset() {
        return rollbackOffset;
    }

    // ---- mutators ----

    /** @param brokerName the broker name to store */
    public void setBrokerName(String brokerName) {
        this.brokerName = brokerName;
    }

    /** @param queueId the queue id to store */
    public void setQueueId(long queueId) {
        this.queueId = queueId;
    }

    /** @param brokerOffset the broker offset to store */
    public void setBrokerOffset(long brokerOffset) {
        this.brokerOffset = brokerOffset;
    }

    /** @param consumerOffset the consumer offset to store */
    public void setConsumerOffset(long consumerOffset) {
        this.consumerOffset = consumerOffset;
    }

    /** @param timestampOffset the timestamp offset to store */
    public void setTimestampOffset(long timestampOffset) {
        this.timestampOffset = timestampOffset;
    }

    /** @param rollbackOffset the rollback offset to store */
    public void setRollbackOffset(long rollbackOffset) {
        this.rollbackOffset = rollbackOffset;
    }
}
/**
 * Plain mutable holder for a minimum offset, a maximum offset and a
 * last-update timestamp. All fields start at {@code 0}.
 *
 * @author shijia.wxr
 */
public class TopicOffset {
    private long minOffset;
    private long maxOffset;
    // NOTE(review): time unit of this timestamp is not visible here — confirm with callers.
    private long lastUpdateTimestamp;

    // ---- accessors ----

    /** @return the stored minimum offset */
    public long getMinOffset() {
        return minOffset;
    }

    /** @return the stored maximum offset */
    public long getMaxOffset() {
        return maxOffset;
    }

    /** @return the stored last-update timestamp */
    public long getLastUpdateTimestamp() {
        return lastUpdateTimestamp;
    }

    // ---- mutators ----

    /** @param minOffset the minimum offset to store */
    public void setMinOffset(long minOffset) {
        this.minOffset = minOffset;
    }

    /** @param maxOffset the maximum offset to store */
    public void setMaxOffset(long maxOffset) {
        this.maxOffset = maxOffset;
    }

    /** @param lastUpdateTimestamp the last-update timestamp to store */
    public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
        this.lastUpdateTimestamp = lastUpdateTimestamp;
    }
}
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.admin; + +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.HashMap; + + +/** + * + * @author shijia.wxr + * + */ +public class TopicStatsTable extends RemotingSerializable { + private HashMap<MessageQueue, TopicOffset> offsetTable = new HashMap<MessageQueue, TopicOffset>(); + + + public HashMap<MessageQueue, TopicOffset> getOffsetTable() { + return offsetTable; + } + + + public void setOffsetTable(HashMap<MessageQueue, TopicOffset> offsetTable) { + this.offsetTable = offsetTable; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/annotation/ImportantField.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/annotation/ImportantField.java b/common/src/main/java/org/apache/rocketmq/common/annotation/ImportantField.java new file mode 100644 index 0000000..952e08e --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/annotation/ImportantField.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) 
/**
 * Marker annotation with no members. It is retained at runtime, included in
 * generated Javadoc, and may be placed on fields, methods, parameters and
 * local variables.
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({
    ElementType.FIELD,
    ElementType.METHOD,
    ElementType.PARAMETER,
    ElementType.LOCAL_VARIABLE
})
public @interface ImportantField {
}
/**
 * Constants for DB messages.
 */
public class DBMsgConstants {
    /**
     * Maximum body size in bytes: 64 * 1024 * 1024 = 67,108,864 (64 MiB).
     *
     * <p>The last factor was previously written as {@code 1204}, a transposed
     * {@code 1024}, which produced 78,905,344 bytes.
     *
     * <p>NOTE(review): the old trailing comment said "64KB", which matches
     * neither the mistyped nor the corrected value. If 64 KiB was the real
     * intent, the value should be {@code 64 * 1024} — confirm with the users
     * of this constant.
     */
    public static final int MAX_BODY_SIZE = 64 * 1024 * 1024;
}
/**
 * String constants holding the logger names used across the project. Every
 * value starts with the {@code Rocketmq} prefix.
 *
 * @author shijia.wxr
 */
public class LoggerName {
    // Per-component loggers.
    public static final String FILTERSRV_LOGGER_NAME = "RocketmqFiltersrv";
    public static final String NAMESRV_LOGGER_NAME = "RocketmqNamesrv";
    public static final String BROKER_LOGGER_NAME = "RocketmqBroker";
    public static final String CLIENT_LOGGER_NAME = "RocketmqClient";
    public static final String TOOLS_LOGGER_NAME = "RocketmqTools";
    public static final String COMMON_LOGGER_NAME = "RocketmqCommon";
    public static final String STORE_LOGGER_NAME = "RocketmqStore";
    public static final String STORE_ERROR_LOGGER_NAME = "RocketmqStoreError";

    // Topic-specific loggers.
    public static final String TRANSACTION_LOGGER_NAME = "RocketmqTransaction";
    public static final String REBALANCE_LOCK_LOGGER_NAME = "RocketmqRebalanceLock";
    public static final String ROCKETMQ_STATS_LOGGER_NAME = "RocketmqStats";
    public static final String COMMERCIAL_LOGGER_NAME = "RocketmqCommercial";
    public static final String FLOW_CONTROL_LOGGER_NAME = "RocketmqFlowControl";
    public static final String ROCKETMQ_AUTHORIZE_LOGGER_NAME = "RocketmqAuthorize";
    public static final String DUPLICATION_LOGGER_NAME = "RocketmqDuplication";
    public static final String PROTECTION_LOGGER_NAME = "RocketmqProtection";
    public static final String WATER_MARK_LOGGER_NAME = "RocketmqWaterMark";
}
/**
 * Permission bit flags and helpers to test them and render them as text.
 *
 * @author shijia.wxr
 */
public class PermName {
    public static final int PERM_PRIORITY = 0x1 << 3;
    public static final int PERM_READ = 0x1 << 2;
    public static final int PERM_WRITE = 0x1 << 1;
    public static final int PERM_INHERIT = 0x1 << 0;

    /**
     * Renders a permission mask as a three-character string.
     *
     * <p>Position 0 is {@code R} when the read bit is set, position 1 is
     * {@code W} when the write bit is set, and position 2 is {@code X} when
     * the inherit bit is set; an unset bit is shown as {@code -}. The
     * {@link #PERM_PRIORITY} bit is not represented in the output.
     *
     * @param perm the permission bit mask
     * @return a string such as {@code "RW-"} or {@code "---"}
     */
    public static String perm2String(final int perm) {
        // A local char array is enough here; the synchronized StringBuffer
        // used previously bought nothing for a method-local value.
        final char[] flags = {'-', '-', '-'};

        if (isReadable(perm)) {
            flags[0] = 'R';
        }

        if (isWriteable(perm)) {
            flags[1] = 'W';
        }

        if (isInherited(perm)) {
            flags[2] = 'X';
        }

        return new String(flags);
    }

    /**
     * @param perm the permission bit mask
     * @return {@code true} when the {@link #PERM_READ} bit is set
     */
    public static boolean isReadable(final int perm) {
        return (perm & PERM_READ) == PERM_READ;
    }

    /**
     * @param perm the permission bit mask
     * @return {@code true} when the {@link #PERM_WRITE} bit is set
     */
    public static boolean isWriteable(final int perm) {
        return (perm & PERM_WRITE) == PERM_WRITE;
    }

    /**
     * @param perm the permission bit mask
     * @return {@code true} when the {@link #PERM_INHERIT} bit is set
     */
    public static boolean isInherited(final int perm) {
        return (perm & PERM_INHERIT) == PERM_INHERIT;
    }
}
a/common/src/main/java/org/apache/rocketmq/common/consumer/ConsumeFromWhere.java b/common/src/main/java/org/apache/rocketmq/common/consumer/ConsumeFromWhere.java new file mode 100644 index 0000000..db093a0 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/consumer/ConsumeFromWhere.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Enumerates the positions a consumer can be configured to start consuming
 * from. Three of the constants are marked deprecated.
 *
 * <p>The declaration order is kept unchanged so that ordinals stay stable.
 *
 * @author shijia.wxr
 */
public enum ConsumeFromWhere {
    CONSUME_FROM_LAST_OFFSET,

    @Deprecated CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,

    @Deprecated CONSUME_FROM_MIN_OFFSET,

    @Deprecated CONSUME_FROM_MAX_OFFSET,

    CONSUME_FROM_FIRST_OFFSET,

    CONSUME_FROM_TIMESTAMP
}
+ */ +package org.apache.rocketmq.common.filter; + +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; + +import java.net.URL; + + +/** + * @author shijia.wxr + * + */ +public class FilterAPI { + public static URL classFile(final String className) { + final String javaSource = simpleClassName(className) + ".java"; + URL url = FilterAPI.class.getClassLoader().getResource(javaSource); + return url; + } + + public static String simpleClassName(final String className) { + String simple = className; + int index = className.lastIndexOf("."); + if (index >= 0) { + simple = className.substring(index + 1); + } + + return simple; + } + + public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic, + String subString) throws Exception { + SubscriptionData subscriptionData = new SubscriptionData(); + subscriptionData.setTopic(topic); + subscriptionData.setSubString(subString); + + if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) { + subscriptionData.setSubString(SubscriptionData.SUB_ALL); + } else { + String[] tags = subString.split("\\|\\|"); + if (tags != null && tags.length > 0) { + for (String tag : tags) { + if (tag.length() > 0) { + String trimString = tag.trim(); + if (trimString.length() > 0) { + subscriptionData.getTagsSet().add(trimString); + subscriptionData.getCodeSet().add(trimString.hashCode()); + } + } + } + } else { + throw new Exception("subString split error"); + } + } + + return subscriptionData; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/filter/FilterContext.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/filter/FilterContext.java b/common/src/main/java/org/apache/rocketmq/common/filter/FilterContext.java new file mode 100644 index 0000000..e18fe48 --- /dev/null +++ 
/**
 * Mutable holder for a single value: the consumer group name.
 */
public class FilterContext {
    private String consumerGroup;

    /**
     * @return the stored consumer group, or {@code null} if none was set
     */
    public String getConsumerGroup() {
        return this.consumerGroup;
    }

    /**
     * @param consumerGroup the consumer group to store
     */
    public void setConsumerGroup(String consumerGroup) {
        this.consumerGroup = consumerGroup;
    }
}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.filter; + +import org.apache.rocketmq.common.message.MessageExt; + + +public interface MessageFilter { + boolean match(final MessageExt msg, final FilterContext context); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/filter/impl/Op.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/filter/impl/Op.java b/common/src/main/java/org/apache/rocketmq/common/filter/impl/Op.java new file mode 100644 index 0000000..af54566 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/filter/impl/Op.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * Base class for expression tokens; each instance carries the textual symbol
 * it was created with.
 */
public abstract class Op {

    // Assigned once in the constructor and never changed afterwards.
    private final String symbol;

    /**
     * @param symbol the token text; stored as given, including {@code null}
     */
    protected Op(String symbol) {
        this.symbol = symbol;
    }

    /**
     * @return the symbol passed to the constructor
     */
    public String getSymbol() {
        return symbol;
    }

    /**
     * @return the symbol itself, same as {@link #getSymbol()}
     */
    @Override
    public String toString() {
        return symbol;
    }
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.filter.impl; + +public class Operand extends Op { + + public Operand(String symbol) { + super(symbol); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/filter/impl/Operator.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/filter/impl/Operator.java b/common/src/main/java/org/apache/rocketmq/common/filter/impl/Operator.java new file mode 100644 index 0000000..45bebf0 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/filter/impl/Operator.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.common.filter.impl; + +public class Operator extends Op { + + public static final Operator LEFTPARENTHESIS = new Operator("(", 30, false); + public static final Operator RIGHTPARENTHESIS = new Operator(")", 30, false); + public static final Operator AND = new Operator("&&", 20, true); + public static final Operator OR = new Operator("||", 15, true); + + private int priority; + private boolean compareable; + + + private Operator(String symbol, int priority, boolean compareable) { + super(symbol); + this.priority = priority; + this.compareable = compareable; + } + + public static Operator createOperator(String operator) { + if (LEFTPARENTHESIS.getSymbol().equals(operator)) + return LEFTPARENTHESIS; + else if (RIGHTPARENTHESIS.getSymbol().equals(operator)) + return RIGHTPARENTHESIS; + else if (AND.getSymbol().equals(operator)) + return AND; + else if (OR.getSymbol().equals(operator)) + return OR; + else + throw new IllegalArgumentException("unsupport operator " + operator); + } + + public int getPriority() { + return priority; + } + + public boolean isCompareable() { + return compareable; + } + + + public int compare(Operator operator) { + if (this.priority > operator.priority) + return 1; + else if (this.priority == operator.priority) + return 0; + else + return -1; + } + + public boolean isSpecifiedOp(String operator) { + return this.getSymbol().equals(operator); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/filter/impl/PolishExpr.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/filter/impl/PolishExpr.java b/common/src/main/java/org/apache/rocketmq/common/filter/impl/PolishExpr.java new file mode 100644 index 0000000..73b51b6 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/filter/impl/PolishExpr.java @@ -0,0 +1,189 @@ +/** + * Licensed to the 
Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.filter.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.Stack; + +import static org.apache.rocketmq.common.filter.impl.Operator.LEFTPARENTHESIS; +import static org.apache.rocketmq.common.filter.impl.Operator.RIGHTPARENTHESIS; +import static org.apache.rocketmq.common.filter.impl.Operator.createOperator; + +public class PolishExpr { + + public static List<Op> reversePolish(String expression) { + return reversePolish(participle(expression)); + } + + /** + * Shunting-yard algorithm <br/> + * http://en.wikipedia.org/wiki/Shunting_yard_algorithm + * + * @param tokens + * @return the compute result of Shunting-yard algorithm + */ + public static List<Op> reversePolish(List<Op> tokens) { + List<Op> segments = new ArrayList<Op>(); + Stack<Operator> operatorStack = new Stack<Operator>(); + + for (int i = 0; i < tokens.size(); i++) { + Op token = tokens.get(i); + if (isOperand(token)) { + + segments.add(token); + } else if (isLeftParenthesis(token)) { + + operatorStack.push((Operator) token); + } else if (isRightParenthesis(token)) { + + Operator opNew = null; + while (!operatorStack.empty() && LEFTPARENTHESIS != (opNew = 
operatorStack.pop())) { + segments.add(opNew); + } + if (null == opNew || LEFTPARENTHESIS != opNew) + throw new IllegalArgumentException("mismatched parentheses"); + } else if (isOperator(token)) { + + Operator opNew = (Operator) token; + if (!operatorStack.empty()) { + Operator opOld = operatorStack.peek(); + if (opOld.isCompareable() && opNew.compare(opOld) != 1) { + segments.add(operatorStack.pop()); + } + } + operatorStack.push(opNew); + } else + throw new IllegalArgumentException("illegal token " + token); + } + + while (!operatorStack.empty()) { + Operator operator = operatorStack.pop(); + if (LEFTPARENTHESIS == operator || RIGHTPARENTHESIS == operator) + throw new IllegalArgumentException("mismatched parentheses " + operator); + segments.add(operator); + } + + return segments; + } + + /** + * + * @param expression + * + * @return + * + * @throws Exception + */ + private static List<Op> participle(String expression) { + List<Op> segments = new ArrayList<Op>(); + + int size = expression.length(); + int wordStartIndex = -1; + int wordLen = 0; + Type preType = Type.NULL; + + for (int i = 0; i < size; i++) { + int chValue = (int) expression.charAt(i); + + if ((97 <= chValue && chValue <= 122) || (65 <= chValue && chValue <= 90) + || (49 <= chValue && chValue <= 57) || 95 == chValue) { + + + if (Type.OPERATOR == preType || Type.SEPAERATOR == preType || Type.NULL == preType + || Type.PARENTHESIS == preType) { + if (Type.OPERATOR == preType) { + segments.add(createOperator(expression.substring(wordStartIndex, wordStartIndex + + wordLen))); + } + wordStartIndex = i; + wordLen = 0; + } + preType = Type.OPERAND; + wordLen++; + } else if (40 == chValue || 41 == chValue) { + + + if (Type.OPERATOR == preType) { + segments.add(createOperator(expression + .substring(wordStartIndex, wordStartIndex + wordLen))); + wordStartIndex = -1; + wordLen = 0; + } else if (Type.OPERAND == preType) { + segments.add(new Operand(expression.substring(wordStartIndex, wordStartIndex + 
wordLen))); + wordStartIndex = -1; + wordLen = 0; + } + + preType = Type.PARENTHESIS; + segments.add(createOperator((char) chValue + "")); + } else if (38 == chValue || 124 == chValue) { + + if (Type.OPERAND == preType || Type.SEPAERATOR == preType || Type.PARENTHESIS == preType) { + if (Type.OPERAND == preType) { + segments.add(new Operand(expression.substring(wordStartIndex, wordStartIndex + + wordLen))); + } + wordStartIndex = i; + wordLen = 0; + } + preType = Type.OPERATOR; + wordLen++; + } else if (32 == chValue || 9 == chValue) { + + + if (Type.OPERATOR == preType) { + segments.add(createOperator(expression + .substring(wordStartIndex, wordStartIndex + wordLen))); + wordStartIndex = -1; + wordLen = 0; + } else if (Type.OPERAND == preType) { + segments.add(new Operand(expression.substring(wordStartIndex, wordStartIndex + wordLen))); + wordStartIndex = -1; + wordLen = 0; + } + preType = Type.SEPAERATOR; + } else { + + throw new IllegalArgumentException("illegal expression, at index " + i + " " + (char) chValue); + } + + } + + if (wordLen > 0) { + segments.add(new Operand(expression.substring(wordStartIndex, wordStartIndex + wordLen))); + } + return segments; + } + + public static boolean isOperand(Op token) { + return token instanceof Operand; + } + + public static boolean isLeftParenthesis(Op token) { + return token instanceof Operator && LEFTPARENTHESIS == (Operator) token; + } + + public static boolean isRightParenthesis(Op token) { + return token instanceof Operator && RIGHTPARENTHESIS == (Operator) token; + } + + public static boolean isOperator(Op token) { + return token instanceof Operator; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/filter/impl/Type.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/filter/impl/Type.java 
/**
 * Kind of the character most recently consumed by the filter-expression
 * tokenizer ({@code PolishExpr}).
 */
public enum Type {
    /** Nothing has been consumed yet. */
    NULL,
    /** Part of an operand word. */
    OPERAND,
    /** Part of an operator symbol ({@code &} or {@code |}). */
    OPERATOR,
    /** An opening or closing parenthesis. */
    PARENTHESIS,
    /** A blank or a tab. The spelling is part of the public name and is kept as-is. */
    SEPAERATOR
}
/**
 * Help-page URLs that are attached to error messages, plus helpers that format
 * the attachment.
 *
 * @author shijia.wxr
 */
public class FAQUrl {

    public static final String APPLY_TOPIC_URL =
        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&topic_not_exist";

    public static final String NAME_SERVER_ADDR_NOT_EXIST_URL =
        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&namesrv_not_exist";

    public static final String GROUP_NAME_DUPLICATE_URL =
        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&group_duplicate";

    // The anchor is "&parameter_check_failed"; it must stay a literal '&' followed by
    // "parameter" (an HTML-entity decoder turns "&para" into a pilcrow sign).
    public static final String CLIENT_PARAMETER_CHECK_URL =
        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&parameter_check_failed";

    public static final String SUBSCRIPTION_GROUP_NOT_EXIST =
        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&subGroup_not_exist";

    public static final String CLIENT_SERVICE_NOT_OK =
        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&service_not_ok";

    // FAQ: No route info of this topic, TopicABC (same page as APPLY_TOPIC_URL)
    public static final String NO_TOPIC_ROUTE_INFO =
        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&topic_not_exist";

    public static final String LOAD_JSON_EXCEPTION =
        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&load_json_exception";

    public static final String SAME_GROUP_DIFFERENT_TOPIC =
        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&subscription_exception";

    public static final String MQLIST_NOT_EXIST =
        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&queue_not_exist";

    public static final String UNEXPECTED_EXCEPTION_URL =
        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&unexpected_exception";

    public static final String SEND_MSG_FAILED =
        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&send_msg_failed";

    public static final String UNKNOWN_HOST_EXCEPTION =
        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&unknown_host";

    private static final String TIP_STRING_BEGIN = "\nSee ";
    private static final String TIP_STRING_END = " for further details.";

    /**
     * Builds the hint that points the reader to a help page.
     *
     * @param url the help page to reference
     * @return a new line followed by {@code "See <url> for further details."}
     */
    public static String suggestTodo(final String url) {
        return TIP_STRING_BEGIN + url + TIP_STRING_END;
    }

    /**
     * Appends a reference to {@link #UNEXPECTED_EXCEPTION_URL} unless the message already
     * carries a hint produced by {@link #suggestTodo(String)}.
     *
     * @param errorMessage the message to extend; may be {@code null}
     * @return {@code errorMessage} unchanged when it is {@code null} or already contains a
     *         "See ..." hint, otherwise the message with the default URL appended on a new line
     */
    public static String attachDefaultURL(final String errorMessage) {
        if (errorMessage == null || errorMessage.contains(TIP_STRING_BEGIN)) {
            return errorMessage;
        }
        return errorMessage + "\n" + "For more information, please visit the url, "
            + UNEXPECTED_EXCEPTION_URL;
    }
}
/**
 * Hook that decides, from the raw bytes of a message, whether a filter matches.
 *
 * @author manhong.yqd
 */
public interface FilterCheckHook {

    /**
     * @return the name identifying this hook
     */
    String hookName();

    /**
     * Evaluates the filter.
     *
     * @param isUnitMode unit-mode flag supplied by the caller
     *        (NOTE(review): its exact meaning is not visible here; confirm with the call site)
     * @param byteBuffer buffer holding the data to inspect
     * @return {@code true} if the filter matched
     */
    boolean isFilterMatched(boolean isUnitMode, ByteBuffer byteBuffer);
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.message; + +import java.io.Serializable; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + + +/** + * + * @author shijia.wxr + * + */ +public class Message implements Serializable { + private static final long serialVersionUID = 8445773977080406428L; + + private String topic; + private int flag; + private Map<String, String> properties; + private byte[] body; + + + public Message() { + } + + + public Message(String topic, byte[] body) { + this(topic, "", "", 0, body, true); + } + + + public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) { + this.topic = topic; + this.flag = flag; + this.body = body; + + if (tags != null && tags.length() > 0) + this.setTags(tags); + + if (keys != null && keys.length() > 0) + this.setKeys(keys); + + this.setWaitStoreMsgOK(waitStoreMsgOK); + } + + public void setKeys(String keys) { + this.putProperty(MessageConst.PROPERTY_KEYS, keys); + } + + void putProperty(final String name, final String value) { + if (null == this.properties) { + this.properties = new HashMap<String, String>(); + } + + this.properties.put(name, value); + } + + + public Message(String topic, String tags, byte[] body) { + this(topic, tags, "", 0, body, true); + } + + + public Message(String topic, String tags, String keys, byte[] body) { + this(topic, tags, keys, 0, body, true); + } + + void clearProperty(final String name) { + if (null != this.properties) { + this.properties.remove(name); + } + } + + public void putUserProperty(final 
String name, final String value) { + if (MessageConst.STRING_HASH_SET.contains(name)) { + throw new RuntimeException(String.format( + "The Property<%s> is used by system, input another please", name)); + } + this.putProperty(name, value); + } + + public String getUserProperty(final String name) { + return this.getProperty(name); + } + + public String getProperty(final String name) { + if (null == this.properties) { + this.properties = new HashMap<String, String>(); + } + + return this.properties.get(name); + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getTags() { + return this.getProperty(MessageConst.PROPERTY_TAGS); + } + + public void setTags(String tags) { + this.putProperty(MessageConst.PROPERTY_TAGS, tags); + } + + public String getKeys() { + return this.getProperty(MessageConst.PROPERTY_KEYS); + } + + public void setKeys(Collection<String> keys) { + StringBuffer sb = new StringBuffer(); + for (String k : keys) { + sb.append(k); + sb.append(MessageConst.KEY_SEPARATOR); + } + + this.setKeys(sb.toString().trim()); + } + + + public int getDelayTimeLevel() { + String t = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL); + if (t != null) { + return Integer.parseInt(t); + } + + return 0; + } + + + public void setDelayTimeLevel(int level) { + this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level)); + } + + + public boolean isWaitStoreMsgOK() { + String result = this.getProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK); + if (null == result) + return true; + + return Boolean.parseBoolean(result); + } + + + public void setWaitStoreMsgOK(boolean waitStoreMsgOK) { + this.putProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, Boolean.toString(waitStoreMsgOK)); + } + + + public int getFlag() { + return flag; + } + + + public void setFlag(int flag) { + this.flag = flag; + } + + + public byte[] getBody() { + return body; + } + + + public void 
setBody(byte[] body) { + this.body = body; + } + + + public Map<String, String> getProperties() { + return properties; + } + + + void setProperties(Map<String, String> properties) { + this.properties = properties; + } + + public String getBuyerId() { + return getProperty(MessageConst.PROPERTY_BUYER_ID); + } + + public void setBuyerId(String buyerId) { + putProperty(MessageConst.PROPERTY_BUYER_ID, buyerId); + } + + @Override + public String toString() { + return "Message [topic=" + topic + ", flag=" + flag + ", properties=" + properties + ", body=" + + (body != null ? body.length : 0) + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java new file mode 100644 index 0000000..5cd0ba8 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.common.message; + +import java.util.Map; + + +public class MessageAccessor { + + public static void clearProperty(final Message msg, final String name) { + msg.clearProperty(name); + } + + public static void setProperties(final Message msg, Map<String, String> properties) { + msg.setProperties(properties); + } + + public static void setTransferFlag(final Message msg, String unit) { + putProperty(msg, MessageConst.PROPERTY_TRANSFER_FLAG, unit); + } + + public static void putProperty(final Message msg, final String name, final String value) { + msg.putProperty(name, value); + } + + public static String getTransferFlag(final Message msg) { + return msg.getProperty(MessageConst.PROPERTY_TRANSFER_FLAG); + } + + + public static void setCorrectionFlag(final Message msg, String unit) { + putProperty(msg, MessageConst.PROPERTY_CORRECTION_FLAG, unit); + } + + + public static String getCorrectionFlag(final Message msg) { + return msg.getProperty(MessageConst.PROPERTY_CORRECTION_FLAG); + } + + + public static void setOriginMessageId(final Message msg, String originMessageId) { + putProperty(msg, MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, originMessageId); + } + + + public static String getOriginMessageId(final Message msg) { + return msg.getProperty(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID); + } + + + public static void setMQ2Flag(final Message msg, String flag) { + putProperty(msg, MessageConst.PROPERTY_MQ2_FLAG, flag); + } + + + public static String getMQ2Flag(final Message msg) { + return msg.getProperty(MessageConst.PROPERTY_MQ2_FLAG); + } + + + public static void setReconsumeTime(final Message msg, String reconsumeTimes) { + putProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME, reconsumeTimes); + } + + + public static String getReconsumeTime(final Message msg) { + return msg.getProperty(MessageConst.PROPERTY_RECONSUME_TIME); + } + + + public static void setMaxReconsumeTimes(final Message msg, String maxReconsumeTimes) { + putProperty(msg, 
MessageConst.PROPERTY_MAX_RECONSUME_TIMES, maxReconsumeTimes); + } + + + public static String getMaxReconsumeTimes(final Message msg) { + return msg.getProperty(MessageConst.PROPERTY_MAX_RECONSUME_TIMES); + } + + public static void setConsumeStartTimeStamp(final Message msg, String propertyConsumeStartTimeStamp) { + putProperty(msg, MessageConst.PROPERTY_CONSUME_START_TIMESTAMP, propertyConsumeStartTimeStamp); + } + + + public static String getConsumeStartTimeStamp(final Message msg) { + return msg.getProperty(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/message/MessageClientExt.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageClientExt.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageClientExt.java new file mode 100644 index 0000000..90703ca --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageClientExt.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.common.message; + +public class MessageClientExt extends MessageExt { + + public void setOffsetMsgId(String offsetMsgId) { + super.setMsgId(offsetMsgId); + } + + + public String getOffsetMsgId() { + return super.getMsgId(); + } + + public void setMsgId(String msgId) { + //DO NOTHING + //MessageClientIDSetter.setUniqID(this); + } + + @Override + public String getMsgId() { + String uniqID = MessageClientIDSetter.getUniqID(this); + if (uniqID == null) { + return this.getOffsetMsgId(); + } + else { + return uniqID; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/message/MessageClientIDSetter.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageClientIDSetter.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageClientIDSetter.java new file mode 100644 index 0000000..1c3a1b7 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageClientIDSetter.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.common.message; + +import org.apache.rocketmq.common.UtilAll; + +import java.nio.ByteBuffer; +import java.util.Calendar; +import java.util.Date; +import java.util.concurrent.atomic.AtomicInteger; + +public class MessageClientIDSetter { + private static final String TOPIC_KEY_SPLITTER = "#"; + private static final int LEN; + private static final String FIX_STRING; + private static final AtomicInteger COUNTER; + private static long startTime; + private static long nextStartTime; + + static { + LEN = 4 + 2 + 4 + 4 + 2; + ByteBuffer tempBuffer = ByteBuffer.allocate(10); + tempBuffer.position(2); + tempBuffer.putInt(UtilAll.getPid()); + tempBuffer.position(0); + try { + tempBuffer.put(UtilAll.getIP()); + } catch (Exception e) { + tempBuffer.put(createFakeIP()); + } + tempBuffer.position(6); + tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode()); //4 + FIX_STRING = UtilAll.bytes2string(tempBuffer.array()); + setStartTime(System.currentTimeMillis()); + COUNTER = new AtomicInteger(0); + } + + private synchronized static void setStartTime(long millis) { + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(millis); + cal.set(Calendar.DAY_OF_MONTH, 1); + cal.set(Calendar.HOUR, 0); + cal.set(Calendar.MINUTE, 0); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MILLISECOND, 0); + startTime = cal.getTimeInMillis(); + cal.add(Calendar.MONTH, 1); + nextStartTime = cal.getTimeInMillis(); + } + + public static Date getNearlyTimeFromID(String msgID) { + ByteBuffer buf = ByteBuffer.allocate(8); + byte[] bytes = UtilAll.string2bytes(msgID); + buf.put((byte) 0); + buf.put((byte) 0); + buf.put((byte) 0); + buf.put((byte) 0); + buf.put(bytes, 10, 4); + buf.position(0); + long spanMS = buf.getLong(); + Calendar cal = Calendar.getInstance(); + long now = cal.getTimeInMillis(); + cal.set(Calendar.DAY_OF_MONTH, 1); + cal.set(Calendar.HOUR, 0); + cal.set(Calendar.MINUTE, 0); + cal.set(Calendar.SECOND, 0); + 
cal.set(Calendar.MILLISECOND, 0); + long monStartTime = cal.getTimeInMillis(); + if (monStartTime + spanMS >= now) { + cal.add(Calendar.MONTH, -1); + monStartTime = cal.getTimeInMillis(); + } + cal.setTimeInMillis(monStartTime + spanMS); + return cal.getTime(); + } + + public static String getIPStrFromID(String msgID) { + byte[] ipBytes = getIPFromID(msgID); + return UtilAll.ipToIPv4Str(ipBytes); + } + + public static byte[] getIPFromID(String msgID) { + byte[] result = new byte[4]; + byte[] bytes = UtilAll.string2bytes(msgID); + System.arraycopy(bytes, 0, result, 0, 4); + return result; + } + + public static String createUniqID() { + StringBuilder sb = new StringBuilder(LEN * 2); + sb.append(FIX_STRING); + sb.append(UtilAll.bytes2string(createUniqIDBuffer())); + return sb.toString(); + } + + + private static byte[] createUniqIDBuffer() { + ByteBuffer buffer = ByteBuffer.allocate(4 + 2); + long current = System.currentTimeMillis(); + if (current >= nextStartTime) { + setStartTime(current); + } + buffer.position(0); + buffer.putInt((int) (System.currentTimeMillis() - startTime)); + buffer.putShort((short) COUNTER.getAndIncrement()); + return buffer.array(); + } + + public static void setUniqID(final Message msg) { + if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) { + msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID()); + } + } + + public static String getUniqID(final Message msg) { + return msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); + } + + public static byte[] createFakeIP() { + ByteBuffer bb = ByteBuffer.allocate(8); + bb.putLong(System.currentTimeMillis()); + bb.position(4); + byte[] fakeIP = new byte[4]; + bb.get(fakeIP); + return fakeIP; + } +} + http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java ---------------------------------------------------------------------- diff 
--git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java new file mode 100644 index 0000000..d65160b --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Names of the properties that the system itself stores on a message, the
 * separator used between message keys, and the set of all reserved names.
 */
public class MessageConst {
    public static final String PROPERTY_KEYS = "KEYS";
    public static final String PROPERTY_TAGS = "TAGS";
    public static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
    public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
    public static final String PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";
    public static final String PROPERTY_REAL_TOPIC = "REAL_TOPIC";
    public static final String PROPERTY_REAL_QUEUE_ID = "REAL_QID";
    public static final String PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
    public static final String PROPERTY_PRODUCER_GROUP = "PGROUP";
    public static final String PROPERTY_MIN_OFFSET = "MIN_OFFSET";
    public static final String PROPERTY_MAX_OFFSET = "MAX_OFFSET";
    public static final String PROPERTY_BUYER_ID = "BUYER_ID";
    public static final String PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID";
    public static final String PROPERTY_TRANSFER_FLAG = "TRANSFER_FLAG";
    public static final String PROPERTY_CORRECTION_FLAG = "CORRECTION_FLAG";
    public static final String PROPERTY_MQ2_FLAG = "MQ2_FLAG";
    public static final String PROPERTY_RECONSUME_TIME = "RECONSUME_TIME";
    public static final String PROPERTY_MSG_REGION = "MSG_REGION";
    public static final String PROPERTY_TRACE_SWITCH = "TRACE_ON";
    public static final String PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY";
    public static final String PROPERTY_MAX_RECONSUME_TIMES = "MAX_RECONSUME_TIMES";
    public static final String PROPERTY_CONSUME_START_TIMESTAMP = "CONSUME_START_TIME";

    /** Placed between individual keys when several are stored in one string. */
    public static final String KEY_SEPARATOR = " ";

    /** Every property name declared above; {@code Message.putUserProperty} rejects these. */
    public static final HashSet<String> STRING_HASH_SET = new HashSet<String>();

    static {
        String[] reservedNames = {
            PROPERTY_TRACE_SWITCH,
            PROPERTY_MSG_REGION,
            PROPERTY_KEYS,
            PROPERTY_TAGS,
            PROPERTY_WAIT_STORE_MSG_OK,
            PROPERTY_DELAY_TIME_LEVEL,
            PROPERTY_RETRY_TOPIC,
            PROPERTY_REAL_TOPIC,
            PROPERTY_REAL_QUEUE_ID,
            PROPERTY_TRANSACTION_PREPARED,
            PROPERTY_PRODUCER_GROUP,
            PROPERTY_MIN_OFFSET,
            PROPERTY_MAX_OFFSET,
            PROPERTY_BUYER_ID,
            PROPERTY_ORIGIN_MESSAGE_ID,
            PROPERTY_TRANSFER_FLAG,
            PROPERTY_CORRECTION_FLAG,
            PROPERTY_MQ2_FLAG,
            PROPERTY_RECONSUME_TIME,
            PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
            PROPERTY_MAX_RECONSUME_TIMES,
            PROPERTY_CONSUME_START_TIMESTAMP,
        };
        for (String reserved : reservedNames) {
            STRING_HASH_SET.add(reserved);
        }
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.message; + +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.sysflag.MessageSysFlag; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +/** + * @author shijia.wxr + */ +public class MessageDecoder { + public final static int MSG_ID_LENGTH = 8 + 8; + + public final static Charset CHARSET_UTF8 = Charset.forName("UTF-8"); + public final static int MESSAGE_MAGIC_CODE_POSTION = 4; + public final static int MESSAGE_FLAG_POSTION = 16; + public final static int MESSAGE_PHYSIC_OFFSET_POSTION = 28; + public final static int MESSAGE_STORE_TIMESTAMP_POSTION = 56; + public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 + 8; + + + public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) { + input.flip(); + input.limit(MessageDecoder.MSG_ID_LENGTH); + + input.put(addr); + input.putLong(offset); + + return UtilAll.bytes2string(input.array()); + } + + + public static String createMessageId(SocketAddress socketAddress, long transactionIdhashCode) { + ByteBuffer byteBuffer = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH); + InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress; + 
byteBuffer.put(inetSocketAddress.getAddress().getAddress()); + byteBuffer.putInt(inetSocketAddress.getPort()); + byteBuffer.putLong(transactionIdhashCode); + byteBuffer.flip(); + return UtilAll.bytes2string(byteBuffer.array()); + } + + + public static MessageId decodeMessageId(final String msgId) throws UnknownHostException { + SocketAddress address; + long offset; + + + byte[] ip = UtilAll.string2bytes(msgId.substring(0, 8)); + byte[] port = UtilAll.string2bytes(msgId.substring(8, 16)); + ByteBuffer bb = ByteBuffer.wrap(port); + int portInt = bb.getInt(0); + address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt); + + // offset + byte[] data = UtilAll.string2bytes(msgId.substring(16, 32)); + bb = ByteBuffer.wrap(data); + offset = bb.getLong(0); + + return new MessageId(address, offset); + } + + + public static MessageExt decode(java.nio.ByteBuffer byteBuffer) { + return decode(byteBuffer, true, true, false); + } + + public static MessageExt clientDecode(java.nio.ByteBuffer byteBuffer, final boolean readBody) { + return decode(byteBuffer, readBody, true, true); + } + + public static MessageExt decode(java.nio.ByteBuffer byteBuffer, final boolean readBody) { + return decode(byteBuffer, readBody, true, false); + } + + + public static byte[] encode(MessageExt messageExt, boolean needCompress) throws Exception { + byte[] body = messageExt.getBody(); + byte[] topics = messageExt.getTopic().getBytes(CHARSET_UTF8); + byte topicLen = (byte) topics.length; + String properties = messageProperties2String(messageExt.getProperties()); + byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8); + short propertiesLength = (short) propertiesBytes.length; + int sysFlag = messageExt.getSysFlag(); + byte[] newBody = messageExt.getBody(); + if (needCompress && (sysFlag & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) { + newBody = UtilAll.compress(body, 5); + } + int bodyLength = newBody.length; + int storeSize = messageExt.getStoreSize(); + 
ByteBuffer byteBuffer; + if (storeSize > 0) { + byteBuffer = ByteBuffer.allocate(storeSize); + } else { + storeSize = 4 // 1 TOTALSIZE + + 4 // 2 MAGICCODE + + 4 // 3 BODYCRC + + 4 // 4 QUEUEID + + 4 // 5 FLAG + + 8 // 6 QUEUEOFFSET + + 8 // 7 PHYSICALOFFSET + + 4 // 8 SYSFLAG + + 8 // 9 BORNTIMESTAMP + + 8 // 10 BORNHOST + + 8 // 11 STORETIMESTAMP + + 8 // 12 STOREHOSTADDRESS + + 4 // 13 RECONSUMETIMES + + 8 // 14 Prepared Transaction Offset + + 4 + bodyLength // 14 BODY + + 1 + topicLen // 15 TOPIC + + 2 + propertiesLength // 16 propertiesLength + + 0; + byteBuffer = ByteBuffer.allocate(storeSize); + } + // 1 TOTALSIZE + byteBuffer.putInt(storeSize); + + // 2 MAGICCODE + byteBuffer.putInt(MESSAGE_MAGIC_CODE); + + // 3 BODYCRC + int bodyCRC = messageExt.getBodyCRC(); + byteBuffer.putInt(bodyCRC); + + // 4 QUEUEID + int queueId = messageExt.getQueueId(); + byteBuffer.putInt(queueId); + + // 5 FLAG + int flag = messageExt.getFlag(); + byteBuffer.putInt(flag); + + // 6 QUEUEOFFSET + long queueOffset = messageExt.getQueueOffset(); + byteBuffer.putLong(queueOffset); + + // 7 PHYSICALOFFSET + long physicOffset = messageExt.getCommitLogOffset(); + byteBuffer.putLong(physicOffset); + + // 8 SYSFLAG + byteBuffer.putInt(sysFlag); + + // 9 BORNTIMESTAMP + long bornTimeStamp = messageExt.getBornTimestamp(); + byteBuffer.putLong(bornTimeStamp); + + // 10 BORNHOST + InetSocketAddress bornHost = (InetSocketAddress) messageExt.getBornHost(); + byteBuffer.put(bornHost.getAddress().getAddress()); + byteBuffer.putInt(bornHost.getPort()); + + // 11 STORETIMESTAMP + long storeTimestamp = messageExt.getStoreTimestamp(); + byteBuffer.putLong(storeTimestamp); + + // 12 STOREHOST + InetSocketAddress serverHost = (InetSocketAddress) messageExt.getStoreHost(); + byteBuffer.put(serverHost.getAddress().getAddress()); + byteBuffer.putInt(serverHost.getPort()); + + // 13 RECONSUMETIMES + int reconsumeTimes = messageExt.getReconsumeTimes(); + byteBuffer.putInt(reconsumeTimes); + + // 14 Prepared 
Transaction Offset + long preparedTransactionOffset = messageExt.getPreparedTransactionOffset(); + byteBuffer.putLong(preparedTransactionOffset); + + // 15 BODY + byteBuffer.putInt(bodyLength); + byteBuffer.put(newBody); + + // 16 TOPIC + byteBuffer.put(topicLen); + byteBuffer.put(topics); + + // 17 properties + byteBuffer.putShort(propertiesLength); + byteBuffer.put(propertiesBytes); + + return byteBuffer.array(); + } + + public static MessageExt decode( + java.nio.ByteBuffer byteBuffer, final boolean readBody, final boolean deCompressBody) { + return decode(byteBuffer, readBody, deCompressBody, false); + } + + public static MessageExt decode( + java.nio.ByteBuffer byteBuffer, final boolean readBody, final boolean deCompressBody, final boolean isClient) { + try { + + MessageExt msgExt; + if (isClient) { + msgExt = new MessageClientExt(); + } else { + msgExt = new MessageExt(); + } + + // 1 TOTALSIZE + int storeSize = byteBuffer.getInt(); + msgExt.setStoreSize(storeSize); + + // 2 MAGICCODE + byteBuffer.getInt(); + + // 3 BODYCRC + int bodyCRC = byteBuffer.getInt(); + msgExt.setBodyCRC(bodyCRC); + + // 4 QUEUEID + int queueId = byteBuffer.getInt(); + msgExt.setQueueId(queueId); + + // 5 FLAG + int flag = byteBuffer.getInt(); + msgExt.setFlag(flag); + + // 6 QUEUEOFFSET + long queueOffset = byteBuffer.getLong(); + msgExt.setQueueOffset(queueOffset); + + // 7 PHYSICALOFFSET + long physicOffset = byteBuffer.getLong(); + msgExt.setCommitLogOffset(physicOffset); + + // 8 SYSFLAG + int sysFlag = byteBuffer.getInt(); + msgExt.setSysFlag(sysFlag); + + // 9 BORNTIMESTAMP + long bornTimeStamp = byteBuffer.getLong(); + msgExt.setBornTimestamp(bornTimeStamp); + + // 10 BORNHOST + byte[] bornHost = new byte[4]; + byteBuffer.get(bornHost, 0, 4); + int port = byteBuffer.getInt(); + msgExt.setBornHost(new InetSocketAddress(InetAddress.getByAddress(bornHost), port)); + + // 11 STORETIMESTAMP + long storeTimestamp = byteBuffer.getLong(); + msgExt.setStoreTimestamp(storeTimestamp); + 
+ // 12 STOREHOST + byte[] storeHost = new byte[4]; + byteBuffer.get(storeHost, 0, 4); + port = byteBuffer.getInt(); + msgExt.setStoreHost(new InetSocketAddress(InetAddress.getByAddress(storeHost), port)); + + // 13 RECONSUMETIMES + int reconsumeTimes = byteBuffer.getInt(); + msgExt.setReconsumeTimes(reconsumeTimes); + + // 14 Prepared Transaction Offset + long preparedTransactionOffset = byteBuffer.getLong(); + msgExt.setPreparedTransactionOffset(preparedTransactionOffset); + + // 15 BODY + int bodyLen = byteBuffer.getInt(); + if (bodyLen > 0) { + if (readBody) { + byte[] body = new byte[bodyLen]; + byteBuffer.get(body); + + // uncompress body + if (deCompressBody && (sysFlag & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) { + body = UtilAll.uncompress(body); + } + + msgExt.setBody(body); + } else { + byteBuffer.position(byteBuffer.position() + bodyLen); + } + } + + // 16 TOPIC + byte topicLen = byteBuffer.get(); + byte[] topic = new byte[(int) topicLen]; + byteBuffer.get(topic); + msgExt.setTopic(new String(topic, CHARSET_UTF8)); + + // 17 properties + short propertiesLength = byteBuffer.getShort(); + if (propertiesLength > 0) { + byte[] properties = new byte[propertiesLength]; + byteBuffer.get(properties); + String propertiesString = new String(properties, CHARSET_UTF8); + Map<String, String> map = string2messageProperties(propertiesString); + msgExt.setProperties(map); + } + + ByteBuffer byteBufferMsgId = ByteBuffer.allocate(MSG_ID_LENGTH); + String msgId = createMessageId(byteBufferMsgId, msgExt.getStoreHostBytes(), msgExt.getCommitLogOffset()); + msgExt.setMsgId(msgId); + + if (isClient) { + ((MessageClientExt) msgExt).setOffsetMsgId(msgId); + } + + return msgExt; + } catch (UnknownHostException e) { + byteBuffer.position(byteBuffer.limit()); + } catch (BufferUnderflowException e) { + byteBuffer.position(byteBuffer.limit()); + } catch (Exception e) { + byteBuffer.position(byteBuffer.limit()); + } + + return null; + } + + + public static 
List<MessageExt> decodes(java.nio.ByteBuffer byteBuffer) { + return decodes(byteBuffer, true); + } + + public static List<MessageExt> decodes(java.nio.ByteBuffer byteBuffer, final boolean readBody) { + List<MessageExt> msgExts = new ArrayList<MessageExt>(); + while (byteBuffer.hasRemaining()) { + MessageExt msgExt = clientDecode(byteBuffer, readBody); + if (null != msgExt) { + msgExts.add(msgExt); + } else { + break; + } + } + return msgExts; + } + + public static final char NAME_VALUE_SEPARATOR = 1; + public static final char PROPERTY_SEPARATOR = 2; + + + public static String messageProperties2String(Map<String, String> properties) { + StringBuilder sb = new StringBuilder(); + if (properties != null) { + for (final Map.Entry<String, String> entry : properties.entrySet()) { + final String name = entry.getKey(); + final String value = entry.getValue(); + + sb.append(name); + sb.append(NAME_VALUE_SEPARATOR); + sb.append(value); + sb.append(PROPERTY_SEPARATOR); + } + } + return sb.toString(); + } + + public static Map<String, String> string2messageProperties(final String properties) { + Map<String, String> map = new HashMap<String, String>(); + if (properties != null) { + String[] items = properties.split(String.valueOf(PROPERTY_SEPARATOR)); + if (items != null) { + for (String i : items) { + String[] nv = i.split(String.valueOf(NAME_VALUE_SEPARATOR)); + if (nv != null && 2 == nv.length) { + map.put(nv[0], nv[1]); + } + } + } + } + + return map; + } +}
