This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch litePullConsumer
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/litePullConsumer by this push:
new 46288ab Polish lite pull consumer and fix bug (#1373)
46288ab is described below
commit 46288abb33e7f4cf9ca59dbfbff12be96fd8494a
Author: King <[email protected]>
AuthorDate: Fri Aug 9 11:26:53 2019 +0800
Polish lite pull consumer and fix bug (#1373)
* fix unsubscribe code
* fix commit consumed offset
* fix commit consumed offset
* fix commit consumed offset
* fix commit consumed offset
* polish commit consumed offset
* pass checkstyle
* pass checkstyle
* polish LiteMQPullConsumer
* add flow control and polish commit logic
* fix bug
* polish code
* fix commit consumed offset back
* refactor litePullConsumer
* development save
* development save
* Refactor DefaultLitePullConsumer and DefaultLitePullConsumerImpl.
* Polish lite pull consumer
* polish lite pull consumer
* polish lite pull consumer
* fix seek
* fix seek function
* polish lite pull consumer
* add apache header
* add test
* polish test
* Make broadcast model work for litePullConsumer
* Revert example/broadcast/PushConsumer.java
* Add delay time when no new message
* Enable long polling mode
* Fix subscribe bug when rebalance
* Delete useless consumeMessageHook
---
.../client/consumer/DefaultLitePullConsumer.java | 15 +-
.../client/impl/consumer/AssignedMessageQueue.java | 44 ++++--
.../impl/consumer/DefaultLitePullConsumerImpl.java | 159 ++++++---------------
.../rocketmq/example/broadcast/PushConsumer.java | 2 +-
4 files changed, 80 insertions(+), 140 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 7f65713..1858fa1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -17,9 +17,7 @@
package org.apache.rocketmq.client.consumer;
import java.util.Collection;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import org.apache.rocketmq.client.ClientConfig;
import
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
@@ -69,10 +67,7 @@ public class DefaultLitePullConsumer extends ClientConfig
implements LitePullCon
* Offset Storage
*/
private OffsetStore offsetStore;
- /**
- * Topic set you want to register
- */
- private Set<String> registerTopics = new HashSet<String>();
+
/**
* Queue allocation algorithm
*/
@@ -372,14 +367,6 @@ public class DefaultLitePullConsumer extends ClientConfig
implements LitePullCon
this.messageQueueListener = messageQueueListener;
}
- public Set<String> getRegisterTopics() {
- return registerTopics;
- }
-
- public void setRegisterTopics(Set<String> registerTopics) {
- this.registerTopics = withNamespace(registerTopics);
- }
-
public long getConsumerPullTimeoutMillis() {
return consumerPullTimeoutMillis;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
index aa8379e..b21fd01 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.client.impl.consumer;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.CountDownLatch2;
import org.apache.rocketmq.common.message.MessageQueue;
@@ -37,7 +38,7 @@ public class AssignedMessageQueue {
this.rebalanceImpl = rebalanceImpl;
}
- public Collection<MessageQueue> messageQueues() {
+ public Set<MessageQueue> messageQueues() {
return assignedMessageQueueState.keySet();
}
@@ -130,6 +131,23 @@ public class AssignedMessageQueue {
return null;
}
+ public void updateAssignedMessageQueue(String topic,
Collection<MessageQueue> assigned) {
+ synchronized (this.assignedMessageQueueState) {
+ Iterator<Map.Entry<MessageQueue, MessageQueueStat>> it =
this.assignedMessageQueueState.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<MessageQueue, MessageQueueStat> next = it.next();
+ if (next.getKey().getTopic().equals(topic)) {
+ if (!assigned.contains(next.getKey())) {
+ System.out.printf("MessageQueue-%s is removed %n",
next.getKey());
+ next.getValue().getProcessQueue().setDropped(true);
+ it.remove();
+ }
+ }
+ }
+ addAssignedMessageQueue(assigned);
+ }
+ }
+
public void updateAssignedMessageQueue(Collection<MessageQueue> assigned) {
synchronized (this.assignedMessageQueueState) {
Iterator<Map.Entry<MessageQueue, MessageQueueStat>> it =
this.assignedMessageQueueState.entrySet().iterator();
@@ -140,18 +158,22 @@ public class AssignedMessageQueue {
it.remove();
}
}
+ addAssignedMessageQueue(assigned);
+ }
+ }
- for (MessageQueue messageQueue : assigned) {
- if (!this.assignedMessageQueueState.containsKey(messageQueue))
{
- MessageQueueStat messageQueueStat;
- if (rebalanceImpl != null &&
rebalanceImpl.processQueueTable.get(messageQueue) != null) {
- messageQueueStat = new MessageQueueStat(messageQueue,
rebalanceImpl.processQueueTable.get(messageQueue));
- } else {
- ProcessQueue processQueue = new ProcessQueue();
- messageQueueStat = new MessageQueueStat(messageQueue,
processQueue);
- }
- this.assignedMessageQueueState.put(messageQueue,
messageQueueStat);
+ private void addAssignedMessageQueue(Collection<MessageQueue> assigned) {
+ for (MessageQueue messageQueue : assigned) {
+ if (!this.assignedMessageQueueState.containsKey(messageQueue)) {
+ MessageQueueStat messageQueueStat;
+ if (rebalanceImpl != null &&
rebalanceImpl.getProcessQueueTable().get(messageQueue) != null) {
+ System.out.printf("MessageQueue-%s is added %n",
messageQueue);
+ messageQueueStat = new MessageQueueStat(messageQueue,
rebalanceImpl.getProcessQueueTable().get(messageQueue));
+ } else {
+ ProcessQueue processQueue = new ProcessQueue();
+ messageQueueStat = new MessageQueueStat(messageQueue,
processQueue);
}
+ this.assignedMessageQueueState.put(messageQueue,
messageQueueStat);
}
}
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index 74cf644..07ef1cf 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -37,15 +37,12 @@ import
org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.PullResult;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.hook.ConsumeMessageContext;
-import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.client.hook.FilterMessageHook;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.MQClientManager;
@@ -55,15 +52,10 @@ import org.apache.rocketmq.common.CountDownLatch2;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
-
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.common.message.MessageAccessor;
-import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
@@ -74,7 +66,6 @@ import
org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class DefaultLitePullConsumerImpl implements MQConsumerInner {
@@ -85,8 +76,6 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
private final RPCHook rpcHook;
- private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new
ArrayList<ConsumeMessageHook>();
-
private final ArrayList<FilterMessageHook> filterMessageHookList = new
ArrayList<FilterMessageHook>();
private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
@@ -122,6 +111,10 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
* Delay some time when suspend pull service
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_PAUSE = 1000;
+ /**
+ * Delay some time when no new message
+ */
+ private static final long PULL_TIME_DELAY_MILLS_WHEN_NO_NEW_MSG = 0;
private DefaultLitePullConsumer defaultLitePullConsumer;
@@ -143,10 +136,8 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
private long nextAutoCommitDeadline = -1L;
public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer
defaultLitePullConsumer, final RPCHook rpcHook) {
-
this.defaultLitePullConsumer = defaultLitePullConsumer;
this.rpcHook = rpcHook;
-
}
private void checkServiceState() {
@@ -162,8 +153,7 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
}
private void updateAssignedMessageQueue(String topic, Set<MessageQueue>
assignedMessageQueue) {
-
this.assignedMessageQueue.updateAssignedMessageQueue(assignedMessageQueue);
- updatePullTask(topic, assignedMessageQueue);
+ this.assignedMessageQueue.updateAssignedMessageQueue(topic,
assignedMessageQueue);
}
private void updatePullTask(String topic, Set<MessageQueue> mqNewSet) {
@@ -187,9 +177,11 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
switch (messageModel) {
case BROADCASTING:
updateAssignedMessageQueue(topic, mqAll);
+ updatePullTask(topic, mqAll);
break;
case CLUSTERING:
updateAssignedMessageQueue(topic, mqDivided);
+ updatePullTask(topic, mqDivided);
break;
default:
break;
@@ -356,13 +348,16 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
private void copySubscription() throws MQClientException {
try {
- Set<String> registerTopics =
this.defaultLitePullConsumer.getRegisterTopics();
- if (registerTopics != null) {
- for (final String topic : registerTopics) {
- SubscriptionData subscriptionData =
FilterAPI.buildSubscriptionData(this.defaultLitePullConsumer.getConsumerGroup(),
- topic, SubscriptionData.SUB_ALL);
- this.rebalanceImpl.getSubscriptionInner().put(topic,
subscriptionData);
- }
+ switch (this.defaultLitePullConsumer.getMessageModel()) {
+ case BROADCASTING:
+ break;
+ case CLUSTERING:
+ /*
+ * Retry topic support in the future.
+ */
+ break;
+ default:
+ break;
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
@@ -421,7 +416,6 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
public synchronized void unsubscribe(final String topic) {
this.rebalanceImpl.getSubscriptionInner().remove(topic);
- //can be delete
removePullTaskCallback(topic);
assignedMessageQueue.removeAssignedMessageQueue(topic);
}
@@ -484,8 +478,13 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
}
public synchronized void seek(MessageQueue messageQueue, long offset)
throws MQClientException {
- if (!assignedMessageQueue.messageQueues().contains(messageQueue))
- throw new MQClientException("The message queue is not in assigned
list, message queue: " + messageQueue, null);
+ if (!assignedMessageQueue.messageQueues().contains(messageQueue)) {
+ if (subscriptionType == SubscriptionType.SUBSCRIBE) {
+ throw new MQClientException("The message queue is not in
assigned list, may be rebalancing, message queue: " + messageQueue, null);
+ } else {
+ throw new MQClientException("The message queue is not in
assigned list, message queue: " + messageQueue, null);
+ }
+ }
long minOffset = minOffset(messageQueue);
long maxOffset = maxOffset(messageQueue);
if (offset < minOffset || offset > maxOffset)
@@ -552,6 +551,8 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
}
}
}
+ if (defaultLitePullConsumer.getMessageModel() ==
MessageModel.BROADCASTING)
+ offsetStore.persistAll(assignedMessageQueue.messageQueues());
} catch (Exception e) {
log.error("An error occurred when update consume offset
synchronously.", e);
}
@@ -570,6 +571,8 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
}
}
}
+ if (defaultLitePullConsumer.getMessageModel() ==
MessageModel.BROADCASTING)
+ offsetStore.persistAll(assignedMessageQueue.messageQueues());
} catch (Exception e) {
log.error("An error occurred when update consume offset
Automatically.");
}
@@ -605,6 +608,9 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
offset = assignedMessageQueue.getPullOffset(remoteQueue);
if (offset == -1) {
offset = fetchConsumeOffset(remoteQueue, false);
+ if (offset == -1 && defaultLitePullConsumer.getMessageModel()
== MessageModel.BROADCASTING) {
+ offset = 0;
+ }
assignedMessageQueue.updatePullOffset(remoteQueue, offset);
assignedMessageQueue.updateConsumeOffset(remoteQueue, offset);
}
@@ -706,6 +712,9 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, {}",
pullResult.toString());
break;
+ case NO_NEW_MSG:
+ pullDelayTimeMills =
PULL_TIME_DELAY_MILLS_WHEN_NO_NEW_MSG;
+ break;
default:
break;
}
@@ -745,7 +754,7 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
private PullResult pull(MessageQueue mq, String subExpression, long
offset, int maxNums, long timeout)
throws MQClientException, RemotingException, MQBrokerException,
InterruptedException {
SubscriptionData subscriptionData = getSubscriptionData(mq,
subExpression);
- return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false,
timeout);
+ return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true,
timeout);
}
private PullResult pull(MessageQueue mq, MessageSelector messageSelector,
long offset, int maxNums)
@@ -756,7 +765,7 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
private PullResult pull(MessageQueue mq, MessageSelector messageSelector,
long offset, int maxNums, long timeout)
throws MQClientException, RemotingException, MQBrokerException,
InterruptedException {
SubscriptionData subscriptionData = getSubscriptionData(mq,
messageSelector);
- return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false,
timeout);
+ return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true,
timeout);
}
private SubscriptionData getSubscriptionData(MessageQueue mq, String
subExpression)
@@ -830,43 +839,9 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
this.pullAPIWrapper.processPullResult(mq, pullResult,
subscriptionData);
//If namespace not null , reset Topic without namespace.
this.resetTopic(pullResult.getMsgFoundList());
- if (!this.consumeMessageHookList.isEmpty()) {
- ConsumeMessageContext consumeMessageContext = null;
- consumeMessageContext = new ConsumeMessageContext();
-
consumeMessageContext.setNamespace(defaultLitePullConsumer.getNamespace());
- consumeMessageContext.setConsumerGroup(this.groupName());
- consumeMessageContext.setMq(mq);
- consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
- consumeMessageContext.setSuccess(false);
- this.executeHookBefore(consumeMessageContext);
-
consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
- consumeMessageContext.setSuccess(true);
- this.executeHookAfter(consumeMessageContext);
- }
return pullResult;
}
- private void executeHookBefore(final ConsumeMessageContext context) {
- if (!this.consumeMessageHookList.isEmpty()) {
- for (ConsumeMessageHook hook : this.consumeMessageHookList) {
- try {
- hook.consumeMessageBefore(context);
- } catch (Throwable ignored) {
- }
- }
- }
- }
-
- private void executeHookAfter(final ConsumeMessageContext context) {
- if (!this.consumeMessageHookList.isEmpty()) {
- for (ConsumeMessageHook hook : this.consumeMessageHookList) {
- try {
- hook.consumeMessageAfter(context);
- } catch (Throwable ignored) {
- }
- }
- }
- }
public void resetTopic(List<MessageExt> msgList) {
if (null == msgList || msgList.size() == 0) {
@@ -920,25 +895,11 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
@Override
public Set<SubscriptionData> subscriptions() {
- Set<SubscriptionData> result = new HashSet<SubscriptionData>();
-
- Set<String> topics = this.defaultLitePullConsumer.getRegisterTopics();
- if (topics != null) {
- synchronized (topics) {
- for (String t : topics) {
- SubscriptionData ms = null;
- try {
- ms = FilterAPI.buildSubscriptionData(this.groupName(),
t, SubscriptionData.SUB_ALL);
- } catch (Exception e) {
- log.error("parse subscription error", e);
- }
- ms.setSubVersion(0L);
- result.add(ms);
- }
- }
- }
+ Set<SubscriptionData> subSet = new HashSet<SubscriptionData>();
- return result;
+ subSet.addAll(this.rebalanceImpl.getSubscriptionInner().values());
+
+ return subSet;
}
@Override
@@ -1000,41 +961,6 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
return info;
}
- private void sendMessageBack(MessageExt msg, int delayLevel, final String
brokerName)
- throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
- sendMessageBack(msg, delayLevel, brokerName,
this.defaultLitePullConsumer.getConsumerGroup());
- }
-
- private void sendMessageBack(MessageExt msg, int delayLevel, final String
brokerName, String consumerGroup)
- throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
- try {
- String brokerAddr = (null != brokerName) ?
this.mQClientFactory.findBrokerAddressInPublish(brokerName)
- : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
-
- if (UtilAll.isBlank(consumerGroup)) {
- consumerGroup =
this.defaultLitePullConsumer.getConsumerGroup();
- }
-
-
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr,
msg, consumerGroup, delayLevel, 3000,
- this.defaultLitePullConsumer.getMaxReconsumeTimes());
- } catch (Exception e) {
- log.error("sendMessageBack Exception, " +
this.defaultLitePullConsumer.getConsumerGroup(), e);
-
- Message newMsg = new
Message(MixAll.getRetryTopic(this.defaultLitePullConsumer.getConsumerGroup()),
msg.getBody());
- String originMsgId = MessageAccessor.getOriginMessageId(msg);
- MessageAccessor.setOriginMessageId(newMsg,
UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
- newMsg.setFlag(msg.getFlag());
- MessageAccessor.setProperties(newMsg, msg.getProperties());
- MessageAccessor.putProperty(newMsg,
MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
- MessageAccessor.setReconsumeTime(newMsg,
String.valueOf(msg.getReconsumeTimes() + 1));
- MessageAccessor.setMaxReconsumeTimes(newMsg,
String.valueOf(this.defaultLitePullConsumer.getMaxReconsumeTimes()));
- newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
- this.mQClientFactory.getDefaultMQProducer().send(newMsg);
- } finally {
- msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(),
this.defaultLitePullConsumer.getNamespace()));
- }
- }
-
private void updateConsumeOffsetToBroker(MessageQueue mq, long offset,
boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
this.offsetStore.updateConsumeOffsetToBroker(mq, offset, isOneway);
@@ -1044,6 +970,11 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
return offsetStore;
}
+ public void registerFilterMessageHook(final FilterMessageHook hook) {
+ this.filterMessageHookList.add(hook);
+ log.info("register FilterMessageHook Hook, {}", hook.hookName());
+ }
+
public DefaultLitePullConsumer getDefaultLitePullConsumer() {
return defaultLitePullConsumer;
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java
index fb1f9bb..28e0234 100644
--- a/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java
@@ -50,4 +50,4 @@ public class PushConsumer {
consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
}
-}
+}
\ No newline at end of file