This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch litePullConsumer
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/litePullConsumer by this push:
new 83098eb Polish lite pull consumer (#1349)
83098eb is described below
commit 83098eb604007eca7892565df7c213ee19a342f5
Author: King <[email protected]>
AuthorDate: Tue Jul 30 10:59:13 2019 +0800
Polish lite pull consumer (#1349)
* fix unsubscribe code
* fix commit consumed offset
* fix commit consumed offset
* fix commit consumed offset
* fix commit consumed offset
* polish commit consumed offset
* pass checkstyle
* pass checkstyle
* polish LiteMQPullConsumer
* add flow control and polish commit logic
* fix bug
* polish code
* fix commit consumed offset back
* refactor litePullConsumer
* development save
* development save
* Refactor DefaultLitePullConsumer and DefaultLitePullConsumerImpl.
* Polish lite pull consumer
* polish lite pull consumer
* polish lite pull consumer
* fix seek
---
.../client/consumer/DefaultLiteMQPullConsumer.java | 132 ---
.../client/consumer/DefaultLitePullConsumer.java | 396 ++++++++
...teMQPullConsumer.java => LitePullConsumer.java} | 32 +-
.../consumer/store/RemoteBrokerOffsetStore.java | 1 -
.../client/impl/consumer/AssignedMessageQueue.java | 109 +-
.../impl/consumer/DefaultLitePullConsumerImpl.java | 1069 ++++++++++++++++++++
.../impl/consumer/DefaultMQPullConsumerImpl.java | 4 +-
.../impl/consumer/LiteMQPullConsumerImpl.java | 469 ---------
.../client/impl/consumer/ProcessQueue.java | 10 -
.../impl/consumer/RebalanceLitePullImpl.java | 68 ++
.../example/simple/LitePullConsumerTest.java | 24 +-
11 files changed, 1661 insertions(+), 653 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java
deleted file mode 100644
index 6f67bcf..0000000
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.consumer;
-
-import java.util.Collection;
-import java.util.List;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.impl.consumer.LiteMQPullConsumerImpl;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.RPCHook;
-
-public class DefaultLiteMQPullConsumer extends DefaultMQPullConsumer
implements LiteMQPullConsumer {
- private LiteMQPullConsumerImpl liteMQPullConsumer;
-
- /**
- * Maximum amount of time in minutes a message may block the consuming
thread.
- */
- private long consumeTimeout = 15;
-
- /**
- * Is auto commit offset
- */
- private boolean autoCommit = true;
-
- private int pullThreadNumbers = 20;
-
- /**
- * Maximum commit offset interval time in seconds.
- */
- private long autoCommitInterval = 5;
-
- public DefaultLiteMQPullConsumer(String consumerGroup, RPCHook rpcHook) {
- this.setConsumerGroup(consumerGroup);
- this.liteMQPullConsumer = new LiteMQPullConsumerImpl(this, rpcHook);
- }
-
- public DefaultLiteMQPullConsumer(String consumerGroup) {
- this.setConsumerGroup(consumerGroup);
- this.liteMQPullConsumer = new LiteMQPullConsumerImpl(this, null);
- }
-
- @Override
- public void start() throws MQClientException {
- this.liteMQPullConsumer.start();
- }
-
- @Override
- public void subscribe(String topic, String subExpression) throws
MQClientException {
- this.liteMQPullConsumer.subscribe(topic, subExpression);
- }
-
- @Override
- public void unsubscribe(String topic) {
- this.liteMQPullConsumer.unsubscribe(topic);
- }
-
- @Override
- public List<MessageExt> poll() {
- return poll(this.getConsumerPullTimeoutMillis());
- }
-
- @Override public List<MessageExt> poll(long timeout) {
- return liteMQPullConsumer.poll(timeout);
- }
-
- @Override
- public void seek(MessageQueue messageQueue, long offset) throws
MQClientException {
- this.liteMQPullConsumer.seek(messageQueue, offset);
- }
-
- @Override
- public void pause(Collection<MessageQueue> messageQueues) {
- this.liteMQPullConsumer.pause(messageQueues);
- }
-
- @Override
- public void resume(Collection<MessageQueue> messageQueues) {
- this.liteMQPullConsumer.resume(messageQueues);
- }
-
- @Override
- public void commitSync() {
- this.liteMQPullConsumer.commitSync();
- }
-
- public long getConsumeTimeout() {
- return consumeTimeout;
- }
-
- public void setConsumeTimeout(long consumeTimeout) {
- this.consumeTimeout = consumeTimeout;
- }
-
- public boolean isAutoCommit() {
- return autoCommit;
- }
-
- public void setAutoCommit(boolean autoCommit) {
- this.autoCommit = autoCommit;
- }
-
- public int getPullThreadNumbers() {
- return pullThreadNumbers;
- }
-
- public void setPullThreadNumbers(int pullThreadNumbers) {
- this.pullThreadNumbers = pullThreadNumbers;
- }
-
- public long getAutoCommitInterval() {
- return autoCommitInterval;
- }
-
- public void setAutoCommitInterval(long autoCommitInterval) {
- this.autoCommitInterval = autoCommitInterval;
- }
-}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
new file mode 100644
index 0000000..757c966
--- /dev/null
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.rocketmq.client.ClientConfig;
+import
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
+import org.apache.rocketmq.client.consumer.store.OffsetStore;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.remoting.RPCHook;
+
+
+public class DefaultLitePullConsumer extends ClientConfig implements
LitePullConsumer {
+
+ private DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;
+
+ /**
+ * Do the same thing for the same Group, the application must be set,and
guarantee Globally unique
+ */
+ private String consumerGroup;
+
+ /**
+ * Long polling mode, the Consumer connection max suspend time, it is not
recommended to modify
+ */
+ private long brokerSuspendMaxTimeMillis = 1000 * 20;
+
+
+ /**
+ * Long polling mode, the Consumer connection timeout(must greater than
brokerSuspendMaxTimeMillis), it is not
+ * recommended to modify
+ */
+ private long consumerTimeoutMillisWhenSuspend = 1000 * 30;
+
+ /**
+ * The socket timeout in milliseconds
+ */
+ private long consumerPullTimeoutMillis = 1000 * 10;
+
+ /**
+ * Consumption pattern,default is clustering
+ */
+ private MessageModel messageModel = MessageModel.CLUSTERING;
+ /**
+ * Message queue listener
+ */
+ private MessageQueueListener messageQueueListener;
+ /**
+ * Offset Storage
+ */
+ private OffsetStore offsetStore;
+ /**
+ * Topic set you want to register
+ */
+ private Set<String> registerTopics = new HashSet<String>();
+ /**
+ * Queue allocation algorithm
+ */
+ private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new
AllocateMessageQueueAveragely();
+ /**
+ * Whether the unit of subscription group
+ */
+ private boolean unitMode = false;
+
+ private int maxReconsumeTimes = 16;
+ /**
+ * Maximum amount of time in minutes a message may block the consuming
thread.
+ */
+ private long consumeTimeout = 15;
+
+ /**
+ * Is auto commit offset
+ */
+ private boolean autoCommit = true;
+
+ private int pullThreadNumbers = 20;
+
+ /**
+ * Maximum commit offset interval time in seconds.
+ */
+ private long autoCommitInterval = 5;
+
+ /**
+ * Maximum number of messages pulled each time.
+ */
+ private int pullBatchNums = 10;
+
+ /**
+ * Flow control threshold for consume request, each consumer will cache at
most 10000 consume requests by default.
+ * Consider the {@code pullBatchSize}, the instantaneous value may exceed
the limit
+ */
+ private long pullThresholdForAll = 10000;
+
+ /**
+ * Consume max span offset.
+ */
+ private int consumeMaxSpan = 2000;
+
+ /**
+ * Flow control threshold on queue level, each message queue will cache at
most 1000 messages by default, Consider
+ * the {@code pullBatchSize}, the instantaneous value may exceed the limit
+ */
+ private int pullThresholdForQueue = 1000;
+
+ /**
+ * Limit the cached message size on queue level, each message queue will
cache at most 100 MiB messages by default,
+ * Consider the {@code pullBatchSize}, the instantaneous value may exceed
the limit
+ *
+ * <p>
+ * The size of a message only measured by message body, so it's not
accurate
+ */
+ private int pullThresholdSizeForQueue = 100;
+
+ /**
+ * The socket timeout in milliseconds
+ */
+ private long pollTimeoutMillis = 1000 * 5;
+
+ public DefaultLitePullConsumer() {
+ this(null, MixAll.DEFAULT_CONSUMER_GROUP, null);
+ }
+
+ public DefaultLitePullConsumer(final String consumerGroup) {
+ this(null, consumerGroup, null);
+ }
+
+ public DefaultLitePullConsumer(RPCHook rpcHook) {
+ this(null, MixAll.DEFAULT_CONSUMER_GROUP, rpcHook);
+ }
+
+ public DefaultLitePullConsumer(final String consumerGroup, RPCHook
rpcHook) {
+ this(null, consumerGroup, rpcHook);
+ }
+
+ /**
+ * Constructor specifying namespace, consumer group and RPC hook.
+ *
+ * @param consumerGroup Consumer group.
+ * @param rpcHook RPC hook to execute before each remoting command.
+ */
+ public DefaultLitePullConsumer(final String namespace, final String
consumerGroup, RPCHook rpcHook) {
+ this.namespace = namespace;
+ this.consumerGroup = consumerGroup;
+ defaultLitePullConsumerImpl = new
DefaultLitePullConsumerImpl(this,rpcHook);
+ }
+
+ @Override
+ public void start() throws MQClientException {
+ this.defaultLitePullConsumerImpl.start();
+ }
+
+ @Override
+ public void shutdown() {
+ this.defaultLitePullConsumerImpl.shutdown();
+ }
+
+ @Override
+ public void subscribe(String topic, String subExpression) throws
MQClientException {
+ this.defaultLitePullConsumerImpl.subscribe(topic, subExpression);
+ }
+
+ @Override
+ public void unsubscribe(String topic) {
+ this.defaultLitePullConsumerImpl.unsubscribe(topic);
+ }
+
+ @Override
+ public void assign(Collection<MessageQueue> messageQueues) {
+ defaultLitePullConsumerImpl.assign(messageQueues);
+ }
+
+ @Override
+ public List<MessageExt> poll() {
+ return defaultLitePullConsumerImpl.poll(this.getPollTimeoutMillis());
+ }
+
+ @Override
+ public List<MessageExt> poll(long timeout) {
+ return defaultLitePullConsumerImpl.poll(timeout);
+ }
+
+ @Override
+ public void seek(MessageQueue messageQueue, long offset) throws
MQClientException {
+ this.defaultLitePullConsumerImpl.seek(messageQueue, offset);
+ }
+
+ @Override
+ public void pause(Collection<MessageQueue> messageQueues) {
+ this.defaultLitePullConsumerImpl.pause(messageQueues);
+ }
+
+ @Override
+ public void resume(Collection<MessageQueue> messageQueues) {
+ this.defaultLitePullConsumerImpl.resume(messageQueues);
+ }
+
+ @Override
+ public Collection<MessageQueue> fetchMessageQueues(String topic) throws
MQClientException{
+ return
this.defaultLitePullConsumerImpl.fetchMessageQueues(withNamespace(topic));
+ }
+
+ @Override
+ public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp)
throws MQClientException{
+ return
this.defaultLitePullConsumerImpl.searchOffset(messageQueue,timestamp);
+ }
+
+ @Override
+ public void commitSync() {
+ this.defaultLitePullConsumerImpl.commitSync();
+ }
+
+ @Override
+ public boolean isAutoCommit() {
+ return autoCommit;
+ }
+
+ @Override
+ public void setAutoCommit(boolean autoCommit) {
+ this.autoCommit = autoCommit;
+ }
+
+ public long getConsumeTimeout() {
+ return consumeTimeout;
+ }
+
+ public void setConsumeTimeout(long consumeTimeout) {
+ this.consumeTimeout = consumeTimeout;
+ }
+
+ public int getPullThreadNumbers() {
+ return pullThreadNumbers;
+ }
+
+ public void setPullThreadNumbers(int pullThreadNumbers) {
+ this.pullThreadNumbers = pullThreadNumbers;
+ }
+
+ public long getAutoCommitInterval() {
+ return autoCommitInterval;
+ }
+
+ public void setAutoCommitInterval(long autoCommitInterval) {
+ this.autoCommitInterval = autoCommitInterval;
+ }
+
+ public int getPullBatchNums() {
+ return pullBatchNums;
+ }
+
+ public void setPullBatchNums(int pullBatchNums) {
+ this.pullBatchNums = pullBatchNums;
+ }
+
+ public long getPullThresholdForAll() {
+ return pullThresholdForAll;
+ }
+
+ public void setPullThresholdForAll(long pullThresholdForAll) {
+ this.pullThresholdForAll = pullThresholdForAll;
+ }
+
+ public int getConsumeMaxSpan() {
+ return consumeMaxSpan;
+ }
+
+ public void setConsumeMaxSpan(int consumeMaxSpan) {
+ this.consumeMaxSpan = consumeMaxSpan;
+ }
+
+ public int getPullThresholdForQueue() {
+ return pullThresholdForQueue;
+ }
+
+ public void setPullThresholdForQueue(int pullThresholdForQueue) {
+ this.pullThresholdForQueue = pullThresholdForQueue;
+ }
+
+ public int getPullThresholdSizeForQueue() {
+ return pullThresholdSizeForQueue;
+ }
+
+ public void setPullThresholdSizeForQueue(int pullThresholdSizeForQueue) {
+ this.pullThresholdSizeForQueue = pullThresholdSizeForQueue;
+ }
+
+ public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
+ return allocateMessageQueueStrategy;
+ }
+
+ public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy
allocateMessageQueueStrategy) {
+ this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
+ }
+
+ public long getBrokerSuspendMaxTimeMillis() {
+ return brokerSuspendMaxTimeMillis;
+ }
+
+ public long getPollTimeoutMillis() {
+ return pollTimeoutMillis;
+ }
+
+ public void setPollTimeoutMillis(long pollTimeoutMillis) {
+ this.pollTimeoutMillis = pollTimeoutMillis;
+ }
+
+ public OffsetStore getOffsetStore() {
+ return offsetStore;
+ }
+
+ public void setOffsetStore(OffsetStore offsetStore) {
+ this.offsetStore = offsetStore;
+ }
+
+ public boolean isUnitMode() {
+ return unitMode;
+ }
+
+ public void setUnitMode(boolean isUnitMode) {
+ this.unitMode = isUnitMode;
+ }
+
+ public int getMaxReconsumeTimes() {
+ return maxReconsumeTimes;
+ }
+
+ public void setMaxReconsumeTimes(final int maxReconsumeTimes) {
+ this.maxReconsumeTimes = maxReconsumeTimes;
+ }
+
+ public MessageModel getMessageModel() {
+ return messageModel;
+ }
+
+ public void setMessageModel(MessageModel messageModel) {
+ this.messageModel = messageModel;
+ }
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ public MessageQueueListener getMessageQueueListener() {
+ return messageQueueListener;
+ }
+
+ public void setMessageQueueListener(MessageQueueListener
messageQueueListener) {
+ this.messageQueueListener = messageQueueListener;
+ }
+
+ public Set<String> getRegisterTopics() {
+ return registerTopics;
+ }
+
+ public void setRegisterTopics(Set<String> registerTopics) {
+ this.registerTopics = withNamespace(registerTopics);
+ }
+
+ public long getConsumerPullTimeoutMillis() {
+ return consumerPullTimeoutMillis;
+ }
+
+ public void setConsumerPullTimeoutMillis(long consumerPullTimeoutMillis) {
+ this.consumerPullTimeoutMillis = consumerPullTimeoutMillis;
+ }
+
+ public long getConsumerTimeoutMillisWhenSuspend() {
+ return consumerTimeoutMillisWhenSuspend;
+ }
+
+ public void setConsumerTimeoutMillisWhenSuspend(long
consumerTimeoutMillisWhenSuspend) {
+ this.consumerTimeoutMillisWhenSuspend =
consumerTimeoutMillisWhenSuspend;
+ }
+
+}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
similarity index 70%
rename from
client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java
rename to
client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
index da8d1cf..ece08af 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
@@ -18,16 +18,28 @@ package org.apache.rocketmq.client.consumer;
import java.util.Collection;
import java.util.List;
+
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
-public interface LiteMQPullConsumer {
+public interface LitePullConsumer {
+
+ /**
+ * Start the consumer
+ */
+ void start() throws MQClientException;
+
+ /**
+ * Shutdown the consumer
+ */
+ void shutdown();
+
/**
* Subscribe some topic
*
* @param subExpression subscription expression.it only support or
operation such as "tag1 || tag2 || tag3" <br> if
- * null or * expression,meaning subscribe all
+ * null or * expression,meaning subscribe all
*/
void subscribe(final String topic, final String subExpression) throws
MQClientException;
@@ -38,15 +50,27 @@ public interface LiteMQPullConsumer {
*/
void unsubscribe(final String topic);
+ void assign(Collection<MessageQueue> messageQueues);
+
List<MessageExt> poll();
List<MessageExt> poll(long timeout);
void seek(MessageQueue messageQueue, long offset) throws MQClientException;
- void pause(Collection<MessageQueue> messageQueueCollection);
+ void pause(Collection<MessageQueue> messageQueues);
+
+ boolean isAutoCommit();
+
+ void setAutoCommit(boolean autoCommit);
+
+ void resume(Collection<MessageQueue> messageQueues);
- void resume(Collection<MessageQueue> partitions);
+ Collection<MessageQueue> fetchMessageQueues(String topic) throws
MQClientException;
+
+ Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws
MQClientException;
void commitSync();
+
+
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
index c1524e1..63dc525 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@ -204,7 +204,6 @@ public class RemoteBrokerOffsetStore implements OffsetStore
{
MQBrokerException, InterruptedException, MQClientException {
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
-
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult =
this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
index fb0ca79..a3c5da1 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
@@ -19,25 +19,33 @@ package org.apache.rocketmq.client.impl.consumer;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
public class AssignedMessageQueue {
private ConcurrentHashMap<MessageQueue, MessageQueueStat>
assignedMessageQueueState;
+ private RebalanceImpl rebalanceImpl;
+
public AssignedMessageQueue() {
assignedMessageQueueState = new ConcurrentHashMap<MessageQueue,
MessageQueueStat>();
}
+ public void setRebalanceImpl(RebalanceImpl rebalanceImpl) {
+ this.rebalanceImpl = rebalanceImpl;
+ }
+
+ public Collection<MessageQueue> messageQueues(){
+ return assignedMessageQueueState.keySet();
+ }
+
public boolean isPaused(MessageQueue messageQueue) {
MessageQueueStat messageQueueStat =
assignedMessageQueueState.get(messageQueue);
if (messageQueueStat != null) {
return messageQueueStat.isPaused();
}
- return false;
+ return true;
}
public void pause(Collection<MessageQueue> messageQueues) {
@@ -58,24 +66,60 @@ public class AssignedMessageQueue {
}
}
- public long getNextOffset(MessageQueue messageQueue) throws
MQClientException {
+ public ProcessQueue getProcessQueue(MessageQueue messageQueue) {
MessageQueueStat messageQueueStat =
assignedMessageQueueState.get(messageQueue);
- if (assignedMessageQueueState.get(messageQueue) != null) {
+ if (messageQueueStat != null) {
+ return messageQueueStat.getProcessQueue();
+ }
+ return null;
+ }
+
+ public long getNextOffset(MessageQueue messageQueue) {
+ MessageQueueStat messageQueueStat =
assignedMessageQueueState.get(messageQueue);
+ if (messageQueueStat != null) {
return messageQueueStat.getNextOffset();
}
return -1;
}
- public void updateNextOffset(MessageQueue messageQueue, long offset)
throws MQClientException {
+ public void updateNextOffset(MessageQueue messageQueue, long offset) {
+ MessageQueueStat messageQueueStat =
assignedMessageQueueState.get(messageQueue);
+ if (messageQueueStat != null) {
+ messageQueueStat.setNextOffset(offset);
+ }
+ }
+
+ public long getConusmerOffset(MessageQueue messageQueue) {
+ MessageQueueStat messageQueueStat =
assignedMessageQueueState.get(messageQueue);
+ if (messageQueueStat != null) {
+ return messageQueueStat.getConsumeOffset();
+ }
+ return -1;
+ }
+
+ public void updateConsumeOffset(MessageQueue messageQueue, long offset) {
+ MessageQueueStat messageQueueStat =
assignedMessageQueueState.get(messageQueue);
+ if (messageQueueStat != null) {
+ messageQueueStat.setConsumeOffset(offset);
+ }
+ }
+
+ public void setSeekOffset(MessageQueue messageQueue, long offset) {
MessageQueueStat messageQueueStat =
assignedMessageQueueState.get(messageQueue);
- if (messageQueue == null) {
- messageQueueStat = new MessageQueueStat(messageQueue, offset);
- assignedMessageQueueState.putIfAbsent(messageQueue,
messageQueueStat);
+ if (messageQueueStat != null) {
+ messageQueueStat.setSeekOffset(offset);
}
- assignedMessageQueueState.get(messageQueue).setNextOffset(offset);
}
- public void updateAssignedMessageQueue(Set<MessageQueue> assigned) {
+ public long getSeekOffset(MessageQueue messageQueue) {
+ MessageQueueStat messageQueueStat =
assignedMessageQueueState.get(messageQueue);
+ if (messageQueueStat != null) {
+ return messageQueueStat.getSeekOffset();
+ }
+ return -1;
+ }
+
+ public void updateAssignedMessageQueue(Collection<MessageQueue> assigned) {
synchronized (this.assignedMessageQueueState) {
Iterator<Map.Entry<MessageQueue, MessageQueueStat>> it =
this.assignedMessageQueueState.entrySet().iterator();
while (it.hasNext()) {
@@ -87,7 +131,13 @@ public class AssignedMessageQueue {
for (MessageQueue messageQueue : assigned) {
if (!this.assignedMessageQueueState.containsKey(messageQueue))
{
- MessageQueueStat messageQueueStat = new
MessageQueueStat(messageQueue);
+ MessageQueueStat messageQueueStat;
+ if (rebalanceImpl != null &&
rebalanceImpl.processQueueTable.get(messageQueue) != null) {
+ messageQueueStat = new MessageQueueStat(messageQueue,
rebalanceImpl.processQueueTable.get(messageQueue));
+ } else {
+ ProcessQueue processQueue = new ProcessQueue();
+ messageQueueStat = new MessageQueueStat(messageQueue,
processQueue);
+ }
this.assignedMessageQueueState.put(messageQueue,
messageQueueStat);
}
}
@@ -108,16 +158,15 @@ public class AssignedMessageQueue {
public class MessageQueueStat {
private MessageQueue messageQueue;
+ private ProcessQueue processQueue;
private boolean paused = false;
private long nextOffset = -1;
+ private long consumeOffset = -1;
+ private volatile long seekOffset = -1;
- public MessageQueueStat(MessageQueue messageQueue) {
- this.messageQueue = messageQueue;
- }
-
- public MessageQueueStat(MessageQueue messageQueue, long nextOffset) {
+ public MessageQueueStat(MessageQueue messageQueue, ProcessQueue
processQueue) {
this.messageQueue = messageQueue;
- this.nextOffset = nextOffset;
+ this.processQueue = processQueue;
}
public MessageQueue getMessageQueue() {
@@ -143,5 +192,29 @@ public class AssignedMessageQueue {
public void setNextOffset(long nextOffset) {
this.nextOffset = nextOffset;
}
+
+ public ProcessQueue getProcessQueue() {
+ return processQueue;
+ }
+
+ public void setProcessQueue(ProcessQueue processQueue) {
+ this.processQueue = processQueue;
+ }
+
+ public long getConsumeOffset() {
+ return consumeOffset;
+ }
+
+ public void setConsumeOffset(long consumeOffset) {
+ this.consumeOffset = consumeOffset;
+ }
+
+ public long getSeekOffset() {
+ return seekOffset;
+ }
+
+ public void setSeekOffset(long seekOffset) {
+ this.seekOffset = seekOffset;
+ }
}
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
new file mode 100644
index 0000000..95e218f
--- /dev/null
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -0,0 +1,1069 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.TreeMap;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.client.Validators;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.consumer.MessageSelector;
+import org.apache.rocketmq.client.consumer.MessageQueueListener;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore;
+import org.apache.rocketmq.client.consumer.store.OffsetStore;
+import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
+import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.hook.ConsumeMessageContext;
+import org.apache.rocketmq.client.hook.ConsumeMessageHook;
+import org.apache.rocketmq.client.hook.FilterMessageHook;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ServiceState;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.common.filter.FilterAPI;
+import org.apache.rocketmq.common.help.FAQUrl;
+
+import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+public class DefaultLitePullConsumerImpl implements MQConsumerInner {
+
+ private final InternalLogger log = ClientLogger.getLog();
+
+ private final long consumerStartTimestamp = System.currentTimeMillis();
+
+ private final RPCHook rpcHook;
+
+ private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new
ArrayList<ConsumeMessageHook>();
+
+ private final ArrayList<FilterMessageHook> filterMessageHookList = new
ArrayList<FilterMessageHook>();
+
+ private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
+
+ protected MQClientInstance mQClientFactory;
+
+ private PullAPIWrapper pullAPIWrapper;
+
+ private OffsetStore offsetStore;
+
+ private RebalanceImpl rebalanceImpl = new RebalanceLitePullImpl(this);
+
+ private enum SubscriptionType {
+ NONE, SUBSCRIBE, ASSIGN
+ }
+
+ private static final String NOT_RUNNING_EXCEPTION_MESSAGE = "The consumer
not running.";
+
+ private static final String SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE =
"Cannot select two subscription types at the same time.";
+ /**
+ * the type of subscription
+ */
+ private SubscriptionType subscriptionType = SubscriptionType.NONE;
+ /**
+ * Delay some time when exception occur
+ */
+ private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 1000;
+ /**
+ * Flow control interval
+ */
+ private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
+ /**
+ * Delay some time when suspend pull service
+ */
+ private static final long PULL_TIME_DELAY_MILLS_WHEN_PAUSE = 1000;
+
+ private DefaultLitePullConsumer defaultLitePullConsumer;
+
+ private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
+ new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
+
+ private AssignedMessageQueue assignedMessageQueue = new
AssignedMessageQueue();
+
+ private final BlockingQueue<ConsumeRequest> consumeRequestCache = new
LinkedBlockingQueue<ConsumeRequest>();
+
+ private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
+
+ private long consumeRequestFlowControlTimes = 0L;
+
+ private long queueFlowControlTimes = 0L;
+
+ private long queueMaxSpanFlowControlTimes = 0L;
+
+ private long nextAutoCommitDeadline = -1L;
+
+ public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer
defaultLitePullConsumer, final RPCHook rpcHook) {
+
+ this.defaultLitePullConsumer = defaultLitePullConsumer;
+ this.rpcHook = rpcHook;
+
+ }
+
+ private void checkServiceState() {
+ if (!(this.serviceState == ServiceState.RUNNING))
+ throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE);
+ }
+
+ private synchronized void setSubscriptionType(SubscriptionType type) {
+ if (this.subscriptionType == SubscriptionType.NONE)
+ this.subscriptionType = type;
+ else if (this.subscriptionType != type)
+ throw new
IllegalStateException(SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE);
+ }
+
+ private void updateAssignedMessageQueue(String topic, Set<MessageQueue>
assignedMessageQueue) {
+
this.assignedMessageQueue.updateAssignedMessageQueue(assignedMessageQueue);
+ updatePullTask(topic, assignedMessageQueue);
+ }
+
+ private void updatePullTask(String topic, Set<MessageQueue> mqNewSet) {
+ Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it =
this.taskTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
+ if (next.getKey().getTopic().equals(topic)) {
+ if (!mqNewSet.contains(next.getKey())) {
+ next.getValue().setCancelled(true);
+ it.remove();
+ }
+ }
+ }
+ startPullTask(mqNewSet);
+ }
+
+ class MessageQueueListenerImpl implements MessageQueueListener {
+ @Override
+ public void messageQueueChanged(String topic, Set<MessageQueue> mqAll,
Set<MessageQueue> mqDivided) {
+ MessageModel messageModel =
defaultLitePullConsumer.getMessageModel();
+ switch (messageModel) {
+ case BROADCASTING:
+ updateAssignedMessageQueue(topic, mqAll);
+ break;
+ case CLUSTERING:
+ updateAssignedMessageQueue(topic, mqDivided);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ private int nextPullBatchNums() {
+ return Math.min(this.defaultLitePullConsumer.getPullBatchNums(),
consumeRequestCache.remainingCapacity());
+ }
+
+ public synchronized void shutdown() {
+ switch (this.serviceState) {
+ case CREATE_JUST:
+ break;
+ case RUNNING:
+ this.persistConsumerOffset();
+
this.mQClientFactory.unregisterConsumer(this.defaultLitePullConsumer.getConsumerGroup());
+ this.mQClientFactory.shutdown();
+ log.info("the consumer [{}] shutdown OK",
this.defaultLitePullConsumer.getConsumerGroup());
+ scheduledThreadPoolExecutor.shutdown();
+ this.serviceState = ServiceState.SHUTDOWN_ALREADY;
+ break;
+ case SHUTDOWN_ALREADY:
+ break;
+ default:
+ break;
+ }
+ }
+
+ public synchronized void start() throws MQClientException {
+ switch (this.serviceState) {
+ case CREATE_JUST:
+ this.serviceState = ServiceState.START_FAILED;
+
+ this.checkConfig();
+
+ this.copySubscription();
+
+ if (this.defaultLitePullConsumer.getMessageModel() ==
MessageModel.CLUSTERING) {
+ this.defaultLitePullConsumer.changeInstanceNameToPID();
+ }
+
+ this.mQClientFactory =
MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultLitePullConsumer,
this.rpcHook);
+
+
this.rebalanceImpl.setConsumerGroup(this.defaultLitePullConsumer.getConsumerGroup());
+
this.rebalanceImpl.setMessageModel(this.defaultLitePullConsumer.getMessageModel());
+
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultLitePullConsumer.getAllocateMessageQueueStrategy());
+ this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
+
+ this.pullAPIWrapper = new PullAPIWrapper(
+ mQClientFactory,
+ this.defaultLitePullConsumer.getConsumerGroup(),
isUnitMode());
+
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
+
+ if (this.defaultLitePullConsumer.getOffsetStore() != null) {
+ this.offsetStore =
this.defaultLitePullConsumer.getOffsetStore();
+ } else {
+ switch (this.defaultLitePullConsumer.getMessageModel()) {
+ case BROADCASTING:
+ this.offsetStore = new
LocalFileOffsetStore(this.mQClientFactory,
this.defaultLitePullConsumer.getConsumerGroup());
+ break;
+ case CLUSTERING:
+ this.offsetStore = new
RemoteBrokerOffsetStore(this.mQClientFactory,
this.defaultLitePullConsumer.getConsumerGroup());
+ break;
+ default:
+ break;
+ }
+
this.defaultLitePullConsumer.setOffsetStore(this.offsetStore);
+ }
+
+ this.offsetStore.load();
+
+ boolean registerOK =
mQClientFactory.registerConsumer(this.defaultLitePullConsumer.getConsumerGroup(),
this);
+ if (!registerOK) {
+ this.serviceState = ServiceState.CREATE_JUST;
+
+ throw new MQClientException("The consumer group[" +
this.defaultLitePullConsumer.getConsumerGroup()
+ + "] has been created before, specify another name
please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
+ null);
+ }
+
+ mQClientFactory.start();
+
+ final String group =
this.defaultLitePullConsumer.getConsumerGroup();
+
+ this.scheduledThreadPoolExecutor = new
ScheduledThreadPoolExecutor(
+ this.defaultLitePullConsumer.getPullThreadNumbers(),
+ new ThreadFactoryImpl("PullMsgThread-" + group)
+ );
+ if (subscriptionType == SubscriptionType.SUBSCRIBE) {
+ updateTopicSubscribeInfoWhenSubscriptionChanged();
+ }
+ if (subscriptionType == SubscriptionType.ASSIGN) {
+ updateAssignPullTask(assignedMessageQueue.messageQueues());
+ }
+
+ log.info("the consumer [{}] start OK",
this.defaultLitePullConsumer.getConsumerGroup());
+ this.serviceState = ServiceState.RUNNING;
+ break;
+ case RUNNING:
+ case START_FAILED:
+ case SHUTDOWN_ALREADY:
+ throw new MQClientException("The PullConsumer service state
not OK, maybe started once, "
+ + this.serviceState
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
+ null);
+ default:
+ break;
+ }
+ }
+
+ private void checkConfig() throws MQClientException {
+ // check consumerGroup
+ Validators.checkGroup(this.defaultLitePullConsumer.getConsumerGroup());
+
+ // consumerGroup
+ if (null == this.defaultLitePullConsumer.getConsumerGroup()) {
+ throw new MQClientException(
+ "consumerGroup is null"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+
+ // consumerGroup
+ if
(this.defaultLitePullConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP))
{
+ throw new MQClientException(
+ "consumerGroup can not equal "
+ + MixAll.DEFAULT_CONSUMER_GROUP
+ + ", please specify another one."
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+
+ // messageModel
+ if (null == this.defaultLitePullConsumer.getMessageModel()) {
+ throw new MQClientException(
+ "messageModel is null"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+
+ // allocateMessageQueueStrategy
+ if (null ==
this.defaultLitePullConsumer.getAllocateMessageQueueStrategy()) {
+ throw new MQClientException(
+ "allocateMessageQueueStrategy is null"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+
+ // allocateMessageQueueStrategy
+ if (this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend()
< this.defaultLitePullConsumer.getBrokerSuspendMaxTimeMillis()) {
+ throw new MQClientException(
+ "Long polling mode, the consumer
consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+ }
+
+ private void copySubscription() throws MQClientException {
+ try {
+ Set<String> registerTopics =
this.defaultLitePullConsumer.getRegisterTopics();
+ if (registerTopics != null) {
+ for (final String topic : registerTopics) {
+ SubscriptionData subscriptionData =
FilterAPI.buildSubscriptionData(this.defaultLitePullConsumer.getConsumerGroup(),
+ topic, SubscriptionData.SUB_ALL);
+ this.rebalanceImpl.getSubscriptionInner().put(topic,
subscriptionData);
+ }
+ }
+ } catch (Exception e) {
+ throw new MQClientException("subscription exception", e);
+ }
+ }
+
+ private void startPullTask(Collection<MessageQueue> mqSet) {
+ for (MessageQueue messageQueue : mqSet) {
+ if (!this.taskTable.containsKey(messageQueue)) {
+ PullTaskImpl pullTask = new PullTaskImpl(messageQueue);
+ this.taskTable.put(messageQueue, pullTask);
+ this.scheduledThreadPoolExecutor.schedule(pullTask, 0,
TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
+ private void updateAssignPullTask(Collection<MessageQueue> mqNewSet) {
+ Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it =
this.taskTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
+ if (!mqNewSet.contains(next.getKey())) {
+ next.getValue().setCancelled(true);
+ it.remove();
+ }
+ }
+
+ startPullTask(mqNewSet);
+ }
+
+ private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
+ Map<String, SubscriptionData> subTable =
rebalanceImpl.getSubscriptionInner();
+ if (subTable != null) {
+ for (final Map.Entry<String, SubscriptionData> entry :
subTable.entrySet()) {
+ final String topic = entry.getKey();
+ this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
+ }
+ }
+ }
+
+ public synchronized void subscribe(String topic, String subExpression)
throws MQClientException {
+ try {
+ setSubscriptionType(SubscriptionType.SUBSCRIBE);
+ SubscriptionData subscriptionData =
FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(),
+ topic, subExpression);
+ this.rebalanceImpl.getSubscriptionInner().put(topic,
subscriptionData);
+ this.defaultLitePullConsumer.setMessageQueueListener(new
MessageQueueListenerImpl());
+ assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
+ if (serviceState == ServiceState.RUNNING) {
+ this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
+ updateTopicSubscribeInfoWhenSubscriptionChanged();
+ }
+ } catch (Exception e) {
+ throw new MQClientException("subscription exception", e);
+ }
+ }
+
+ public synchronized void unsubscribe(final String topic) {
+ this.rebalanceImpl.getSubscriptionInner().remove(topic);
+ //can be delete
+ removePullTaskCallback(topic);
+ assignedMessageQueue.removeAssignedMessageQueue(topic);
+ }
+
+ public synchronized void assign(Collection<MessageQueue> messageQueues) {
+ setSubscriptionType(SubscriptionType.ASSIGN);
+ assignedMessageQueue.updateAssignedMessageQueue(messageQueues);
+ if (serviceState == ServiceState.RUNNING) {
+ updateAssignPullTask(messageQueues);
+ }
+ }
+
+ private void maybeAutoCommit() {
+ long now = System.currentTimeMillis();
+ if (now >= nextAutoCommitDeadline) {
+ commitAll();
+ nextAutoCommitDeadline = now +
defaultLitePullConsumer.getAutoCommitInterval() * 1000;
+ }
+ }
+
+ public List<MessageExt> poll(long timeout) {
+ try {
+ checkServiceState();
+ if (defaultLitePullConsumer.isAutoCommit()) {
+ maybeAutoCommit();
+ }
+ long endTime = System.currentTimeMillis() + timeout;
+ ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime -
System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+ while (consumeRequest != null &&
consumeRequest.getProcessQueue().isDropped()) {
+ consumeRequest = consumeRequestCache.poll(endTime -
System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+ if ((endTime - System.currentTimeMillis()) <= 0)
+ break;
+ }
+ if (consumeRequest != null &&
!consumeRequest.getProcessQueue().isDropped()) {
+ List<MessageExt> messages = consumeRequest.getMessageExts();
+ long offset =
consumeRequest.getProcessQueue().removeMessage(messages);
+
assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(),
offset);
+ return messages;
+ }
+ } catch (InterruptedException ignore) {
+
+ }
+ return null;
+ }
+
+ public void pause(Collection<MessageQueue> messageQueues) {
+ assignedMessageQueue.pause(messageQueues);
+ }
+
+ public void resume(Collection<MessageQueue> messageQueues) {
+ assignedMessageQueue.resume(messageQueues);
+ }
+
+ public synchronized void seek(MessageQueue messageQueue, long offset)
throws MQClientException {
+ if (offset < minOffset(messageQueue) || offset >
maxOffset(messageQueue))
+ throw new MQClientException("Seek offset illegal", null);
+ try {
+ assignedMessageQueue.setSeekOffset(messageQueue, offset);
+ updateConsumeOffset(messageQueue, offset);
+ updateConsumeOffsetToBroker(messageQueue, offset, false);
+ } catch (Exception e) {
+ log.error("Seek offset failed.", e);
+ }
+ }
+
+ public long maxOffset(MessageQueue mq) throws MQClientException {
+ checkServiceState();
+ return this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
+ }
+
+ public long minOffset(MessageQueue mq) throws MQClientException {
+ checkServiceState();
+ return this.mQClientFactory.getMQAdminImpl().minOffset(mq);
+ }
+
+ public void removePullTaskCallback(final String topic) {
+ removePullTask(topic);
+ }
+
+ public void removePullTask(final String topic) {
+ Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it =
this.taskTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
+ if (next.getKey().getTopic().equals(topic)) {
+ next.getValue().setCancelled(true);
+ it.remove();
+ }
+ }
+ }
+
+ public synchronized void commitSync() {
+ try {
+ for (MessageQueue messageQueue :
assignedMessageQueue.messageQueues()) {
+ long consumerOffset =
assignedMessageQueue.getConusmerOffset(messageQueue);
+ if (consumerOffset != -1) {
+ ProcessQueue processQueue =
assignedMessageQueue.getProcessQueue(messageQueue);
+ long preConsumerOffset =
this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY);
+ if (processQueue != null && !processQueue.isDropped() &&
consumerOffset != preConsumerOffset) {
+ updateConsumeOffset(messageQueue, consumerOffset);
+ updateConsumeOffsetToBroker(messageQueue,
consumerOffset, false);
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("An error occurred when update consume offset
synchronously.", e);
+ }
+ }
+
+ public synchronized void commitAll() {
+ try {
+ for (MessageQueue messageQueue :
assignedMessageQueue.messageQueues()) {
+ long consumerOffset =
assignedMessageQueue.getConusmerOffset(messageQueue);
+ if (consumerOffset != -1) {
+ ProcessQueue processQueue =
assignedMessageQueue.getProcessQueue(messageQueue);
+ long preConsumerOffset =
this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY);
+ if (processQueue != null && !processQueue.isDropped() &&
consumerOffset != preConsumerOffset) {
+ updateConsumeOffset(messageQueue, consumerOffset);
+ updateConsumeOffsetToBroker(messageQueue,
consumerOffset, true);
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("An error occurred when update consume offset
Automatically.");
+ }
+ }
+
+ private void updatePullOffset(MessageQueue remoteQueue, long
nextPullOffset) {
+ if (assignedMessageQueue.getSeekOffset(remoteQueue) == -1) {
+ assignedMessageQueue.updateNextOffset(remoteQueue, nextPullOffset);
+ }
+ }
+
+ private void submitConsumeRequest(ConsumeRequest consumeRequest) {
+ try {
+ consumeRequestCache.put(consumeRequest);
+ } catch (InterruptedException ex) {
+ log.error("Submit consumeRequest error", ex);
+ }
+ }
+
+ private long fetchConsumeOffset(MessageQueue mq, boolean fromStore) {
+ checkServiceState();
+ return this.offsetStore.readOffset(mq, fromStore ?
ReadOffsetType.READ_FROM_STORE : ReadOffsetType.MEMORY_FIRST_THEN_STORE);
+ }
+
+ private long nextPullOffset(MessageQueue remoteQueue) {
+ long offset = -1;
+ long seekOffset = assignedMessageQueue.getSeekOffset(remoteQueue);
+ if (seekOffset != -1) {
+ offset = seekOffset;
+ assignedMessageQueue.setSeekOffset(remoteQueue, -1);
+ assignedMessageQueue.updateNextOffset(remoteQueue,offset);
+ } else {
+ offset = assignedMessageQueue.getNextOffset(remoteQueue);
+ if (offset == -1) {
+ offset = fetchConsumeOffset(remoteQueue, false);
+ assignedMessageQueue.updateNextOffset(remoteQueue, offset);
+ assignedMessageQueue.updateConsumeOffset(remoteQueue, offset);
+ }
+ }
+
+ return offset;
+ }
+
+ public long searchOffset(MessageQueue mq, long timestamp) throws
MQClientException {
+ checkServiceState();
+ return this.mQClientFactory.getMQAdminImpl().searchOffset(mq,
timestamp);
+ }
+
+ public class PullTaskImpl implements Runnable {
+ private final MessageQueue messageQueue;
+ private volatile boolean cancelled = false;
+
+ public PullTaskImpl(final MessageQueue messageQueue) {
+ this.messageQueue = messageQueue;
+ }
+
+ @Override
+ public void run() {
+ ProcessQueue processQueue =
assignedMessageQueue.getProcessQueue(messageQueue);
+
+ if (processQueue == null && processQueue.isDropped()) {
+ log.info("the message queue not be able to poll, because it's
dropped. group={}, messageQueue={}",
defaultLitePullConsumer.getConsumerGroup(), this.messageQueue);
+ return;
+ }
+
+ if (consumeRequestCache.size() *
defaultLitePullConsumer.getPullBatchNums() >
defaultLitePullConsumer.getPullThresholdForAll()) {
+ scheduledThreadPoolExecutor.schedule(this,
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+ if ((consumeRequestFlowControlTimes++ % 1000) == 0)
+ log.warn("the consume request count exceeds threshold {},
so do flow control, consume request count={}, flowControlTimes={}",
consumeRequestCache.size(), consumeRequestFlowControlTimes);
+ return;
+ }
+
+ long cachedMessageCount = processQueue.getMsgCount().get();
+ long cachedMessageSizeInMiB = processQueue.getMsgSize().get() /
(1024 * 1024);
+
+ if (cachedMessageCount >
defaultLitePullConsumer.getPullThresholdForQueue()) {
+ scheduledThreadPoolExecutor.schedule(this,
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+ if ((queueFlowControlTimes++ % 1000) == 0) {
+ log.warn(
+ "the cached message count exceeds the threshold {}, so
do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB,
flowControlTimes={}",
+ defaultLitePullConsumer.getPullThresholdForQueue(),
processQueue.getMsgTreeMap().firstKey(),
processQueue.getMsgTreeMap().lastKey(), cachedMessageCount,
cachedMessageSizeInMiB, queueFlowControlTimes);
+ }
+ return;
+ }
+
+ if (cachedMessageSizeInMiB >
defaultLitePullConsumer.getPullThresholdSizeForQueue()) {
+ scheduledThreadPoolExecutor.schedule(this,
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+ if ((queueFlowControlTimes++ % 1000) == 0) {
+ log.warn(
+ "the cached message size exceeds the threshold {} MiB,
so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB,
flowControlTimes={}",
+
defaultLitePullConsumer.getPullThresholdSizeForQueue(),
processQueue.getMsgTreeMap().firstKey(),
processQueue.getMsgTreeMap().lastKey(), cachedMessageCount,
cachedMessageSizeInMiB, queueFlowControlTimes);
+ }
+ return;
+ }
+
+ if (processQueue.getMaxSpan() >
defaultLitePullConsumer.getConsumeMaxSpan()) {
+ scheduledThreadPoolExecutor.schedule(this,
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+ if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
+ log.warn(
+ "the queue's messages, span too long, so do flow
control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}",
+ processQueue.getMsgTreeMap().firstKey(),
processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
queueMaxSpanFlowControlTimes);
+ }
+ return;
+ }
+
+ if (!this.isCancelled()) {
+ if (assignedMessageQueue.isPaused(messageQueue)) {
+ scheduledThreadPoolExecutor.schedule(this,
PULL_TIME_DELAY_MILLS_WHEN_PAUSE, TimeUnit.MILLISECONDS);
+ log.debug("Message Queue: {} has been paused!",
messageQueue);
+ return;
+ }
+ String subExpression = null;
+ if (subscriptionType == SubscriptionType.SUBSCRIBE) {
+ String topic = this.messageQueue.getTopic();
+ subExpression =
rebalanceImpl.getSubscriptionInner().get(topic).getSubString();
+ }
+ long offset = nextPullOffset(messageQueue);
+ long pullDelayTimeMills = 0;
+ try {
+ PullResult pullResult = pull(messageQueue, subExpression,
offset, nextPullBatchNums());
+ switch (pullResult.getPullStatus()) {
+ case FOUND:
+
processQueue.putMessage(pullResult.getMsgFoundList());
+ submitConsumeRequest(new
ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
+ pullDelayTimeMills = 0;
+ break;
+ case NO_NEW_MSG:
+ pullDelayTimeMills = 100;
+ case OFFSET_ILLEGAL:
+ //TODO
+ log.warn("the pull request offset illegal, {}",
pullResult.toString());
+ break;
+ default:
+ break;
+ }
+ updatePullOffset(messageQueue,
pullResult.getNextBeginOffset());
+ } catch (Throwable e) {
+ pullDelayTimeMills = PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION;
+ e.printStackTrace();
+ log.error("An error occurred in pull message process.", e);
+ }
+
+ if (!this.isCancelled()) {
+ scheduledThreadPoolExecutor.schedule(this,
pullDelayTimeMills, TimeUnit.MILLISECONDS);
+ } else {
+ log.warn("The Pull Task is cancelled after doPullTask,
{}", messageQueue);
+ }
+ }
+ }
+
+ public boolean isCancelled() {
+ return cancelled;
+ }
+
+ public void setCancelled(boolean cancelled) {
+ this.cancelled = cancelled;
+ }
+
+ public MessageQueue getMessageQueue() {
+ return messageQueue;
+ }
+ }
+
+ private PullResult pull(MessageQueue mq, String subExpression, long
offset, int maxNums)
+ throws MQClientException, RemotingException, MQBrokerException,
InterruptedException {
+ return pull(mq, subExpression, offset, maxNums,
this.defaultLitePullConsumer.getConsumerPullTimeoutMillis());
+ }
+
+ private PullResult pull(MessageQueue mq, String subExpression, long
offset, int maxNums, long timeout)
+ throws MQClientException, RemotingException, MQBrokerException,
InterruptedException {
+ SubscriptionData subscriptionData = getSubscriptionData(mq,
subExpression);
+ return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false,
timeout);
+ }
+
+ private PullResult pull(MessageQueue mq, MessageSelector messageSelector,
long offset, int maxNums)
+ throws MQClientException, RemotingException, MQBrokerException,
InterruptedException {
+ return pull(mq, messageSelector, offset, maxNums,
this.defaultLitePullConsumer.getConsumerPullTimeoutMillis());
+ }
+
+ private PullResult pull(MessageQueue mq, MessageSelector messageSelector,
long offset, int maxNums, long timeout)
+ throws MQClientException, RemotingException, MQBrokerException,
InterruptedException {
+ SubscriptionData subscriptionData = getSubscriptionData(mq,
messageSelector);
+ return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false,
timeout);
+ }
+
+ private SubscriptionData getSubscriptionData(MessageQueue mq, String
subExpression)
+ throws MQClientException {
+
+ if (null == mq) {
+ throw new MQClientException("mq is null", null);
+ }
+
+ try {
+ return
FilterAPI.buildSubscriptionData(this.defaultLitePullConsumer.getConsumerGroup(),
+ mq.getTopic(), subExpression);
+ } catch (Exception e) {
+ throw new MQClientException("parse subscription error", e);
+ }
+ }
+
+ private SubscriptionData getSubscriptionData(MessageQueue mq,
MessageSelector messageSelector)
+ throws MQClientException {
+
+ if (null == mq) {
+ throw new MQClientException("mq is null", null);
+ }
+
+ try {
+ return FilterAPI.build(mq.getTopic(),
+ messageSelector.getExpression(),
messageSelector.getExpressionType());
+ } catch (Exception e) {
+ throw new MQClientException("parse subscription error", e);
+ }
+ }
+
+ private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData
subscriptionData, long offset, int maxNums,
+ boolean block,
+ long timeout)
+ throws MQClientException, RemotingException, MQBrokerException,
InterruptedException {
+
+ if (null == mq) {
+ throw new MQClientException("mq is null", null);
+ }
+
+ if (offset < 0) {
+ throw new MQClientException("offset < 0", null);
+ }
+
+ if (maxNums <= 0) {
+ throw new MQClientException("maxNums <= 0", null);
+ }
+
+ this.subscriptionAutomatically(mq.getTopic());
+
+ int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
+
+ long timeoutMillis = block ?
this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
+
+ boolean isTagType =
ExpressionType.isTagType(subscriptionData.getExpressionType());
+ PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
+ mq,
+ subscriptionData.getSubString(),
+ subscriptionData.getExpressionType(),
+ isTagType ? 0L : subscriptionData.getSubVersion(),
+ offset,
+ maxNums,
+ sysFlag,
+ 0,
+ this.defaultLitePullConsumer.getBrokerSuspendMaxTimeMillis(),
+ timeoutMillis,
+ CommunicationMode.SYNC,
+ null
+ );
+ this.pullAPIWrapper.processPullResult(mq, pullResult,
subscriptionData);
+ //If namespace not null , reset Topic without namespace.
+ this.resetTopic(pullResult.getMsgFoundList());
+ if (!this.consumeMessageHookList.isEmpty()) {
+ ConsumeMessageContext consumeMessageContext = null;
+ consumeMessageContext = new ConsumeMessageContext();
+
consumeMessageContext.setNamespace(defaultLitePullConsumer.getNamespace());
+ consumeMessageContext.setConsumerGroup(this.groupName());
+ consumeMessageContext.setMq(mq);
+ consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
+ consumeMessageContext.setSuccess(false);
+ this.executeHookBefore(consumeMessageContext);
+
consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
+ consumeMessageContext.setSuccess(true);
+ this.executeHookAfter(consumeMessageContext);
+ }
+ return pullResult;
+ }
+
+ private void executeHookBefore(final ConsumeMessageContext context) {
+ if (!this.consumeMessageHookList.isEmpty()) {
+ for (ConsumeMessageHook hook : this.consumeMessageHookList) {
+ try {
+ hook.consumeMessageBefore(context);
+ } catch (Throwable ignored) {
+ }
+ }
+ }
+ }
+
+ private void executeHookAfter(final ConsumeMessageContext context) {
+ if (!this.consumeMessageHookList.isEmpty()) {
+ for (ConsumeMessageHook hook : this.consumeMessageHookList) {
+ try {
+ hook.consumeMessageAfter(context);
+ } catch (Throwable ignored) {
+ }
+ }
+ }
+ }
+
+ public void resetTopic(List<MessageExt> msgList) {
+ if (null == msgList || msgList.size() == 0) {
+ return;
+ }
+
+ //If namespace not null , reset Topic without namespace.
+ for (MessageExt messageExt : msgList) {
+ if (null != this.defaultLitePullConsumer.getNamespace()) {
+
messageExt.setTopic(NamespaceUtil.withoutNamespace(messageExt.getTopic(),
this.defaultLitePullConsumer.getNamespace()));
+ }
+ }
+
+ }
+
+ public void subscriptionAutomatically(final String topic) {
+ if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {
+ try {
+ SubscriptionData subscriptionData =
FilterAPI.buildSubscriptionData(this.defaultLitePullConsumer.getConsumerGroup(),
+ topic, SubscriptionData.SUB_ALL);
+ this.rebalanceImpl.subscriptionInner.putIfAbsent(topic,
subscriptionData);
+ } catch (Exception ignore) {
+ }
+ }
+ }
+
+ public void updateConsumeOffset(MessageQueue mq, long offset) {
+ checkServiceState();
+ this.offsetStore.updateOffset(mq, offset, false);
+ }
+
+ @Override
+ public String groupName() {
+ return this.defaultLitePullConsumer.getConsumerGroup();
+ }
+
+ @Override
+ public MessageModel messageModel() {
+ return this.defaultLitePullConsumer.getMessageModel();
+ }
+
+ @Override
+ public ConsumeType consumeType() {
+ return ConsumeType.CONSUME_ACTIVELY;
+ }
+
+ @Override
+ public ConsumeFromWhere consumeFromWhere() {
+ return ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
+ }
+
+ @Override
+ public Set<SubscriptionData> subscriptions() {
+ Set<SubscriptionData> result = new HashSet<SubscriptionData>();
+
+ Set<String> topics = this.defaultLitePullConsumer.getRegisterTopics();
+ if (topics != null) {
+ synchronized (topics) {
+ for (String t : topics) {
+ SubscriptionData ms = null;
+ try {
+ ms = FilterAPI.buildSubscriptionData(this.groupName(),
t, SubscriptionData.SUB_ALL);
+ } catch (Exception e) {
+ log.error("parse subscription error", e);
+ }
+ ms.setSubVersion(0L);
+ result.add(ms);
+ }
+ }
+ }
+
+ return result;
+ }
+
+ @Override
+ public void doRebalance() {
+ if (this.rebalanceImpl != null) {
+ this.rebalanceImpl.doRebalance(false);
+ }
+ }
+
+ @Override
+ public void persistConsumerOffset() {
+ try {
+ checkServiceState();
+ Set<MessageQueue> mqs = new HashSet<MessageQueue>();
+ Set<MessageQueue> allocateMq =
this.rebalanceImpl.getProcessQueueTable().keySet();
+ mqs.addAll(allocateMq);
+ this.offsetStore.persistAll(mqs);
+ } catch (Exception e) {
+ log.error("group: " +
this.defaultLitePullConsumer.getConsumerGroup() + " persistConsumerOffset
exception", e);
+ }
+ }
+
+ @Override
+ public void updateTopicSubscribeInfo(String topic, Set<MessageQueue> info)
{
+ Map<String, SubscriptionData> subTable =
this.rebalanceImpl.getSubscriptionInner();
+ if (subTable != null) {
+ if (subTable.containsKey(topic)) {
+ this.rebalanceImpl.getTopicSubscribeInfoTable().put(topic,
info);
+ }
+ }
+ }
+
+ @Override
+ public boolean isSubscribeTopicNeedUpdate(String topic) {
+ Map<String, SubscriptionData> subTable =
this.rebalanceImpl.getSubscriptionInner();
+ if (subTable != null) {
+ if (subTable.containsKey(topic)) {
+ return
!this.rebalanceImpl.topicSubscribeInfoTable.containsKey(topic);
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public boolean isUnitMode() {
+ return this.defaultLitePullConsumer.isUnitMode();
+ }
+
+ @Override
+ public ConsumerRunningInfo consumerRunningInfo() {
+ ConsumerRunningInfo info = new ConsumerRunningInfo();
+
+ Properties prop =
MixAll.object2Properties(this.defaultLitePullConsumer);
+ prop.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP,
String.valueOf(this.consumerStartTimestamp));
+ info.setProperties(prop);
+
+ info.getSubscriptionSet().addAll(this.subscriptions());
+ return info;
+ }
+
+ private void sendMessageBack(MessageExt msg, int delayLevel, final String
brokerName)
+ throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
+ sendMessageBack(msg, delayLevel, brokerName,
this.defaultLitePullConsumer.getConsumerGroup());
+ }
+
+ private void sendMessageBack(MessageExt msg, int delayLevel, final String
brokerName, String consumerGroup)
+ throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
+ try {
+ String brokerAddr = (null != brokerName) ?
this.mQClientFactory.findBrokerAddressInPublish(brokerName)
+ : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
+
+ if (UtilAll.isBlank(consumerGroup)) {
+ consumerGroup =
this.defaultLitePullConsumer.getConsumerGroup();
+ }
+
+
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr,
msg, consumerGroup, delayLevel, 3000,
+ this.defaultLitePullConsumer.getMaxReconsumeTimes());
+ } catch (Exception e) {
+ log.error("sendMessageBack Exception, " +
this.defaultLitePullConsumer.getConsumerGroup(), e);
+
+ Message newMsg = new
Message(MixAll.getRetryTopic(this.defaultLitePullConsumer.getConsumerGroup()),
msg.getBody());
+ String originMsgId = MessageAccessor.getOriginMessageId(msg);
+ MessageAccessor.setOriginMessageId(newMsg,
UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
+ newMsg.setFlag(msg.getFlag());
+ MessageAccessor.setProperties(newMsg, msg.getProperties());
+ MessageAccessor.putProperty(newMsg,
MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
+ MessageAccessor.setReconsumeTime(newMsg,
String.valueOf(msg.getReconsumeTimes() + 1));
+ MessageAccessor.setMaxReconsumeTimes(newMsg,
String.valueOf(this.defaultLitePullConsumer.getMaxReconsumeTimes()));
+ newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
+ this.mQClientFactory.getDefaultMQProducer().send(newMsg);
+ } finally {
+ msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(),
this.defaultLitePullConsumer.getNamespace()));
+ }
+ }
+
+ private void updateConsumeOffsetToBroker(MessageQueue mq, long offset,
boolean isOneway) throws RemotingException,
+ MQBrokerException, InterruptedException, MQClientException {
+ this.offsetStore.updateConsumeOffsetToBroker(mq, offset, isOneway);
+ }
+
+ public OffsetStore getOffsetStore() {
+ return offsetStore;
+ }
+
+ public DefaultLitePullConsumer getDefaultLitePullConsumer() {
+ return defaultLitePullConsumer;
+ }
+
+ public Set<MessageQueue> fetchMessageQueues(String topic) throws
MQClientException {
+ checkServiceState();
+ // check if has info in memory, otherwise invoke api.
+ Set<MessageQueue> result =
this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
+ if (null == result) {
+ result =
this.mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic);
+ }
+
+ return parseMessageQueues(result);
+ }
+
+ private Set<MessageQueue> parseMessageQueues(Set<MessageQueue> queueSet) {
+ Set<MessageQueue> resultQueues = new HashSet<MessageQueue>();
+ for (MessageQueue messageQueue : queueSet) {
+ String userTopic =
NamespaceUtil.withoutNamespace(messageQueue.getTopic(),
+ this.defaultLitePullConsumer.getNamespace());
+ resultQueues.add(new MessageQueue(userTopic,
messageQueue.getBrokerName(), messageQueue.getQueueId()));
+ }
+ return resultQueues;
+ }
+
+ public class ConsumeRequest {
+ private final List<MessageExt> messageExts;
+ private final MessageQueue messageQueue;
+ private final ProcessQueue processQueue;
+ private long startConsumeTimeMillis;
+
+ public ConsumeRequest(final List<MessageExt> messageExts, final
MessageQueue messageQueue,
+ final ProcessQueue processQueue) {
+ this.messageExts = messageExts;
+ this.messageQueue = messageQueue;
+ this.processQueue = processQueue;
+ }
+
+ public List<MessageExt> getMessageExts() {
+ return messageExts;
+ }
+
+ public MessageQueue getMessageQueue() {
+ return messageQueue;
+ }
+
+ public ProcessQueue getProcessQueue() {
+ return processQueue;
+ }
+
+ public long getStartConsumeTimeMillis() {
+ return startConsumeTimeMillis;
+ }
+
+ public void setStartConsumeTimeMillis(final long
startConsumeTimeMillis) {
+ this.startConsumeTimeMillis = startConsumeTimeMillis;
+ }
+ }
+}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index bc0884a..3c98385 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -68,7 +68,7 @@ import
org.apache.rocketmq.remoting.exception.RemotingException;
public class DefaultMQPullConsumerImpl implements MQConsumerInner {
private final InternalLogger log = ClientLogger.getLog();
- protected final DefaultMQPullConsumer defaultMQPullConsumer;
+ private final DefaultMQPullConsumer defaultMQPullConsumer;
private final long consumerStartTimestamp = System.currentTimeMillis();
private final RPCHook rpcHook;
private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new
ArrayList<ConsumeMessageHook>();
@@ -77,7 +77,7 @@ public class DefaultMQPullConsumerImpl implements
MQConsumerInner {
protected MQClientInstance mQClientFactory;
private PullAPIWrapper pullAPIWrapper;
private OffsetStore offsetStore;
- protected RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);
+ private RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);
public DefaultMQPullConsumerImpl(final DefaultMQPullConsumer
defaultMQPullConsumer, final RPCHook rpcHook) {
this.defaultMQPullConsumer = defaultMQPullConsumer;
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
deleted file mode 100644
index ab229e4..0000000
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
+++ /dev/null
@@ -1,469 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.impl.consumer;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReadWriteLock;
-
-import org.apache.commons.lang3.reflect.FieldUtils;
-import org.apache.rocketmq.client.consumer.DefaultLiteMQPullConsumer;
-import org.apache.rocketmq.client.consumer.MessageQueueListener;
-import org.apache.rocketmq.client.consumer.PullResult;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.filter.FilterAPI;
-import org.apache.rocketmq.common.message.MessageAccessor;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.remoting.RPCHook;
-
-public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
-
- private final InternalLogger log = ClientLogger.getLog();
-
- private DefaultLiteMQPullConsumer defaultLiteMQPullConsumer;
-
- private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
- new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
-
- private AssignedMessageQueue assignedMessageQueue = new
AssignedMessageQueue();
-
- private volatile Set<ConsumeRequest> consumedSet = new
HashSet<ConsumeRequest>();
-
- private final BlockingQueue<ConsumeRequest> consumeRequestCache = new
LinkedBlockingQueue<ConsumeRequest>();
-
- private final ScheduledExecutorService cleanExpireMsgExecutors;
-
- private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
-
- private ScheduledExecutorService autoCommitExecutors;
-
- private final ThreadLocal<ConsumeRequest> preConsumeRequestLocal = new
ThreadLocal<ConsumeRequest>();
-
- public LiteMQPullConsumerImpl(final DefaultLiteMQPullConsumer
defaultMQPullConsumer, final RPCHook rpcHook) {
- super(defaultMQPullConsumer, rpcHook);
- this.defaultLiteMQPullConsumer = defaultMQPullConsumer;
- this.cleanExpireMsgExecutors =
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
- "Lite_CleanExpireMsgScheduledThread_"));
- this.autoCommitExecutors =
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
- "Lite_AutoCommitScheduledThread_"));
-
- }
-
- public void updateAssignedMessageQueue(String topic, Set<MessageQueue>
assignedMessageQueue) {
-
this.assignedMessageQueue.updateAssignedMessageQueue(assignedMessageQueue);
- updatePullTask(topic, assignedMessageQueue);
- }
-
- public void updatePullTask(String topic, Set<MessageQueue> mqNewSet) {
- Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it =
this.taskTable.entrySet().iterator();
- while (it.hasNext()) {
- Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
- if (next.getKey().getTopic().equals(topic)) {
- if (!mqNewSet.contains(next.getKey())) {
- next.getValue().setCancelled(true);
- it.remove();
- }
- }
- }
-
- for (MessageQueue messageQueue : mqNewSet) {
- if (!this.taskTable.containsKey(messageQueue)) {
- PullTaskImpl pullTask = new PullTaskImpl(messageQueue);
- this.taskTable.put(messageQueue, pullTask);
- this.scheduledThreadPoolExecutor.schedule(pullTask, 0,
TimeUnit.MILLISECONDS);
- }
- }
- }
-
- class MessageQueueListenerImpl implements MessageQueueListener {
- @Override
- public void messageQueueChanged(String topic, Set<MessageQueue> mqAll,
Set<MessageQueue> mqDivided) {
- MessageModel messageModel =
defaultMQPullConsumer.getMessageModel();
- switch (messageModel) {
- case BROADCASTING:
- updateAssignedMessageQueue(topic, mqAll);
- break;
- case CLUSTERING:
- updateAssignedMessageQueue(topic, mqDivided);
- break;
- default:
- break;
- }
- }
- }
-
- int nextPullBatchNums() {
- return Math.min(10, consumeRequestCache.remainingCapacity());
- }
-
- @Override
- public synchronized void start() throws MQClientException {
- this.defaultMQPullConsumer.setMessageQueueListener(new
MessageQueueListenerImpl());
- super.start();
- final String group = this.defaultMQPullConsumer.getConsumerGroup();
- this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
- this.defaultLiteMQPullConsumer.getPullThreadNumbers(),
- new ThreadFactoryImpl("PullMsgThread-" + group)
- );
- this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- cleanExpireMsg();
- }
- }, this.defaultLiteMQPullConsumer.getConsumeTimeout(),
this.defaultLiteMQPullConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
- this.autoCommitExecutors.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- if (defaultLiteMQPullConsumer.isAutoCommit()) {
- commitAll();
- }
- }
- }, this.defaultLiteMQPullConsumer.getAutoCommitInterval(),
this.defaultLiteMQPullConsumer.getAutoCommitInterval(), TimeUnit.SECONDS);
- updateTopicSubscribeInfoWhenSubscriptionChanged();
- }
-
- private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
- Map<String, SubscriptionData> subTable =
rebalanceImpl.getSubscriptionInner();
- if (subTable != null) {
- for (final Map.Entry<String, SubscriptionData> entry :
subTable.entrySet()) {
- final String topic = entry.getKey();
- this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
- }
- }
- }
-
- public List<MessageExt> poll(long timeout) {
- try {
- addToConsumed(preConsumeRequestLocal.get());
- ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout,
TimeUnit.MILLISECONDS);
- preConsumeRequestLocal.set(consumeRequest);
- if (consumeRequest != null) {
- List<MessageExt> messages = consumeRequest.getMessageExts();
- for (MessageExt messageExt : messages) {
- MessageAccessor.setConsumeStartTimeStamp(messageExt,
String.valueOf(consumeRequest.getStartConsumeTimeMillis()));
- }
-
consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
- return messages;
- }
- } catch (InterruptedException e) {
- log.error("poll ComsumeRequest error.", e);
- }
- return null;
- }
-
- public void pause(Collection<MessageQueue> messageQueues) {
- assignedMessageQueue.pause(messageQueues);
- }
-
- public void resume(Collection<MessageQueue> messageQueues) {
- assignedMessageQueue.resume(messageQueues);
- }
-
- public void seek(MessageQueue messageQueue, long offset) throws
MQClientException {
- this.updatePullOffset(messageQueue, offset);
- try {
- updateConsumeOffset(messageQueue, offset);
- } catch (MQClientException ex) {
- log.error("Seek offset to remote message queue error!", ex);
- throw ex;
- }
- }
-
- public void unsubscribe(final String topic) {
- super.unsubscribe(topic);
- removePullTaskCallback(topic);
- assignedMessageQueue.removeAssignedMessageQueue(topic);
- }
-
- public void removePullTaskCallback(final String topic) {
- removePullTask(topic);
- }
-
- public void removePullTask(final String topic) {
- synchronized (this.taskTable) {
- Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it =
this.taskTable.entrySet().iterator();
- while (it.hasNext()) {
- Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
- if (next.getKey().getTopic().equals(topic)) {
- next.getValue().setCancelled(true);
- it.remove();
- }
- }
- }
- }
-
- public void commitSync() {
- addToConsumed(preConsumeRequestLocal.get());
- preConsumeRequestLocal.set(null);
- commitAll();
- }
-
- public void commitAll() {
- Set<ConsumeRequest> consumedRequests;
- synchronized (this.consumedSet) {
- consumedRequests = this.consumedSet;
- this.consumedSet = new HashSet<ConsumeRequest>();
- }
- for (ConsumeRequest consumeRequest : consumedRequests) {
-
consumeRequest.getProcessQueue().removeMessage(consumeRequest.messageExts);
- }
- Set<Map.Entry<MessageQueue, ProcessQueue>> entrySet =
this.rebalanceImpl.getProcessQueueTable().entrySet();
- for (Map.Entry<MessageQueue, ProcessQueue> entry : entrySet) {
- try {
- long consumeOffset = entry.getValue().getConsumeOffset();
- if (consumeOffset != -1)
- updateConsumeOffset(entry.getKey(), consumeOffset);
- } catch (MQClientException e) {
- log.error("A error occurred in update consume offset
process.", e);
- }
- }
-
this.getOffsetStore().persistAll(this.rebalanceImpl.getProcessQueueTable().keySet());
- }
-
- private void commit(final MessageQueue messageQueue, final ProcessQueue
processQueue, final MessageExt messageExt) {
- long offset =
processQueue.removeMessage(Collections.singletonList(messageExt));
- try {
- updateConsumeOffset(messageQueue, offset);
- } catch (MQClientException e) {
- log.error("An error occurred in update consume offset process.",
e);
- }
- }
-
- public void subscribe(String topic, String subExpression) throws
MQClientException {
- try {
- SubscriptionData subscriptionData =
FilterAPI.buildSubscriptionData(defaultMQPullConsumer.getConsumerGroup(),
- topic, subExpression);
- this.rebalanceImpl.getSubscriptionInner().put(topic,
subscriptionData);
- if (this.mQClientFactory != null) {
- this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
- }
- } catch (Exception e) {
- throw new MQClientException("subscription exception", e);
- }
- }
-
- private void updatePullOffset(MessageQueue remoteQueue, long
nextPullOffset) {
- try {
- assignedMessageQueue.updateNextOffset(remoteQueue, nextPullOffset);
- } catch (MQClientException e) {
- log.error("A error occurred in update consume: {} offset
process.", remoteQueue, e);
- }
- }
-
- private void addToConsumed(ConsumeRequest consumeRequest) {
- if (consumeRequest != null) {
- synchronized (this.consumedSet) {
- if (!consumedSet.contains(consumeRequest))
- consumedSet.add(consumeRequest);
- }
- }
- }
-
- private void submitConsumeRequest(ConsumeRequest consumeRequest) {
- try {
- consumeRequestCache.put(consumeRequest);
- } catch (InterruptedException ex) {
- log.error("Submit consumeRequest error", ex);
- }
- }
-
- private long nextPullOffset(MessageQueue remoteQueue) {
- long offset = -1;
- try {
- offset = assignedMessageQueue.getNextOffset(remoteQueue);
- if (offset == -1) {
- offset = fetchConsumeOffset(remoteQueue, false);
- assignedMessageQueue.updateNextOffset(remoteQueue, offset);
- }
- } catch (MQClientException e) {
- log.error("An error occurred in fetch consume offset process.", e);
- }
- return offset;
- }
-
- private void cleanExpireMsg() {
- for (final Map.Entry<MessageQueue, ProcessQueue> next :
rebalanceImpl.getProcessQueueTable().entrySet()) {
- ProcessQueue pq = next.getValue();
- MessageQueue mq = next.getKey();
- ReadWriteLock lockTreeMap = getLockInProcessQueue(pq);
- if (lockTreeMap == null) {
- log.error("Gets tree map lock in process queue error of
message queue:", mq);
- return;
- }
-
- TreeMap<Long, MessageExt> msgTreeMap = pq.getMsgTreeMap();
-
- int loop = msgTreeMap.size();
- for (int i = 0; i < loop; i++) {
- MessageExt msg = null;
- try {
- lockTreeMap.readLock().lockInterruptibly();
- try {
- if (!msgTreeMap.isEmpty()) {
- msg = msgTreeMap.firstEntry().getValue();
- if (System.currentTimeMillis() -
Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msg))
- >
this.defaultLiteMQPullConsumer.getConsumeTimeout() * 60 * 1000) {
- //Expired, ack and remove it.
- } else {
- break;
- }
- } else {
- break;
- }
- } finally {
- lockTreeMap.readLock().unlock();
- }
- } catch (InterruptedException e) {
- log.error("Gets expired message exception", e);
- }
-
- try {
- this.defaultMQPullConsumer.sendMessageBack(msg, 3);
- log.info("Send expired msg back. topic={}, msgId={},
storeHost={}, queueId={}, queueOffset={}",
- msg.getTopic(), msg.getMsgId(), msg.getStoreHost(),
msg.getQueueId(), msg.getQueueOffset());
- log.info("Send expired msg back.");
- commit(mq, pq, msg);
- } catch (Exception e) {
- log.error("Send back expired msg exception", e);
- }
- }
- }
- }
-
- private ReadWriteLock getLockInProcessQueue(ProcessQueue pq) {
- try {
- return (ReadWriteLock) FieldUtils.readDeclaredField(pq,
"lockTreeMap", true);
- } catch (IllegalAccessException e) {
- return null;
- }
- }
-
- public class PullTaskImpl implements Runnable {
- private final MessageQueue messageQueue;
- private volatile boolean cancelled = false;
-
- public PullTaskImpl(final MessageQueue messageQueue) {
- this.messageQueue = messageQueue;
- }
-
- @Override
- public void run() {
- String topic = this.messageQueue.getTopic();
- if (!this.isCancelled()) {
- if (assignedMessageQueue.isPaused(messageQueue)) {
- scheduledThreadPoolExecutor.schedule(this, 1000,
TimeUnit.MILLISECONDS);
- log.debug("Message Queue: {} has been paused!",
messageQueue);
- return;
- }
- SubscriptionData subscriptionData =
rebalanceImpl.getSubscriptionInner().get(topic);
- long offset = nextPullOffset(messageQueue);
- long pullDelayTimeMills = 0;
- try {
- PullResult pullResult = pull(messageQueue,
subscriptionData.getSubString(), offset, nextPullBatchNums());
- ProcessQueue processQueue =
rebalanceImpl.getProcessQueueTable().get(messageQueue);
- switch (pullResult.getPullStatus()) {
- case FOUND:
- if (processQueue != null) {
-
processQueue.putMessage(pullResult.getMsgFoundList());
- submitConsumeRequest(new
ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
- }
- break;
- default:
- break;
- }
- updatePullOffset(messageQueue,
pullResult.getNextBeginOffset());
- } catch (Throwable e) {
- pullDelayTimeMills = 1000;
- e.printStackTrace();
- log.error("An error occurred in pull message process.", e);
- }
-
- if (!this.isCancelled()) {
- scheduledThreadPoolExecutor.schedule(this,
pullDelayTimeMills, TimeUnit.MILLISECONDS);
- } else {
- log.warn("The Pull Task is cancelled after doPullTask,
{}", messageQueue);
- }
- }
- }
-
- public boolean isCancelled() {
- return cancelled;
- }
-
- public void setCancelled(boolean cancelled) {
- this.cancelled = cancelled;
- }
-
- public MessageQueue getMessageQueue() {
- return messageQueue;
- }
- }
-
- public class ConsumeRequest {
- private final List<MessageExt> messageExts;
- private final MessageQueue messageQueue;
- private final ProcessQueue processQueue;
- private long startConsumeTimeMillis;
-
- public ConsumeRequest(final List<MessageExt> messageExts, final
MessageQueue messageQueue,
- final ProcessQueue processQueue) {
- this.messageExts = messageExts;
- this.messageQueue = messageQueue;
- this.processQueue = processQueue;
- }
-
- public List<MessageExt> getMessageExts() {
- return messageExts;
- }
-
- public MessageQueue getMessageQueue() {
- return messageQueue;
- }
-
- public ProcessQueue getProcessQueue() {
- return processQueue;
- }
-
- public long getStartConsumeTimeMillis() {
- return startConsumeTimeMillis;
- }
-
- public void setStartConsumeTimeMillis(final long
startConsumeTimeMillis) {
- this.startConsumeTimeMillis = startConsumeTimeMillis;
- }
- }
-}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index e9a1c72..092da9a 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -433,14 +433,4 @@ public class ProcessQueue {
this.lastConsumeTimestamp = lastConsumeTimestamp;
}
- public long getConsumeOffset() {
-
- if (msgTreeMap.isEmpty() && queueOffsetMax == 0L)
- return -1;
-
- if (!msgTreeMap.isEmpty())
- return msgTreeMap.firstKey();
- else
- return queueOffsetMax + 1;
- }
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
new file mode 100644
index 0000000..8148c7d
--- /dev/null
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
@@ -0,0 +1,68 @@
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.client.consumer.MessageQueueListener;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+
+import java.util.List;
+import java.util.Set;
+
+public class RebalanceLitePullImpl extends RebalanceImpl {
+
+ private final DefaultLitePullConsumerImpl litePullConsumerImpl;
+
+ public RebalanceLitePullImpl(DefaultLitePullConsumerImpl
litePullConsumerImpl) {
+ this(null, null, null, null, litePullConsumerImpl);
+ }
+
+ public RebalanceLitePullImpl(String consumerGroup, MessageModel
messageModel,
+ AllocateMessageQueueStrategy
allocateMessageQueueStrategy,
+ MQClientInstance mQClientFactory,
DefaultLitePullConsumerImpl litePullConsumerImpl) {
+ super(consumerGroup, messageModel, allocateMessageQueueStrategy,
mQClientFactory);
+ this.litePullConsumerImpl = litePullConsumerImpl;
+ }
+
+ @Override
+ public void messageQueueChanged(String topic, Set<MessageQueue> mqAll,
Set<MessageQueue> mqDivided) {
+ MessageQueueListener messageQueueListener =
this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();
+ if (messageQueueListener != null) {
+ try {
+ messageQueueListener.messageQueueChanged(topic, mqAll,
mqDivided);
+ } catch (Throwable e) {
+ log.error("messageQueueChanged exception", e);
+ }
+ }
+ }
+
+
+ @Override
+ public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue
pq) {
+ this.litePullConsumerImpl.getOffsetStore().persist(mq);
+ this.litePullConsumerImpl.getOffsetStore().removeOffset(mq);
+ return true;
+ }
+
+ @Override
+ public ConsumeType consumeType() {
+ return ConsumeType.CONSUME_ACTIVELY;
+ }
+
+ @Override
+ public void removeDirtyOffset(final MessageQueue mq) {
+ this.litePullConsumerImpl.getOffsetStore().removeOffset(mq);
+ }
+
+ @Override
+ public long computePullFromWhere(MessageQueue mq) {
+ return 0;
+ }
+
+ @Override
+ public void dispatchPullRequest(List<PullRequest> pullRequestList) {
+ }
+
+
+}
diff --git
a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
index 215763b..488a499 100644
---
a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
+++
b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
@@ -16,34 +16,24 @@
*/
package org.apache.rocketmq.example.simple;
-import java.util.Arrays;
import java.util.List;
-import org.apache.rocketmq.client.consumer.DefaultLiteMQPullConsumer;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
+
public class LitePullConsumerTest {
public static void main(String[] args) throws Exception {
- DefaultLiteMQPullConsumer litePullConsumer = new
DefaultLiteMQPullConsumer("test", null);
+ DefaultLitePullConsumer litePullConsumer = new
DefaultLitePullConsumer("test");
litePullConsumer.setNamesrvAddr("localhost:9876");
- litePullConsumer.subscribe("litepullconsumertest9", null);
+ litePullConsumer.setAutoCommit(true);
+ litePullConsumer.subscribe("test41","TagA" );
litePullConsumer.start();
- MessageQueue messageQueue = new MessageQueue("test",
"IT-C02YW28FLVDL.local", 1);
+
int i = 0;
while (true) {
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s%n", messageExts);
- i++;
- if (i == 3) {
- System.out.printf("pause%n");
- litePullConsumer.pause(Arrays.asList(messageQueue));
- }
- if (i == 10) {
- System.out.printf("resume%n");
- litePullConsumer.resume(Arrays.asList(messageQueue));
- }
-//
- litePullConsumer.commitSync();
}
+
}
}