This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch 4.9.x in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 9872667a7671ee1c7a00c5a0ba8a1b24db691a6c Author: xiaoyi <[email protected]> AuthorDate: Thu Aug 25 09:08:23 2022 +0800 [ISSUE #4805]In the subscribe mode, user-defined MessageQueueListener is supported. At the same time, you can specify the offset that the MessageQueue will commit (#4820) (cherry picked from commit a1d2a7325505d572a80f4fe18928a1997a3abbcc) --- .../client/consumer/DefaultLitePullConsumer.java | 36 ++++++- .../rocketmq/client/consumer/LitePullConsumer.java | 23 +++++ .../impl/consumer/DefaultLitePullConsumerImpl.java | 107 ++++++++++++++++++--- .../consumer/DefaultLitePullConsumerTest.java | 5 +- 4 files changed, 155 insertions(+), 16 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java index 6b8d1b4ae..76acd6338 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.client.consumer; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; @@ -265,7 +266,6 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon public void unsubscribe(String topic) { this.defaultLitePullConsumerImpl.unsubscribe(withNamespace(topic)); } - @Override public void assign(Collection<MessageQueue> messageQueues) { defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues)); @@ -322,6 +322,40 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon this.defaultLitePullConsumerImpl.commitAll(); } + /** + * Offset specified by batch commit + * @param offsetMap + * @param persist + */ + @Override + public void commitSync(Map<MessageQueue, Long> offsetMap, boolean persist) { + this.defaultLitePullConsumerImpl.commit(offsetMap, persist); + } + + /** + * Get the MessageQueue assigned in subscribe mode + * + * @return + * @throws MQClientException + */ + @Override + public Set<MessageQueue> assignment() throws MQClientException { + return this.defaultLitePullConsumerImpl.assignment(); + } + + /** + * Subscribe some topic with subExpression and messageQueueListener + * + * @param topic + * @param subExpression + * @param messageQueueListener + */ + @Override + public void subscribe(String topic, String subExpression, MessageQueueListener messageQueueListener) throws MQClientException { + this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression, messageQueueListener); + } + + @Override public void commit(final Set<MessageQueue> messageQueues, boolean persist) { this.defaultLitePullConsumerImpl.commit(messageQueues, persist); diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java index 8bca31c78..e9e67d055 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java @@ -22,6 +22,7 @@ import org.apache.rocketmq.common.message.MessageQueue; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Set; public interface LitePullConsumer { @@ -52,6 +53,14 @@ public interface LitePullConsumer { */ void subscribe(final String topic, final String subExpression) throws MQClientException; + /** + * Subscribe some topic with subExpression and messageQueueListener + * @param topic + * @param subExpression + * @param messageQueueListener + */ + void subscribe(final String topic, final String subExpression, final MessageQueueListener messageQueueListener) throws MQClientException; + /** * Subscribe some topic with selector. * @@ -67,6 +76,14 @@ public interface LitePullConsumer { */ void unsubscribe(final String topic); + + /** + * subscribe mode, get assigned MessageQueue + * @return + * @throws MQClientException + */ + Set<MessageQueue> assignment() throws MQClientException; + /** * Manually assign a list of message queues to this consumer. This interface does not allow for incremental * assignment and will replace the previous assignment (if there is one). @@ -170,6 +187,12 @@ public interface LitePullConsumer { */ void commitSync(); + /** + * Offset specified by batch commit + * @param offsetMap + * @param persist + */ + void commitSync(Map<MessageQueue, Long> offsetMap, boolean persist); void commit(final Set<MessageQueue> messageQueues, boolean persist); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index f3cd7d5b8..267ee1c77 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -241,19 +241,23 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { class MessageQueueListenerImpl implements MessageQueueListener { @Override public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { - MessageModel messageModel = defaultLitePullConsumer.getMessageModel(); - switch (messageModel) { - case BROADCASTING: - updateAssignedMessageQueue(topic, mqAll); - updatePullTask(topic, mqAll); - break; - case CLUSTERING: - updateAssignedMessageQueue(topic, mqDivided); - updatePullTask(topic, mqDivided); - break; - default: - break; - } + updateAssignQueueAndStartPullTask(topic, mqAll, mqDivided); + } + } + + public void updateAssignQueueAndStartPullTask(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { + MessageModel messageModel = defaultLitePullConsumer.getMessageModel(); + switch (messageModel) { + case BROADCASTING: + updateAssignedMessageQueue(topic, mqAll); + updatePullTask(topic, mqAll); + break; + case CLUSTERING: + updateAssignedMessageQueue(topic, mqDivided); + updatePullTask(topic, mqDivided); + break; + default: + break; } } @@ -472,6 +476,41 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } } + /** + * subscribe data by customizing messageQueueListener + * @param topic + * @param subExpression + * @param messageQueueListener + * @throws MQClientException + */ + public synchronized void subscribe(String topic, String subExpression, final MessageQueueListener messageQueueListener) throws MQClientException { + try { + if (StringUtils.isEmpty(topic)) { + throw new IllegalArgumentException("Topic can not be null or empty."); + } + setSubscriptionType(SubscriptionType.SUBSCRIBE); + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression); + this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); + this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListener() { + @Override + public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { + // First, update the assign queue + updateAssignQueueAndStartPullTask(topic, mqAll, mqDivided); + // run custom listener + messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided); + } + }); + assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl); + if (serviceState == ServiceState.RUNNING) { + this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + updateTopicSubscribeInfoWhenSubscriptionChanged(); + } + } catch (Exception e) { + throw new MQClientException("subscribe exception", e); + } + } + + public synchronized void subscribe(String topic, String subExpression) throws MQClientException { try { if (topic == null || "".equals(topic)) { @@ -673,6 +712,42 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } } + /** + * Specify offset commit + * @param messageQueues + * @param persist + */ + public synchronized void commit(final Map<MessageQueue, Long> messageQueues, boolean persist) { + if (messageQueues == null || messageQueues.size() == 0) { + log.warn("MessageQueues is empty, Ignore this commit "); + return; + } + for (Map.Entry<MessageQueue, Long> messageQueueEntry : messageQueues.entrySet()) { + MessageQueue messageQueue = messageQueueEntry.getKey(); + long offset = messageQueueEntry.getValue(); + if (offset != -1) { + ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue); + if (processQueue != null && !processQueue.isDropped()) { + updateConsumeOffset(messageQueue, offset); + } + } else { + log.error("consumerOffset is -1 in messageQueue [" + messageQueue + "]."); + } + } + + if (persist) { + this.offsetStore.persistAll(messageQueues.keySet()); + } + } + + /** + * Get the queue assigned in subscribe mode + * @return + */ + public synchronized Set<MessageQueue> assignment() { + return assignedMessageQueue.getAssignedMessageQueues(); + } + public synchronized void commit(final Set<MessageQueue> messageQueues, boolean persist) { if (messageQueues == null || messageQueues.size() == 0) { return; @@ -1150,8 +1225,12 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { return isEqual; } + public AssignedMessageQueue getAssignedMessageQueue() { + return assignedMessageQueue; + } + public synchronized void registerTopicMessageQueueChangeListener(String topic, - TopicMessageQueueChangeListener listener) throws MQClientException { + TopicMessageQueueChangeListener listener) throws MQClientException { if (topic == null || listener == null) { throw new MQClientException("Topic or listener is null", null); } diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java index 0f0327f0f..2c79ed007 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.client.consumer; +import java.util.ArrayList; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.consumer.store.OffsetStore; @@ -704,7 +705,9 @@ public class DefaultLitePullConsumerTest { messageClientExt.setOffsetMsgId("234"); messageClientExt.setBornHost(new InetSocketAddress(8080)); messageClientExt.setStoreHost(new InetSocketAddress(8080)); - PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(messageClientExt)); + List<MessageExt> list = new ArrayList<MessageExt>(); + list.add(messageClientExt); + PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, list); return pullResult; } });
