This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new a1d2a7325 [ISSUE #4805]In the subscribe mode, user-defined 
MessageQueueListener is supported. At the same time, you can specify the offset 
that the MessageQueue will commit (#4820)
a1d2a7325 is described below

commit a1d2a7325505d572a80f4fe18928a1997a3abbcc
Author: xiaoyi <[email protected]>
AuthorDate: Thu Aug 25 09:08:23 2022 +0800

    [ISSUE #4805]In the subscribe mode, user-defined MessageQueueListener is 
supported. At the same time, you can specify the offset that the MessageQueue 
will commit (#4820)
---
 .../client/consumer/DefaultLitePullConsumer.java   |  36 ++++++-
 .../rocketmq/client/consumer/LitePullConsumer.java |  23 +++++
 .../impl/consumer/DefaultLitePullConsumerImpl.java | 107 ++++++++++++++++++---
 .../consumer/DefaultLitePullConsumerTest.java      |  72 ++++++++++++++
 4 files changed, 223 insertions(+), 15 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 6b8d1b4ae..76acd6338 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.client.consumer;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import org.apache.rocketmq.client.ClientConfig;
 import 
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
@@ -265,7 +266,6 @@ public class DefaultLitePullConsumer extends ClientConfig 
implements LitePullCon
     public void unsubscribe(String topic) {
         this.defaultLitePullConsumerImpl.unsubscribe(withNamespace(topic));
     }
-
     @Override
     public void assign(Collection<MessageQueue> messageQueues) {
         defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues));
@@ -322,6 +322,40 @@ public class DefaultLitePullConsumer extends ClientConfig 
implements LitePullCon
         this.defaultLitePullConsumerImpl.commitAll();
     }
 
+    /**
+     * Offset specified by batch commit
+     * @param offsetMap
+     * @param persist
+     */
+    @Override
+    public void commitSync(Map<MessageQueue, Long> offsetMap, boolean persist) 
{
+        this.defaultLitePullConsumerImpl.commit(offsetMap, persist);
+    }
+
+    /**
+     * Get the MessageQueue assigned in subscribe mode
+     *
+     * @return
+     * @throws MQClientException
+     */
+    @Override
+    public Set<MessageQueue> assignment() throws MQClientException {
+        return this.defaultLitePullConsumerImpl.assignment();
+    }
+
+    /**
+     * Subscribe some topic with subExpression and messageQueueListener
+     *
+     * @param topic
+     * @param subExpression
+     * @param messageQueueListener
+     */
+    @Override
+    public void subscribe(String topic, String subExpression, 
MessageQueueListener messageQueueListener) throws MQClientException {
+        this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), 
subExpression, messageQueueListener);
+    }
+
+
     @Override
     public void commit(final Set<MessageQueue> messageQueues, boolean persist) 
{
         this.defaultLitePullConsumerImpl.commit(messageQueues, persist);
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
index 8bca31c78..e9e67d055 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
@@ -22,6 +22,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 public interface LitePullConsumer {
@@ -52,6 +53,14 @@ public interface LitePullConsumer {
      */
     void subscribe(final String topic, final String subExpression) throws 
MQClientException;
 
+    /**
+     * Subscribe some topic with subExpression and messageQueueListener
+     * @param topic
+     * @param subExpression
+     * @param messageQueueListener
+     */
+    void subscribe(final String topic, final String subExpression, final 
MessageQueueListener messageQueueListener) throws MQClientException;
+
     /**
      * Subscribe some topic with selector.
      *
@@ -67,6 +76,14 @@ public interface LitePullConsumer {
      */
     void unsubscribe(final String topic);
 
+
+    /**
+     * subscribe mode, get assigned MessageQueue
+     * @return
+     * @throws MQClientException
+     */
+    Set<MessageQueue> assignment() throws MQClientException;
+
     /**
      * Manually assign a list of message queues to this consumer. This 
interface does not allow for incremental
      * assignment and will replace the previous assignment (if there is one).
@@ -170,6 +187,12 @@ public interface LitePullConsumer {
      */
     void commitSync();
 
+    /**
+     * Offset specified by batch commit
+     * @param offsetMap
+     * @param persist
+     */
+    void commitSync(Map<MessageQueue, Long> offsetMap, boolean persist);
 
     void commit(final Set<MessageQueue> messageQueues, boolean persist);
 
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index c720ef075..1e40ddaf6 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -240,19 +240,23 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
     class MessageQueueListenerImpl implements MessageQueueListener {
         @Override
         public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, 
Set<MessageQueue> mqDivided) {
-            MessageModel messageModel = 
defaultLitePullConsumer.getMessageModel();
-            switch (messageModel) {
-                case BROADCASTING:
-                    updateAssignedMessageQueue(topic, mqAll);
-                    updatePullTask(topic, mqAll);
-                    break;
-                case CLUSTERING:
-                    updateAssignedMessageQueue(topic, mqDivided);
-                    updatePullTask(topic, mqDivided);
-                    break;
-                default:
-                    break;
-            }
+            updateAssignQueueAndStartPullTask(topic, mqAll, mqDivided);
+        }
+    }
+
+    public void updateAssignQueueAndStartPullTask(String topic, 
Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
+        MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
+        switch (messageModel) {
+            case BROADCASTING:
+                updateAssignedMessageQueue(topic, mqAll);
+                updatePullTask(topic, mqAll);
+                break;
+            case CLUSTERING:
+                updateAssignedMessageQueue(topic, mqDivided);
+                updatePullTask(topic, mqDivided);
+                break;
+            default:
+                break;
         }
     }
 
@@ -474,6 +478,41 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
         }
     }
 
