This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new a1d2a7325 [ISSUE #4805]In the subscribe mode, user-defined
MessageQueueListener is supported. At the same time, you can specify the offset
that the MessageQueue will commit (#4820)
a1d2a7325 is described below
commit a1d2a7325505d572a80f4fe18928a1997a3abbcc
Author: xiaoyi <[email protected]>
AuthorDate: Thu Aug 25 09:08:23 2022 +0800
[ISSUE #4805]In the subscribe mode, user-defined MessageQueueListener is
supported. At the same time, you can specify the offset that the MessageQueue
will commit (#4820)
---
.../client/consumer/DefaultLitePullConsumer.java | 36 ++++++-
.../rocketmq/client/consumer/LitePullConsumer.java | 23 +++++
.../impl/consumer/DefaultLitePullConsumerImpl.java | 107 ++++++++++++++++++---
.../consumer/DefaultLitePullConsumerTest.java | 72 ++++++++++++++
4 files changed, 223 insertions(+), 15 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 6b8d1b4ae..76acd6338 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.client.consumer;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.client.ClientConfig;
import
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
@@ -265,7 +266,6 @@ public class DefaultLitePullConsumer extends ClientConfig
implements LitePullCon
public void unsubscribe(String topic) {
this.defaultLitePullConsumerImpl.unsubscribe(withNamespace(topic));
}
-
@Override
public void assign(Collection<MessageQueue> messageQueues) {
defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues));
@@ -322,6 +322,40 @@ public class DefaultLitePullConsumer extends ClientConfig
implements LitePullCon
this.defaultLitePullConsumerImpl.commitAll();
}
+ /**
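+ * Commit the specified offset for each of the given message queues.
+ * @param offsetMap the offset to commit for each message queue
+ * @param persist whether to also persist the offsets of these message queues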
+ */
+ @Override
+ public void commitSync(Map<MessageQueue, Long> offsetMap, boolean persist)
{
+ this.defaultLitePullConsumerImpl.commit(offsetMap, persist);
+ }
+
+ /**
+ * Get the message queues assigned to this consumer in subscribe mode.
+ *
+ * @return the set of assigned message queues
+ * @throws MQClientException
+ */
+ @Override
+ public Set<MessageQueue> assignment() throws MQClientException {
+ return this.defaultLitePullConsumerImpl.assignment();
+ }
+
+ /**
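+ * Subscribe to a topic with a subExpression and a user-defined messageQueueListener.
+ *
+ * @param topic the topic to subscribe to
+ * @param subExpression the subscription expression for the topic
+ * @param messageQueueListener the listener to call when the message queues of the topic change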
+ */
+ @Override
+ public void subscribe(String topic, String subExpression,
MessageQueueListener messageQueueListener) throws MQClientException {
+ this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic),
subExpression, messageQueueListener);
+ }
+
+
@Override
public void commit(final Set<MessageQueue> messageQueues, boolean persist)
{
this.defaultLitePullConsumerImpl.commit(messageQueues, persist);
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
index 8bca31c78..e9e67d055 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
@@ -22,6 +22,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Set;
public interface LitePullConsumer {
@@ -52,6 +53,14 @@ public interface LitePullConsumer {
*/
void subscribe(final String topic, final String subExpression) throws
MQClientException;
+ /**
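+ * Subscribe to a topic with a subExpression and a user-defined messageQueueListener.
+ * @param topic the topic to subscribe to
+ * @param subExpression the subscription expression for the topic
+ * @param messageQueueListener the listener to call when the message queues of the topic change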
+ */
+ void subscribe(final String topic, final String subExpression, final
MessageQueueListener messageQueueListener) throws MQClientException;
+
/**
* Subscribe some topic with selector.
*
@@ -67,6 +76,14 @@ public interface LitePullConsumer {
*/
void unsubscribe(final String topic);
+
+ /**
+ * Get the message queues assigned to this consumer in subscribe mode.
+ * @return the set of assigned message queues
+ * @throws MQClientException
+ */
+ Set<MessageQueue> assignment() throws MQClientException;
+
/**
* Manually assign a list of message queues to this consumer. This
interface does not allow for incremental
* assignment and will replace the previous assignment (if there is one).
@@ -170,6 +187,12 @@ public interface LitePullConsumer {
*/
void commitSync();
+ /**
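+ * Commit the specified offset for each of the given message queues.
+ * @param offsetMap the offset to commit for each message queue
+ * @param persist whether to also persist the offsets of these message queues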
+ */
+ void commitSync(Map<MessageQueue, Long> offsetMap, boolean persist);
void commit(final Set<MessageQueue> messageQueues, boolean persist);
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index c720ef075..1e40ddaf6 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -240,19 +240,23 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
class MessageQueueListenerImpl implements MessageQueueListener {
@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll,
Set<MessageQueue> mqDivided) {
- MessageModel messageModel =
defaultLitePullConsumer.getMessageModel();
- switch (messageModel) {
- case BROADCASTING:
- updateAssignedMessageQueue(topic, mqAll);
- updatePullTask(topic, mqAll);
- break;
- case CLUSTERING:
- updateAssignedMessageQueue(topic, mqDivided);
- updatePullTask(topic, mqDivided);
- break;
- default:
- break;
- }
+ updateAssignQueueAndStartPullTask(topic, mqAll, mqDivided);
+ }
+ }
+
+ public void updateAssignQueueAndStartPullTask(String topic,
Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
+ MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
+ switch (messageModel) {
+ case BROADCASTING:
+ updateAssignedMessageQueue(topic, mqAll);
+ updatePullTask(topic, mqAll);
+ break;
+ case CLUSTERING:
+ updateAssignedMessageQueue(topic, mqDivided);
+ updatePullTask(topic, mqDivided);
+ break;
+ default:
+ break;
}
}
@@ -474,6 +478,41 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
}
}
+ /**
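+ * Subscribe to a topic with a user-defined messageQueueListener; the listener is called after the assigned queues and pull tasks are updated.
+ * @param topic the topic to subscribe to; must not be null or empty
+ * @param subExpression the expression used to build the subscription data for the topic
+ * @param messageQueueListener the user-defined listener to call on each message queue change
+ * @throws MQClientException if the topic is null or empty, or if any other exception occurs while subscribing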
+ */
+ public synchronized void subscribe(String topic, String subExpression,
MessageQueueListener messageQueueListener) throws MQClientException {
+ try {
+ if (StringUtils.isEmpty(topic)) {
+ throw new IllegalArgumentException("Topic can not be null or
empty.");
+ }
+ setSubscriptionType(SubscriptionType.SUBSCRIBE);
+ SubscriptionData subscriptionData =
FilterAPI.buildSubscriptionData(topic, subExpression);
+ this.rebalanceImpl.getSubscriptionInner().put(topic,
subscriptionData);
+ this.defaultLitePullConsumer.setMessageQueueListener(new
MessageQueueListener() {
+ @Override
+ public void messageQueueChanged(String topic,
Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
+ // First, update the assign queue
+ updateAssignQueueAndStartPullTask(topic, mqAll, mqDivided);
+ // run custom listener
+ messageQueueListener.messageQueueChanged(topic, mqAll,
mqDivided);
+ }
+ });
+ assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
+ if (serviceState == ServiceState.RUNNING) {
+ this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
+ updateTopicSubscribeInfoWhenSubscriptionChanged();
+ }
+ } catch (Exception e) {
+ throw new MQClientException("subscribe exception", e);
+ }
+ }
+
+
public synchronized void subscribe(String topic, String subExpression)
throws MQClientException {
try {
if (topic == null || "".equals(topic)) {
@@ -687,6 +726,42 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
}
}
+ /**
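+ * Commit the specified offset for each of the given message queues; a null or empty map is ignored with a warning.
+ * @param messageQueues the offset to commit for each message queue; an offset of -1 is logged as an error, and queues whose process queue is missing or dropped are skipped
+ * @param persist whether to persist the offsets of all given message queues through the offset store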
+ */
+ public synchronized void commit(final Map<MessageQueue, Long>
messageQueues, boolean persist) {
+ if (messageQueues == null || messageQueues.size() == 0) {
+ log.warn("MessageQueues is empty, Ignore this commit ");
+ return;
+ }
+ for (Map.Entry<MessageQueue, Long> messageQueueEntry :
messageQueues.entrySet()) {
+ MessageQueue messageQueue = messageQueueEntry.getKey();
+ long offset = messageQueueEntry.getValue();
+ if (offset != -1) {
+ ProcessQueue processQueue =
assignedMessageQueue.getProcessQueue(messageQueue);
+ if (processQueue != null && !processQueue.isDropped()) {
+ updateConsumeOffset(messageQueue, offset);
+ }
+ } else {
+ log.error("consumerOffset is -1 in messageQueue [" +
messageQueue + "].");
+ }
+ }
+
+ if (persist) {
+ this.offsetStore.persistAll(messageQueues.keySet());
+ }
+ }
+
+ /**
+ * Get the message queues assigned in subscribe mode.
+ * @return the set of assigned message queues
+ */
+ public synchronized Set<MessageQueue> assignment() {
+ return assignedMessageQueue.getAssignedMessageQueues();
+ }
+
public synchronized void commit(final Set<MessageQueue> messageQueues,
boolean persist) {
if (messageQueues == null || messageQueues.size() == 0) {
return;
@@ -1147,8 +1222,12 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
return isEqual;
}
+ public AssignedMessageQueue getAssignedMessageQueue() {
+ return assignedMessageQueue;
+ }
+
public synchronized void registerTopicMessageQueueChangeListener(String
topic,
- TopicMessageQueueChangeListener listener) throws MQClientException {
+
TopicMessageQueueChangeListener listener) throws MQClientException {
if (topic == null || listener == null) {
throw new MQClientException("Topic or listener is null", null);
}
diff --git
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
index 128e4cd8a..1db13f89c 100644
---
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
@@ -21,9 +21,12 @@ import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.ClientConfig;
@@ -131,6 +134,26 @@ public class DefaultLitePullConsumerTest {
}
}
+ @Test
+ public void testSubscribeWithListener_PollMessageSuccess() throws
Exception {
+ DefaultLitePullConsumer litePullConsumer =
createSubscribeLitePullConsumerWithListener();
+ try {
+ Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
+ messageQueueSet.add(createMessageQueue());
+ litePullConsumerImpl.updateTopicSubscribeInfo(topic,
messageQueueSet);
+ litePullConsumer.setPollTimeoutMillis(20 * 1000);
+ List<MessageExt> result = litePullConsumer.poll();
+ assertThat(result.get(0).getTopic()).isEqualTo(topic);
+ assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'});
+
+ Set<MessageQueue> assignment= litePullConsumer.assignment();
+
assertThat(assignment.stream().findFirst().get()).isEqualTo(messageQueueSet.stream().findFirst().get());
+ } finally {
+ litePullConsumer.shutdown();
+ }
+ }
+
+
@Test
public void testAssign_PollMessageWithTagSuccess() throws Exception {
DefaultLitePullConsumer litePullConsumer =
createStartLitePullConsumerWithTag();
@@ -146,6 +169,36 @@ public class DefaultLitePullConsumerTest {
}
}
+ @Test
+ public void testConsumerCommitSyncWithMQOffset() throws Exception {
+ DefaultLitePullConsumer litePullConsumer =
createNotStartLitePullConsumer();
+ RemoteBrokerOffsetStore store = new
RemoteBrokerOffsetStore(mQClientFactory, consumerGroup);
+ litePullConsumer.setOffsetStore(store);
+ litePullConsumer.start();
+ initDefaultLitePullConsumer(litePullConsumer);
+
+ //replace with real offsetStore.
+ Field offsetStore =
litePullConsumerImpl.getClass().getDeclaredField("offsetStore");
+ offsetStore.setAccessible(true);
+ offsetStore.set(litePullConsumerImpl, store);
+
+ MessageQueue messageQueue = createMessageQueue();
+ HashSet<MessageQueue> set = new HashSet<MessageQueue>();
+ set.add(messageQueue);
+
+ //mock assign and reset offset
+ litePullConsumer.assign(set);
+ litePullConsumer.seek(messageQueue, 0);
+
+ //commit offset 1
+ Map<MessageQueue, Long> commitOffset = new HashMap<>();
+ commitOffset.put(messageQueue, 1L);
+ litePullConsumer.commitSync(commitOffset, true);
+
+ assertThat(litePullConsumer.committed(messageQueue)).isEqualTo(1);
+ }
+
+
@Test
public void testSubscribe_PollMessageSuccess() throws Exception {
DefaultLitePullConsumer litePullConsumer =
createSubscribeLitePullConsumer();
@@ -592,6 +645,7 @@ public class DefaultLitePullConsumerTest {
assertThat(litePullConsumer.committed(messageQueue)).isEqualTo(0);
}
+
static class AsyncConsumer {
public void executeAsync(final DefaultLitePullConsumer consumer) {
new Thread(new Runnable() {
@@ -734,6 +788,23 @@ public class DefaultLitePullConsumerTest {
return litePullConsumer;
}
+ private DefaultLitePullConsumer
createSubscribeLitePullConsumerWithListener() throws Exception {
+ DefaultLitePullConsumer litePullConsumer = new
DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
+ litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
+ litePullConsumer.subscribe(topic, "*", new MessageQueueListener() {
+ @Override
+ public void messageQueueChanged(String topic, Set<MessageQueue>
mqAll, Set<MessageQueue> mqDivided) {
+
assertThat(mqAll.stream().findFirst().get().getTopic()).isEqualTo(mqDivided.stream().findFirst().get().getTopic());
+
assertThat(mqAll.stream().findFirst().get().getBrokerName()).isEqualTo(mqDivided.stream().findFirst().get().getBrokerName());
+
assertThat(mqAll.stream().findFirst().get().getQueueId()).isEqualTo(mqDivided.stream().findFirst().get().getQueueId());
+ }
+ });
+ suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer);
+ litePullConsumer.start();
+ initDefaultLitePullConsumer(litePullConsumer);
+ return litePullConsumer;
+ }
+
private DefaultLitePullConsumer createStartLitePullConsumer() throws
Exception {
DefaultLitePullConsumer litePullConsumer = new
DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
@@ -743,6 +814,7 @@ public class DefaultLitePullConsumerTest {
return litePullConsumer;
}
+
private DefaultLitePullConsumer createStartLitePullConsumerWithTag()
throws Exception {
DefaultLitePullConsumer litePullConsumer = new
DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
litePullConsumer.setNamesrvAddr("127.0.0.1:9876");