This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch pop_consumer
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/pop_consumer by this push:
new 86bea21 [RIP-19] Pop Consuming (common)
new 60ad9ab Merge pull request #2721 from ayanamist/pop_consumer
86bea21 is described below
commit 86bea21b4dc701e6cba5977b09e6f4e874494d19
Author: ayanamist <[email protected]>
AuthorDate: Tue Mar 9 11:04:46 2021 +0800
[RIP-19] Pop Consuming (common)
---
.../org/apache/rocketmq/common/BrokerConfig.java | 154 ++++++++++++
.../org/apache/rocketmq/common/KeyBuilder.java | 37 +++
.../apache/rocketmq/common/PopAckConstants.java | 35 +++
.../rocketmq/common/constant/ConsumeInitMode.java | 22 ++
.../rocketmq/common/constant/LoggerName.java | 1 +
.../rocketmq/common/message/MessageConst.java | 4 +
.../rocketmq/common/message/MessageDecoder.java | 2 +-
.../common/message/MessageQueueAssignment.java | 83 +++++++
.../common/message/MessageRequestMode.java | 43 ++++
.../rocketmq/common/protocol/RequestCode.java | 7 +
.../rocketmq/common/protocol/ResponseCode.java | 3 +
.../common/protocol/body/ConsumerRunningInfo.java | 32 +++
.../common/protocol/body/PopProcessQueueInfo.java | 59 +++++
.../protocol/body/QueryAssignmentRequestBody.java | 74 ++++++
.../protocol/body/QueryAssignmentResponseBody.java | 36 +++
.../body/SetMessageRequestModeRequestBody.java | 70 ++++++
.../protocol/header/AckMessageRequestHeader.java | 85 +++++++
.../header/ChangeInvisibleTimeRequestHeader.java | 97 ++++++++
.../header/ChangeInvisibleTimeResponseHeader.java | 61 +++++
.../common/protocol/header/ExtraInfoUtil.java | 258 +++++++++++++++++++++
.../protocol/header/PopMessageRequestHeader.java | 155 +++++++++++++
.../protocol/header/PopMessageResponseHeader.java | 102 ++++++++
.../rocketmq/common/utils/DataConverter.java | 42 ++++
23 files changed, 1461 insertions(+), 1 deletion(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index d80b3d2..488f213 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -21,6 +21,7 @@ import java.net.UnknownHostException;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -62,12 +63,14 @@ public class BrokerConfig {
*/
private int sendMessageThreadPoolNums = 1; //16 +
Runtime.getRuntime().availableProcessors() * 4;
private int pullMessageThreadPoolNums = 16 +
Runtime.getRuntime().availableProcessors() * 2;
+ private int ackMessageThreadPoolNums = 3;
private int processReplyMessageThreadPoolNums = 16 +
Runtime.getRuntime().availableProcessors() * 2;
private int queryMessageThreadPoolNums = 8 +
Runtime.getRuntime().availableProcessors();
private int adminBrokerThreadPoolNums = 16;
private int clientManageThreadPoolNums = 32;
private int consumerManageThreadPoolNums = 32;
+ private int loadBalanceProcessorThreadPoolNums = 32;
private int heartbeatThreadPoolNums = Math.min(32,
Runtime.getRuntime().availableProcessors());
/**
@@ -85,6 +88,7 @@ public class BrokerConfig {
private boolean fetchNamesrvAddrByAddressServer = false;
private int sendThreadPoolQueueCapacity = 10000;
private int pullThreadPoolQueueCapacity = 100000;
+ private int ackThreadPoolQueueCapacity = 100000;
private int replyThreadPoolQueueCapacity = 10000;
private int queryThreadPoolQueueCapacity = 20000;
private int clientManagerThreadPoolQueueCapacity = 1000000;
@@ -158,6 +162,37 @@ public class BrokerConfig {
*/
private int registerNameServerPeriod = 1000 * 30;
+ private int popPollingSize = 1024;
+ private int popPollingMapSize = 100000;
+ // 20w cost 200M heap memory.
+ private long maxPopPollingSize = 100000;
+ private int reviveQueueNum = 8;
+ private long reviveInterval = 1000;
+ private long reviveMaxSlow = 3;
+ private long reviveScanTime = 10000;
+ private boolean enablePopLog = true;
+ private boolean enablePopBufferMerge = false;
+ private int popCkStayBufferTime = 10 * 1000;
+ private int popCkStayBufferTimeOut = 3 * 1000;
+ private int popCkMaxBufferSize = 200000;
+ private int popCkOffsetMaxQueueSize = 20000;
+
+ /**
+ * the interval of pulling topic information from the named server
+ */
+ private long loadBalancePollNameServerInterval = 1000 * 30;
+
+ /**
+ * the interval of cleaning
+ */
+ private int cleanOfflineBrokerInterval = 1000 * 30;
+
+ private boolean serverLoadBalancerEnabled = true;
+
+ private MessageRequestMode defaultMessageRequestMode =
MessageRequestMode.PULL;
+
+ private int defaultPopShareQueueNum = -1;
+
/**
* The minimum time of the transactional message to be checked firstly,
one message only exceed this time interval
* that can be checked.
@@ -197,6 +232,58 @@ public class BrokerConfig {
return "DEFAULT_BROKER";
}
+ public long getMaxPopPollingSize() {
+ return maxPopPollingSize;
+ }
+
+ public int getReviveQueueNum() {
+ return reviveQueueNum;
+ }
+
+ public long getReviveInterval() {
+ return reviveInterval;
+ }
+
+ public int getPopCkStayBufferTime() {
+ return popCkStayBufferTime;
+ }
+
+ public int getPopCkStayBufferTimeOut() {
+ return popCkStayBufferTimeOut;
+ }
+
+ public int getPopPollingMapSize() {
+ return popPollingMapSize;
+ }
+
+ public long getReviveScanTime() {
+ return reviveScanTime;
+ }
+
+ public long getReviveMaxSlow() {
+ return reviveMaxSlow;
+ }
+
+ public int getPopPollingSize() {
+ return popPollingSize;
+ }
+
+ public boolean isEnablePopBufferMerge() {
+ return enablePopBufferMerge;
+ }
+
+ public int getPopCkMaxBufferSize() {
+ return popCkMaxBufferSize;
+ }
+
+ public int getPopCkOffsetMaxQueueSize() {
+ return popCkOffsetMaxQueueSize;
+ }
+
+ public boolean isEnablePopLog() {
+ return enablePopLog;
+ }
+
public boolean isTraceOn() {
return traceOn;
}
@@ -381,6 +468,14 @@ public class BrokerConfig {
this.pullMessageThreadPoolNums = pullMessageThreadPoolNums;
}
+ public int getAckMessageThreadPoolNums() {
+ return ackMessageThreadPoolNums;
+ }
+
+ public void setAckMessageThreadPoolNums(int ackMessageThreadPoolNums) {
+ this.ackMessageThreadPoolNums = ackMessageThreadPoolNums;
+ }
+
public int getProcessReplyMessageThreadPoolNums() {
return processReplyMessageThreadPoolNums;
}
@@ -485,6 +580,14 @@ public class BrokerConfig {
this.pullThreadPoolQueueCapacity = pullThreadPoolQueueCapacity;
}
+ public int getAckThreadPoolQueueCapacity() {
+ return ackThreadPoolQueueCapacity;
+ }
+
+ public void setAckThreadPoolQueueCapacity(int ackThreadPoolQueueCapacity) {
+ this.ackThreadPoolQueueCapacity = ackThreadPoolQueueCapacity;
+ }
+
public int getReplyThreadPoolQueueCapacity() {
return replyThreadPoolQueueCapacity;
}
@@ -804,4 +907,55 @@ public class BrokerConfig {
public void setAutoDeleteUnusedStats(boolean autoDeleteUnusedStats) {
this.autoDeleteUnusedStats = autoDeleteUnusedStats;
}
+
+
+ public long getLoadBalancePollNameServerInterval() {
+ return loadBalancePollNameServerInterval;
+ }
+
+ public void setLoadBalancePollNameServerInterval(long
loadBalancePollNameServerInterval) {
+ this.loadBalancePollNameServerInterval =
loadBalancePollNameServerInterval;
+ }
+
+ public int getCleanOfflineBrokerInterval() {
+ return cleanOfflineBrokerInterval;
+ }
+
+ public void setCleanOfflineBrokerInterval(int cleanOfflineBrokerInterval) {
+ this.cleanOfflineBrokerInterval = cleanOfflineBrokerInterval;
+ }
+
+ public int getLoadBalanceProcessorThreadPoolNums() {
+ return loadBalanceProcessorThreadPoolNums;
+ }
+
+ public void setLoadBalanceProcessorThreadPoolNums(int
loadBalanceProcessorThreadPoolNums) {
+ this.loadBalanceProcessorThreadPoolNums =
loadBalanceProcessorThreadPoolNums;
+ }
+
+ public boolean isServerLoadBalancerEnabled() {
+ return serverLoadBalancerEnabled;
+ }
+
+ public void setServerLoadBalancerEnabled(boolean
serverLoadBalancerEnabled) {
+ this.serverLoadBalancerEnabled = serverLoadBalancerEnabled;
+ }
+
+ public MessageRequestMode getDefaultMessageRequestMode() {
+ return defaultMessageRequestMode;
+ }
+
+ public void setDefaultMessageRequestMode(String defaultMessageRequestMode)
{
+ this.defaultMessageRequestMode =
MessageRequestMode.valueOf(defaultMessageRequestMode);
+ }
+
+
+ public int getDefaultPopShareQueueNum() {
+ return defaultPopShareQueueNum;
+ }
+
+
+ public void setDefaultPopShareQueueNum(int defaultPopShareQueueNum) {
+ this.defaultPopShareQueueNum = defaultPopShareQueueNum;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java
b/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java
new file mode 100644
index 0000000..d30789f
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+public class KeyBuilder {
+ public static final int POP_ORDER_REVIVE_QUEUE = 999;
+
+ public static String buildPopRetryTopic(String topic, String cid) {
+ return MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + "_" + topic;
+ }
+
+ public static String parseNormalTopic(String topic, String cid) {
+ if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+ return topic.substring((MixAll.RETRY_GROUP_TOPIC_PREFIX + cid +
"_").length());
+ } else {
+ return topic;
+ }
+ }
+
+ public static String buildPollingKey(String topic, String cid, int
queueId) {
+ return topic + PopAckConstants.SPLIT + cid + PopAckConstants.SPLIT +
queueId;
+ }
+}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/PopAckConstants.java
b/common/src/main/java/org/apache/rocketmq/common/PopAckConstants.java
new file mode 100644
index 0000000..839f947
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/PopAckConstants.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+import org.apache.rocketmq.common.topic.TopicValidator;
+
+public class PopAckConstants {
+ public static long ackTimeInterval = 1000;
+ public static final long SECOND = 1000;
+
+ public static long lockTime = 5000;
+ public static int retryQueueNum = 1;
+
+ public static final String REVIVE_GROUP = MixAll.CID_RMQ_SYS_PREFIX +
"REVIVE_GROUP";
+ public static final String LOCAL_HOST = "127.0.0.1";
+ public static final String REVIVE_TOPIC =
TopicValidator.SYSTEM_TOPIC_PREFIX + "REVIVE_LOG_";
+ public static final String CK_TAG = "ck";
+ public static final String ACK_TAG = "ack";
+ public static final String SPLIT = "@";
+
+}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/constant/ConsumeInitMode.java
b/common/src/main/java/org/apache/rocketmq/common/constant/ConsumeInitMode.java
new file mode 100644
index 0000000..b7091fa
--- /dev/null
+++
b/common/src/main/java/org/apache/rocketmq/common/constant/ConsumeInitMode.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.constant;
+
+public class ConsumeInitMode {
+ public static final int MIN = 0;
+ public static final int MAX = 1;
+}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
index fe0ae9f..589200b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
+++ b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
@@ -37,4 +37,5 @@ public class LoggerName {
public static final String PROTECTION_LOGGER_NAME = "RocketmqProtection";
public static final String WATER_MARK_LOGGER_NAME = "RocketmqWaterMark";
public static final String FILTER_LOGGER_NAME = "RocketmqFilter";
+ public static final String ROCKETMQ_POP_LOGGER_NAME = "RocketmqPop";
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
index 5bdc846..0922c5f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
@@ -52,6 +52,8 @@ public class MessageConst {
public static final String PROPERTY_PUSH_REPLY_TIME = "PUSH_REPLY_TIME";
public static final String PROPERTY_CLUSTER = "CLUSTER";
public static final String PROPERTY_MESSAGE_TYPE = "MSG_TYPE";
+ public static final String PROPERTY_POP_CK = "POP_CK";
+ public static final String PROPERTY_FIRST_POP_TIME = "1ST_POP_TIME";
public static final String KEY_SEPARATOR = " ";
@@ -80,6 +82,8 @@ public class MessageConst {
STRING_HASH_SET.add(PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
STRING_HASH_SET.add(PROPERTY_MAX_RECONSUME_TIMES);
STRING_HASH_SET.add(PROPERTY_CONSUME_START_TIMESTAMP);
+ STRING_HASH_SET.add(PROPERTY_POP_CK);
+ STRING_HASH_SET.add(PROPERTY_FIRST_POP_TIME);
STRING_HASH_SET.add(PROPERTY_INSTANCE_ID);
STRING_HASH_SET.add(PROPERTY_CORRELATION_ID);
STRING_HASH_SET.add(PROPERTY_MESSAGE_REPLY_TO_CLIENT);
diff --git
a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
index 7e86d84..77d3034 100644
---
a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
+++
b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
@@ -37,7 +37,7 @@ public class MessageDecoder {
public final static int MESSAGE_MAGIC_CODE_POSTION = 4;
public final static int MESSAGE_FLAG_POSTION = 16;
public final static int MESSAGE_PHYSIC_OFFSET_POSTION = 28;
- // public final static int MESSAGE_STORE_TIMESTAMP_POSTION = 56;
+ public final static int MESSAGE_STORE_TIMESTAMP_POSITION = 56;
public final static int MESSAGE_MAGIC_CODE = -626843481;
public static final char NAME_VALUE_SEPARATOR = 1;
public static final char PROPERTY_SEPARATOR = 2;
diff --git
a/common/src/main/java/org/apache/rocketmq/common/message/MessageQueueAssignment.java
b/common/src/main/java/org/apache/rocketmq/common/message/MessageQueueAssignment.java
new file mode 100644
index 0000000..fcd9f58
--- /dev/null
+++
b/common/src/main/java/org/apache/rocketmq/common/message/MessageQueueAssignment.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.message;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public class MessageQueueAssignment implements Serializable {
+
+ private static final long serialVersionUID = 8092600270527861645L;
+
+ private MessageQueue messageQueue;
+
+ private MessageRequestMode mode = MessageRequestMode.PULL;
+
+ private Map<String, String> attachments;
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((messageQueue == null) ? 0 :
messageQueue.hashCode());
+ result = prime * result + ((mode == null) ? 0 : mode.hashCode());
+ result = prime * result + ((attachments == null) ? 0 :
attachments.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ MessageQueueAssignment other = (MessageQueueAssignment) obj;
+ return messageQueue.equals(other.messageQueue);
+ }
+
+ @Override
+ public String toString() {
+ return "MessageQueueAssignment [MessageQueue=" + messageQueue + ",
Mode=" + mode + "]";
+ }
+
+ public MessageQueue getMessageQueue() {
+ return messageQueue;
+ }
+
+ public void setMessageQueue(MessageQueue messageQueue) {
+ this.messageQueue = messageQueue;
+ }
+
+ public MessageRequestMode getMode() {
+ return mode;
+ }
+
+ public void setMode(MessageRequestMode mode) {
+ this.mode = mode;
+ }
+
+ public Map<String, String> getAttachments() {
+ return attachments;
+ }
+
+ public void setAttachments(Map<String, String> attachments) {
+ this.attachments = attachments;
+ }
+
+}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/message/MessageRequestMode.java
b/common/src/main/java/org/apache/rocketmq/common/message/MessageRequestMode.java
new file mode 100644
index 0000000..35a166a
--- /dev/null
+++
b/common/src/main/java/org/apache/rocketmq/common/message/MessageRequestMode.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.message;
+
+/**
+ * Message Request Mode
+ */
+public enum MessageRequestMode {
+
+ /**
+ * pull
+ */
+ PULL("PULL"),
+
+ /**
+ * pop, consumer working in pop mode could share MessageQueue
+ */
+ POP("POP");
+
+ private String name;
+
+ MessageRequestMode(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 75ceff3..9446caa 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -80,6 +80,10 @@ public class RequestCode {
public static final int GET_BROKER_CLUSTER_ACL_CONFIG = 54;
+ public static final int POP_MESSAGE = 200050;
+ public static final int ACK_MESSAGE = 200051;
+ public static final int CHANGE_MESSAGE_INVISIBLETIME = 200053;
+
public static final int PUT_KV_CONFIG = 100;
public static final int GET_KV_CONFIG = 101;
@@ -188,4 +192,7 @@ public class RequestCode {
public static final int SEND_REPLY_MESSAGE_V2 = 325;
public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326;
+
+ public static final int QUERY_ASSIGNMENT = 400;
+ public static final int SET_MESSAGE_REQUEST_MODE = 401;
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
index dc74444..df0ccbe 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
@@ -80,4 +80,7 @@ public class ResponseCode extends RemotingSysResponseCode {
public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED = 211;
+ public static final int POLLING_FULL = 209;
+
+ public static final int POLLING_TIMEOUT = 210;
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
index d7942eb..10d6f4d 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
@@ -41,6 +41,8 @@ public class ConsumerRunningInfo extends RemotingSerializable
{
private TreeMap<MessageQueue, ProcessQueueInfo> mqTable = new
TreeMap<MessageQueue, ProcessQueueInfo>();
+ private TreeMap<MessageQueue, PopProcessQueueInfo> mqPopTable = new
TreeMap<MessageQueue, PopProcessQueueInfo>();
+
private TreeMap<String/* Topic */, ConsumeStatus> statusTable = new
TreeMap<String, ConsumeStatus>();
private String jstack;
@@ -266,6 +268,28 @@ public class ConsumerRunningInfo extends
RemotingSerializable {
}
{
+ sb.append("\n\n#Consumer Pop Detail#\n");
+ sb.append(String.format("%-32s %-32s %-4s %-20s%n",
+ "#Topic",
+ "#Broker Name",
+ "#QID",
+ "#ProcessQueueInfo"
+ ));
+
+ Iterator<Entry<MessageQueue, PopProcessQueueInfo>> it =
this.mqPopTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<MessageQueue, PopProcessQueueInfo> next = it.next();
+ String item = String.format("%-32s %-32s %-4d %s%n",
+ next.getKey().getTopic(),
+ next.getKey().getBrokerName(),
+ next.getKey().getQueueId(),
+ next.getValue().toString());
+
+ sb.append(item);
+ }
+ }
+
+ {
sb.append("\n\n#Consumer RT&TPS#\n");
sb.append(String.format("%-32s %14s %14s %14s %14s %18s %25s%n",
"#Topic",
@@ -310,4 +334,12 @@ public class ConsumerRunningInfo extends
RemotingSerializable {
this.jstack = jstack;
}
+ public TreeMap<MessageQueue, PopProcessQueueInfo> getMqPopTable() {
+ return mqPopTable;
+ }
+
+ public void setMqPopTable(
+ TreeMap<MessageQueue, PopProcessQueueInfo> mqPopTable) {
+ this.mqPopTable = mqPopTable;
+ }
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/body/PopProcessQueueInfo.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/PopProcessQueueInfo.java
new file mode 100644
index 0000000..b8811bb
--- /dev/null
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/PopProcessQueueInfo.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.body;
+
+public class PopProcessQueueInfo {
+ private int waitAckCount;
+ private boolean droped;
+ private long lastPopTimestamp;
+
+
+ public int getWaitAckCount() {
+ return waitAckCount;
+ }
+
+
+ public void setWaitAckCount(int waitAckCount) {
+ this.waitAckCount = waitAckCount;
+ }
+
+
+ public boolean isDroped() {
+ return droped;
+ }
+
+
+ public void setDroped(boolean droped) {
+ this.droped = droped;
+ }
+
+
+ public long getLastPopTimestamp() {
+ return lastPopTimestamp;
+ }
+
+
+ public void setLastPopTimestamp(long lastPopTimestamp) {
+ this.lastPopTimestamp = lastPopTimestamp;
+ }
+
+ @Override
+ public String toString() {
+ return "PopProcessQueueInfo [waitAckCount:" + waitAckCount +
+ ", droped:" + droped + ", lastPopTimestamp:" +
lastPopTimestamp + "]";
+ }
+}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryAssignmentRequestBody.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryAssignmentRequestBody.java
new file mode 100644
index 0000000..6d0285b
--- /dev/null
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryAssignmentRequestBody.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+public class QueryAssignmentRequestBody extends RemotingSerializable {
+
+ private String topic;
+
+ private String consumerGroup;
+
+ private String clientId;
+
+ private String strategyName;
+
+ private MessageModel messageModel;
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
+ public String getClientId() {
+ return clientId;
+ }
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+ public String getStrategyName() {
+ return strategyName;
+ }
+
+ public void setStrategyName(String strategyName) {
+ this.strategyName = strategyName;
+ }
+
+ public MessageModel getMessageModel() {
+ return messageModel;
+ }
+
+ public void setMessageModel(MessageModel messageModel) {
+ this.messageModel = messageModel;
+ }
+}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryAssignmentResponseBody.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryAssignmentResponseBody.java
new file mode 100644
index 0000000..688737d
--- /dev/null
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryAssignmentResponseBody.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import java.util.Set;
+import org.apache.rocketmq.common.message.MessageQueueAssignment;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+public class QueryAssignmentResponseBody extends RemotingSerializable {
+
+ private Set<MessageQueueAssignment> messageQueueAssignments;
+
+ public Set<MessageQueueAssignment> getMessageQueueAssignments() {
+ return messageQueueAssignments;
+ }
+
+ public void setMessageQueueAssignments(
+ Set<MessageQueueAssignment> messageQueueAssignments) {
+ this.messageQueueAssignments = messageQueueAssignments;
+ }
+}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/body/SetMessageRequestModeRequestBody.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SetMessageRequestModeRequestBody.java
new file mode 100644
index 0000000..309f7ae
--- /dev/null
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SetMessageRequestModeRequestBody.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.message.MessageRequestMode;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+public class SetMessageRequestModeRequestBody extends RemotingSerializable {
+
+ private String topic;
+
+ private String consumerGroup;
+
+ private MessageRequestMode mode = MessageRequestMode.PULL;
+
+ /*
+ consumer working in pop mode could share the MessageQueues assigned to the
N (N = popShareQueueNum) consumers following it in the cid list
+ */
+ private int popShareQueueNum = 0;
+
+ public SetMessageRequestModeRequestBody() {
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
+ public MessageRequestMode getMode() {
+ return mode;
+ }
+
+ public void setMode(MessageRequestMode mode) {
+ this.mode = mode;
+ }
+
+ public int getPopShareQueueNum() {
+ return popShareQueueNum;
+ }
+
+ public void setPopShareQueueNum(int popShareQueueNum) {
+ this.popShareQueueNum = popShareQueueNum;
+ }
+}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/AckMessageRequestHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AckMessageRequestHeader.java
new file mode 100644
index 0000000..02e388b
--- /dev/null
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AckMessageRequestHeader.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class AckMessageRequestHeader implements CommandCustomHeader {
+ @CFNotNull
+ private String consumerGroup;
+ @CFNotNull
+ private String topic;
+ @CFNotNull
+ private Integer queueId;
+ @CFNotNull
+ private String extraInfo;
+
+ @CFNotNull
+ private Long offset;
+
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+ }
+
+ public void setOffset(Long offset) {
+ this.offset = offset;
+ }
+
+ public Long getOffset() {
+ return offset;
+ }
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ public void setExtraInfo(String extraInfo) {
+ this.extraInfo = extraInfo;
+ }
+
+ public String getExtraInfo() {
+ return extraInfo;
+ }
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public Integer getQueueId() {
+ return queueId;
+ }
+
+ public void setQueueId(Integer queueId) {
+ this.queueId = queueId;
+ }
+
+ @Override
+ public String toString() {
+ return topic + "," + this.consumerGroup + "," + this.queueId + "," +
this.offset + "," + this.extraInfo;
+ }
+}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ChangeInvisibleTimeRequestHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ChangeInvisibleTimeRequestHeader.java
new file mode 100644
index 0000000..a586e49
--- /dev/null
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ChangeInvisibleTimeRequestHeader.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class ChangeInvisibleTimeRequestHeader implements CommandCustomHeader {
+ @CFNotNull
+ private String consumerGroup;
+ @CFNotNull
+ private String topic;
+ @CFNotNull
+ private Integer queueId;
+ /**
+ * startOffset popTime invisibleTime queueId
+ */
+ @CFNotNull
+ private String extraInfo;
+
+ @CFNotNull
+ private Long offset;
+
+ @CFNotNull
+ private Long invisibleTime;
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+ }
+
+ public void setOffset(Long offset) {
+ this.offset = offset;
+ }
+
+ public Long getOffset() {
+ return offset;
+ }
+
+ public Long getInvisibleTime() {
+ return invisibleTime;
+ }
+
+ public void setInvisibleTime(Long invisibleTime) {
+ this.invisibleTime = invisibleTime;
+ }
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ public void setExtraInfo(String extraInfo) {
+ this.extraInfo = extraInfo;
+ }
+
+ /**
+ * startOffset popTime invisibleTime queueId
+ */
+ public String getExtraInfo() {
+ return extraInfo;
+ }
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public Integer getQueueId() {
+ return queueId;
+ }
+
+ public void setQueueId(Integer queueId) {
+ this.queueId = queueId;
+ }
+
+}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ChangeInvisibleTimeResponseHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ChangeInvisibleTimeResponseHeader.java
new file mode 100644
index 0000000..2ebabb7
--- /dev/null
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ChangeInvisibleTimeResponseHeader.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class ChangeInvisibleTimeResponseHeader implements CommandCustomHeader {
+
+
+ @CFNotNull
+ private long popTime;
+ @CFNotNull
+ private long invisibleTime;
+
+ @CFNotNull
+ private int reviveQid;
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+ }
+
+ public long getPopTime() {
+ return popTime;
+ }
+
+ public void setPopTime(long popTime) {
+ this.popTime = popTime;
+ }
+
+ public long getInvisibleTime() {
+ return invisibleTime;
+ }
+
+ public void setInvisibleTime(long invisibleTime) {
+ this.invisibleTime = invisibleTime;
+ }
+
+ public int getReviveQid() {
+ return reviveQid;
+ }
+
+ public void setReviveQid(int reviveQid) {
+ this.reviveQid = reviveQid;
+ }
+}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ExtraInfoUtil.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ExtraInfoUtil.java
new file mode 100644
index 0000000..19f37f6
--- /dev/null
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ExtraInfoUtil.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.common.KeyBuilder;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageConst;
+
+public class ExtraInfoUtil {
+ private static final String NORMAL_TOPIC = "0";
+ private static final String RETRY_TOPIC = "1";
+
+ public static String[] split(String extraInfo) {
+ if (extraInfo == null) {
+ throw new IllegalArgumentException("split extraInfo is null");
+ }
+ return extraInfo.split(MessageConst.KEY_SEPARATOR);
+ }
+
+ public static Long getCkQueueOffset(String[] extraInfoStrs) {
+ if (extraInfoStrs == null || extraInfoStrs.length < 1) {
+ throw new IllegalArgumentException("getCkQueueOffset fail,
extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length));
+ }
+ return Long.valueOf(extraInfoStrs[0]);
+ }
+
+ public static Long getPopTime(String[] extraInfoStrs) {
+ if (extraInfoStrs == null || extraInfoStrs.length < 2) {
+ throw new IllegalArgumentException("getPopTime fail, extraInfoStrs
length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length));
+ }
+ return Long.valueOf(extraInfoStrs[1]);
+ }
+
+ public static Long getInvisibleTime(String[] extraInfoStrs) {
+ if (extraInfoStrs == null || extraInfoStrs.length < 3) {
+ throw new IllegalArgumentException("getInvisibleTime fail,
extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length));
+ }
+ return Long.valueOf(extraInfoStrs[2]);
+ }
+
+ public static int getReviveQid(String[] extraInfoStrs) {
+ if (extraInfoStrs == null || extraInfoStrs.length < 4) {
+ throw new IllegalArgumentException("getReviveQid fail,
extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length));
+ }
+ return Integer.valueOf(extraInfoStrs[3]);
+ }
+
+ public static String getRealTopic(String[] extraInfoStrs, String topic,
String cid) {
+ if (extraInfoStrs == null || extraInfoStrs.length < 5) {
+ throw new IllegalArgumentException("getRealTopic fail,
extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length));
+ }
+ if (RETRY_TOPIC.equals(extraInfoStrs[4])) {
+ return KeyBuilder.buildPopRetryTopic(topic, cid);
+ } else {
+ return topic;
+ }
+ }
+
+ public static String getBrokerName(String[] extraInfoStrs) {
+ if (extraInfoStrs == null || extraInfoStrs.length < 6) {
+ throw new IllegalArgumentException("getBrokerName fail,
extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length));
+ }
+ return extraInfoStrs[5];
+ }
+
+ public static int getQueueId(String[] extraInfoStrs) {
+ if (extraInfoStrs == null || extraInfoStrs.length < 7) {
+ throw new IllegalArgumentException("getQueueId fail, extraInfoStrs
length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length));
+ }
+ return Integer.valueOf(extraInfoStrs[6]);
+ }
+
+ public static long getQueueOffset(String[] extraInfoStrs) {
+ if (extraInfoStrs == null || extraInfoStrs.length < 8) {
+ throw new IllegalArgumentException("getQueueOffset fail,
extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length));
+ }
+ return Long.valueOf(extraInfoStrs[7]);
+ }
+
+ public static String buildExtraInfo(long ckQueueOffset, long popTime, long
invisibleTime, int reviveQid, String topic, String brokerName, int queueId) {
+ String t = NORMAL_TOPIC;
+ if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+ t = RETRY_TOPIC;
+ }
+ return ckQueueOffset + MessageConst.KEY_SEPARATOR + popTime +
MessageConst.KEY_SEPARATOR + invisibleTime + MessageConst.KEY_SEPARATOR +
reviveQid + MessageConst.KEY_SEPARATOR + t
+ + MessageConst.KEY_SEPARATOR + brokerName +
MessageConst.KEY_SEPARATOR + queueId;
+ }
+
+ public static String buildExtraInfo(long ckQueueOffset, long popTime, long
invisibleTime, int reviveQid, String topic, String brokerName, int queueId,
+ long msgQueueOffset) {
+ String t = NORMAL_TOPIC;
+ if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+ t = RETRY_TOPIC;
+ }
+ return ckQueueOffset
+ + MessageConst.KEY_SEPARATOR + popTime +
MessageConst.KEY_SEPARATOR + invisibleTime
+ + MessageConst.KEY_SEPARATOR + reviveQid +
MessageConst.KEY_SEPARATOR + t
+ + MessageConst.KEY_SEPARATOR + brokerName +
MessageConst.KEY_SEPARATOR + queueId
+ + MessageConst.KEY_SEPARATOR + msgQueueOffset;
+ }
+
+ public static void buildStartOffsetInfo(StringBuilder stringBuilder,
boolean retry, int queueId, long startOffset) {
+ if (stringBuilder == null) {
+ stringBuilder = new StringBuilder(64);
+ }
+
+ if (stringBuilder.length() > 0) {
+ stringBuilder.append(";");
+ }
+
+ stringBuilder.append(retry ? RETRY_TOPIC : NORMAL_TOPIC)
+ .append(MessageConst.KEY_SEPARATOR).append(queueId)
+ .append(MessageConst.KEY_SEPARATOR).append(startOffset);
+ }
+
+ public static void buildOrderCountInfo(StringBuilder stringBuilder,
boolean retry, int queueId, int orderCount) {
+ if (stringBuilder == null) {
+ stringBuilder = new StringBuilder(64);
+ }
+
+ if (stringBuilder.length() > 0) {
+ stringBuilder.append(";");
+ }
+
+ stringBuilder.append(retry ? RETRY_TOPIC : NORMAL_TOPIC)
+ .append(MessageConst.KEY_SEPARATOR).append(queueId)
+ .append(MessageConst.KEY_SEPARATOR).append(orderCount);
+ }
+
+ public static void buildMsgOffsetInfo(StringBuilder stringBuilder, boolean
retry, int queueId, List<Long> msgOffsets) {
+ if (stringBuilder == null) {
+ stringBuilder = new StringBuilder(64);
+ }
+
+ if (stringBuilder.length() > 0) {
+ stringBuilder.append(";");
+ }
+
+ stringBuilder.append(retry ? RETRY_TOPIC : NORMAL_TOPIC)
+ .append(MessageConst.KEY_SEPARATOR).append(queueId)
+ .append(MessageConst.KEY_SEPARATOR);
+
+ for (int i = 0; i < msgOffsets.size(); i++) {
+ stringBuilder.append(msgOffsets.get(i));
+ if (i < msgOffsets.size() - 1) {
+ stringBuilder.append(",");
+ }
+ }
+ }
+
+ public static Map<String, List<Long>> parseMsgOffsetInfo(String
msgOffsetInfo) {
+ if (msgOffsetInfo == null || msgOffsetInfo.length() == 0) {
+ return null;
+ }
+
+ Map<String, List<Long>> msgOffsetMap = new HashMap<String,
List<Long>>(4);
+ String[] array;
+ if (msgOffsetInfo.indexOf(";") < 0) {
+ array = new String[]{msgOffsetInfo};
+ } else {
+ array = msgOffsetInfo.split(";");
+ }
+
+ for (String one : array) {
+ String[] split = one.split(MessageConst.KEY_SEPARATOR);
+ if (split.length != 3) {
+ throw new IllegalArgumentException("parse msgOffsetMap error,
" + msgOffsetMap);
+ }
+ String key = split[0] + "@" + split[1];
+ if (msgOffsetMap.containsKey(key)) {
+ throw new IllegalArgumentException("parse msgOffsetMap error,
duplicate, " + msgOffsetMap);
+ }
+ msgOffsetMap.put(key, new ArrayList<Long>(8));
+ String[] msgOffsets = split[2].split(",");
+ for (String msgOffset : msgOffsets) {
+ msgOffsetMap.get(key).add(Long.valueOf(msgOffset));
+ }
+ }
+
+ return msgOffsetMap;
+ }
+
+ public static Map<String, Long> parseStartOffsetInfo(String
startOffsetInfo) {
+ if (startOffsetInfo == null || startOffsetInfo.length() == 0) {
+ return null;
+ }
+ Map<String, Long> startOffsetMap = new HashMap<String, Long>(4);
+ String[] array;
+ if (startOffsetInfo.indexOf(";") < 0) {
+ array = new String[]{startOffsetInfo};
+ } else {
+ array = startOffsetInfo.split(";");
+ }
+
+ for (String one : array) {
+ String[] split = one.split(MessageConst.KEY_SEPARATOR);
+ if (split.length != 3) {
+ throw new IllegalArgumentException("parse startOffsetInfo
error, " + startOffsetInfo);
+ }
+ String key = split[0] + "@" + split[1];
+ if (startOffsetMap.containsKey(key)) {
+ throw new IllegalArgumentException("parse startOffsetInfo
error, duplicate, " + startOffsetInfo);
+ }
+ startOffsetMap.put(key, Long.valueOf(split[2]));
+ }
+
+ return startOffsetMap;
+ }
+
+ public static Map<String, Integer> parseOrderCountInfo(String
orderCountInfo) {
+ if (orderCountInfo == null || orderCountInfo.length() == 0) {
+ return null;
+ }
+ Map<String, Integer> startOffsetMap = new HashMap<String, Integer>(4);
+ String[] array;
+ if (orderCountInfo.indexOf(";") < 0) {
+ array = new String[]{orderCountInfo};
+ } else {
+ array = orderCountInfo.split(";");
+ }
+
+ for (String one : array) {
+ String[] split = one.split(MessageConst.KEY_SEPARATOR);
+ if (split.length != 3) {
+ throw new IllegalArgumentException("parse orderCountInfo
error, " + orderCountInfo);
+ }
+ String key = split[0] + "@" + split[1];
+ if (startOffsetMap.containsKey(key)) {
+ throw new IllegalArgumentException("parse orderCountInfo
error, duplicate, " + orderCountInfo);
+ }
+ startOffsetMap.put(key, Integer.valueOf(split[2]));
+ }
+
+ return startOffsetMap;
+ }
+
+ public static String getStartOffsetInfoMapKey(String topic, int queueId) {
+ return (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) ?
RETRY_TOPIC : NORMAL_TOPIC) + "@" + queueId;
+ }
+}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PopMessageRequestHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PopMessageRequestHeader.java
new file mode 100644
index 0000000..4d151a2
--- /dev/null
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PopMessageRequestHeader.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class PopMessageRequestHeader implements CommandCustomHeader {
+ @CFNotNull
+ private String consumerGroup;
+ @CFNotNull
+ private String topic;
+ @CFNotNull
+ private int queueId;
+ @CFNotNull
+ private int maxMsgNums;
+ @CFNotNull
+ private long invisibleTime;
+ @CFNotNull
+ private long pollTime;
+ @CFNotNull
+ private long bornTime;
+ @CFNotNull
+ private int initMode;
+
+ private String expType;
+ private String exp;
+
+ /**
+ * marked as order consume, if true
+ * 1. not commit offset
+ * 2. not pop retry, because no retry
+ * 3. not append check point, because no retry
+ */
+ private Boolean order = Boolean.FALSE;
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+ }
+
+ public void setInitMode(int initMode) {
+ this.initMode = initMode;
+ }
+
+ public int getInitMode() {
+ return initMode;
+ }
+
+ public long getInvisibleTime() {
+ return invisibleTime;
+ }
+
+ public void setInvisibleTime(long invisibleTime) {
+ this.invisibleTime = invisibleTime;
+ }
+
+ public long getPollTime() {
+ return pollTime;
+ }
+
+ public void setPollTime(long pollTime) {
+ this.pollTime = pollTime;
+ }
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
+ public long getBornTime() {
+ return bornTime;
+ }
+
+ public void setBornTime(long bornTime) {
+ this.bornTime = bornTime;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public int getQueueId() {
+ if (queueId < 0) {
+ return -1;
+ }
+ return queueId;
+ }
+
+ public void setQueueId(int queueId) {
+ this.queueId = queueId;
+ }
+
+
+ public int getMaxMsgNums() {
+ return maxMsgNums;
+ }
+
+ public void setMaxMsgNums(int maxMsgNums) {
+ this.maxMsgNums = maxMsgNums;
+ }
+
+ public boolean isTimeoutTooMuch() {
+ return System.currentTimeMillis() - bornTime - pollTime > 500;
+ }
+
+ public String getExpType() {
+ return expType;
+ }
+
+ public void setExpType(String expType) {
+ this.expType = expType;
+ }
+
+ public String getExp() {
+ return exp;
+ }
+
+ public void setExp(String exp) {
+ this.exp = exp;
+ }
+
+ public Boolean getOrder() {
+ return order;
+ }
+
+ public void setOrder(Boolean order) {
+ this.order = order;
+ }
+
+ public boolean isOrder() {
+ return this.order != null && this.order.booleanValue();
+ }
+}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PopMessageResponseHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PopMessageResponseHeader.java
new file mode 100644
index 0000000..09867f3
--- /dev/null
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PopMessageResponseHeader.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class PopMessageResponseHeader implements CommandCustomHeader {
+
+
+ @CFNotNull
+ private long popTime;
+ @CFNotNull
+ private long invisibleTime;
+
+ @CFNotNull
+ private int reviveQid;
+ /**
+ * the rest num in queue
+ */
+ @CFNotNull
+ private long restNum;
+
+ private String startOffsetInfo;
+ private String msgOffsetInfo;
+ private String orderCountInfo;
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+ }
+
+ public long getPopTime() {
+ return popTime;
+ }
+
+ public void setPopTime(long popTime) {
+ this.popTime = popTime;
+ }
+
+ public long getInvisibleTime() {
+ return invisibleTime;
+ }
+
+ public long getRestNum() {
+ return restNum;
+ }
+
+ public void setRestNum(long restNum) {
+ this.restNum = restNum;
+ }
+
+ public void setInvisibleTime(long invisibleTime) {
+ this.invisibleTime = invisibleTime;
+ }
+
+ public int getReviveQid() {
+ return reviveQid;
+ }
+
+ public void setReviveQid(int reviveQid) {
+ this.reviveQid = reviveQid;
+ }
+
+ public String getStartOffsetInfo() {
+ return startOffsetInfo;
+ }
+
+ public void setStartOffsetInfo(String startOffsetInfo) {
+ this.startOffsetInfo = startOffsetInfo;
+ }
+
+ public String getMsgOffsetInfo() {
+ return msgOffsetInfo;
+ }
+
+ public void setMsgOffsetInfo(String msgOffsetInfo) {
+ this.msgOffsetInfo = msgOffsetInfo;
+ }
+
+ public String getOrderCountInfo() {
+ return orderCountInfo;
+ }
+
+ public void setOrderCountInfo(String orderCountInfo) {
+ this.orderCountInfo = orderCountInfo;
+ }
+}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/utils/DataConverter.java
b/common/src/main/java/org/apache/rocketmq/common/utils/DataConverter.java
new file mode 100644
index 0000000..8b50de1
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/DataConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.utils;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+
+public class DataConverter {
+ public static Charset charset = Charset.forName("UTF-8");
+
+ public static byte[] Long2Byte(Long v) {
+ ByteBuffer tmp = ByteBuffer.allocate(8);
+ tmp.putLong(v);
+ return tmp.array();
+ }
+
+ public static int setBit(int value, int index, boolean flag) {
+ if (flag) {
+ return (int) (value | (1L << index));
+ } else {
+ return (int) (value & ~(1L << index));
+ }
+ }
+
+ public static boolean getBit(int value, int index) {
+ return (value & (1L << index)) != 0;
+ }
+}