+    /**
+     * subscribe data by customizing messageQueueListener
+     * @param topic
+     * @param subExpression
+     * @param messageQueueListener
+     * @throws MQClientException
+     */
+    public synchronized void subscribe(String topic, String subExpression, 
MessageQueueListener messageQueueListener) throws MQClientException {
+        try {
+            if (StringUtils.isEmpty(topic)) {
+                throw new IllegalArgumentException("Topic can not be null or 
empty.");
+            }
+            setSubscriptionType(SubscriptionType.SUBSCRIBE);
+            SubscriptionData subscriptionData = 
FilterAPI.buildSubscriptionData(topic, subExpression);
+            this.rebalanceImpl.getSubscriptionInner().put(topic, 
subscriptionData);
+            this.defaultLitePullConsumer.setMessageQueueListener(new 
MessageQueueListener() {
+                @Override
+                public void messageQueueChanged(String topic, 
Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
+                    // First, update the assign queue
+                    updateAssignQueueAndStartPullTask(topic, mqAll, mqDivided);
+                    // run custom listener
+                    messageQueueListener.messageQueueChanged(topic, mqAll, 
mqDivided);
+                }
+            });
+            assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
+            if (serviceState == ServiceState.RUNNING) {
+                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
+                updateTopicSubscribeInfoWhenSubscriptionChanged();
+            }
+        } catch (Exception e) {
+            throw new MQClientException("subscribe exception", e);
+        }
+    }
+
+
     public synchronized void subscribe(String topic, String subExpression) 
throws MQClientException {
         try {
             if (topic == null || "".equals(topic)) {
@@ -687,6 +726,42 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
         }
     }
 
+    /**
+     * Specify offset commit
+     * @param messageQueues
+     * @param persist
+     */
+    public synchronized void commit(final Map<MessageQueue, Long> 
messageQueues, boolean persist) {
+        if (messageQueues == null || messageQueues.size() == 0) {
+            log.warn("MessageQueues is empty, Ignore this commit ");
+            return;
+        }
+        for (Map.Entry<MessageQueue, Long> messageQueueEntry : 
messageQueues.entrySet()) {
+            MessageQueue messageQueue = messageQueueEntry.getKey();
+            long offset = messageQueueEntry.getValue();
+            if (offset != -1) {
+                ProcessQueue processQueue = 
assignedMessageQueue.getProcessQueue(messageQueue);
+                if (processQueue != null && !processQueue.isDropped()) {
+                    updateConsumeOffset(messageQueue, offset);
+                }
+            } else {
+                log.error("consumerOffset is -1 in messageQueue [" + 
messageQueue + "].");
+            }
+        }
+
+        if (persist) {
+            this.offsetStore.persistAll(messageQueues.keySet());
+        }
+    }
+
+    /**
+     * Get the queue assigned in subscribe mode
+     * @return
+     */
+    public synchronized Set<MessageQueue> assignment() {
+        return assignedMessageQueue.getAssignedMessageQueues();
+    }
+
     public synchronized void commit(final Set<MessageQueue> messageQueues, 
boolean persist) {
         if (messageQueues == null || messageQueues.size() == 0) {
             return;
@@ -1147,8 +1222,12 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
         return isEqual;
     }
 
+    public AssignedMessageQueue getAssignedMessageQueue() {
+        return assignedMessageQueue;
+    }
+
     public synchronized void registerTopicMessageQueueChangeListener(String 
topic,
-        TopicMessageQueueChangeListener listener) throws MQClientException {
+                                                                     
TopicMessageQueueChangeListener listener) throws MQClientException {
         if (topic == null || listener == null) {
             throw new MQClientException("Topic or listener is null", null);
         }
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
index 128e4cd8a..1db13f89c 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
@@ -21,9 +21,12 @@ import java.io.ByteArrayOutputStream;
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.client.ClientConfig;
@@ -131,6 +134,26 @@ public class DefaultLitePullConsumerTest {
         }
     }
 
+    @Test
+    public void testSubscribeWithListener_PollMessageSuccess() throws 
Exception {
+        DefaultLitePullConsumer litePullConsumer = 
createSubscribeLitePullConsumerWithListener();
+        try {
+            Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
+            messageQueueSet.add(createMessageQueue());
+            litePullConsumerImpl.updateTopicSubscribeInfo(topic, 
messageQueueSet);
+            litePullConsumer.setPollTimeoutMillis(20 * 1000);
+            List<MessageExt> result = litePullConsumer.poll();
+            assertThat(result.get(0).getTopic()).isEqualTo(topic);
+            assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'});
+
+            Set<MessageQueue> assignment= litePullConsumer.assignment();
+            
assertThat(assignment.stream().findFirst().get()).isEqualTo(messageQueueSet.stream().findFirst().get());
+        } finally {
+            litePullConsumer.shutdown();
+        }
+    }
+
+  
     @Test
     public void testAssign_PollMessageWithTagSuccess() throws Exception {
         DefaultLitePullConsumer litePullConsumer = 
createStartLitePullConsumerWithTag();
@@ -146,6 +169,36 @@ public class DefaultLitePullConsumerTest {
         }
     }
 
+    @Test
+    public void testConsumerCommitSyncWithMQOffset() throws Exception {
+        DefaultLitePullConsumer litePullConsumer = 
createNotStartLitePullConsumer();
+        RemoteBrokerOffsetStore store = new 
RemoteBrokerOffsetStore(mQClientFactory, consumerGroup);
+        litePullConsumer.setOffsetStore(store);
+        litePullConsumer.start();
+        initDefaultLitePullConsumer(litePullConsumer);
+
+        //replace with real offsetStore.
+        Field offsetStore = 
litePullConsumerImpl.getClass().getDeclaredField("offsetStore");
+        offsetStore.setAccessible(true);
+        offsetStore.set(litePullConsumerImpl, store);
+
+        MessageQueue messageQueue = createMessageQueue();
+        HashSet<MessageQueue> set = new HashSet<MessageQueue>();
+        set.add(messageQueue);
+
+        //mock assign and reset offset
+        litePullConsumer.assign(set);
+        litePullConsumer.seek(messageQueue, 0);
+
+        //commit offset 1
+        Map<MessageQueue, Long> commitOffset = new HashMap<>();
+        commitOffset.put(messageQueue, 1L);
+        litePullConsumer.commitSync(commitOffset, true);
+
+        assertThat(litePullConsumer.committed(messageQueue)).isEqualTo(1);
+    }
+
+
     @Test
     public void testSubscribe_PollMessageSuccess() throws Exception {
         DefaultLitePullConsumer litePullConsumer = 
createSubscribeLitePullConsumer();
@@ -592,6 +645,7 @@ public class DefaultLitePullConsumerTest {
         assertThat(litePullConsumer.committed(messageQueue)).isEqualTo(0);
     }
 
+
     static class AsyncConsumer {
         public void executeAsync(final DefaultLitePullConsumer consumer) {
             new Thread(new Runnable() {
@@ -734,6 +788,23 @@ public class DefaultLitePullConsumerTest {
         return litePullConsumer;
     }
 
+    private DefaultLitePullConsumer 
createSubscribeLitePullConsumerWithListener() throws Exception {
+        DefaultLitePullConsumer litePullConsumer = new 
DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
+        litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
+        litePullConsumer.subscribe(topic, "*", new MessageQueueListener() {
+            @Override
+            public void messageQueueChanged(String topic, Set<MessageQueue> 
mqAll, Set<MessageQueue> mqDivided) {
+                
assertThat(mqAll.stream().findFirst().get().getTopic()).isEqualTo(mqDivided.stream().findFirst().get().getTopic());
+                
assertThat(mqAll.stream().findFirst().get().getBrokerName()).isEqualTo(mqDivided.stream().findFirst().get().getBrokerName());
+                
assertThat(mqAll.stream().findFirst().get().getQueueId()).isEqualTo(mqDivided.stream().findFirst().get().getQueueId());
+            }
+        });
+        suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer);
+        litePullConsumer.start();
+        initDefaultLitePullConsumer(litePullConsumer);
+        return litePullConsumer;
+    }
+
     private DefaultLitePullConsumer createStartLitePullConsumer() throws 
Exception {
         DefaultLitePullConsumer litePullConsumer = new 
DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
         litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
@@ -743,6 +814,7 @@ public class DefaultLitePullConsumerTest {
         return litePullConsumer;
     }
 
+          
     private DefaultLitePullConsumer createStartLitePullConsumerWithTag() 
throws Exception {
         DefaultLitePullConsumer litePullConsumer = new 
DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
         litePullConsumer.setNamesrvAddr("127.0.0.1:9876");

Reply via email to