This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 4.9.x
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit d98a8fad9e0f763ba60fe5f71611a50adb770ae2
Author: lizhiboo <[email protected]>
AuthorDate: Mon Aug 15 18:02:57 2022 +0800

    [ISSUE #4776]Support tag in litepullconsumer assign mode (#4775)
    
    * support tag for litepullconsumer at assign type
    
    * support tag for litepullconsumer at assign mode
    
    * with namespace
    
    * fix typo
    
    * add test case
    
    * fix
    
    * code style
    
    (cherry picked from commit dd87d90b29993cbfd67ee5d80279f5483b35d130)
---
 .../client/consumer/DefaultLitePullConsumer.java   |  5 ++
 .../rocketmq/client/consumer/LitePullConsumer.java |  9 +++
 .../impl/consumer/DefaultLitePullConsumerImpl.java | 18 ++++-
 .../consumer/DefaultLitePullConsumerTest.java      | 84 ++++++++++++++++++++++
 .../LitePullConsumerAssignWithSubExpression.java   | 60 ++++++++++++++++
 5 files changed, 175 insertions(+), 1 deletion(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 7799166f2..6b8d1b4ae 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -271,6 +271,11 @@ public class DefaultLitePullConsumer extends ClientConfig 
implements LitePullCon
         defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues));
     }
 
+    @Override
+    public void setSubExpressionForAssign(final String topic, final String 
subExpresion) {
+        
defaultLitePullConsumerImpl.setSubExpressionForAssign(withNamespace(topic), 
subExpresion);
+    }
+
     @Override
     public List<MessageExt> poll() {
         return defaultLitePullConsumerImpl.poll(this.getPollTimeoutMillis());
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
index 089df516f..8bca31c78 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
@@ -75,6 +75,15 @@ public interface LitePullConsumer {
      */
     void assign(Collection<MessageQueue> messageQueues);
 
+    /**
+     * Set topic subExpression for assign mode. This interface does not allow 
be call after start(). Default value is * if not set.
+     * assignment and will replace the previous assignment (if there is one).
+     *
+     * @param subExpression subscription expression.it only support or 
operation such as "tag1 || tag2 || tag3" <br> if
+     *      * null or * expression,meaning subscribe all
+     */
+    void setSubExpressionForAssign(final String topic, final String 
subExpression);
+
     /**
      * Fetch data for the topics or partitions specified using assign API
      *
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index a4eba4662..f3cd7d5b8 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
 import org.apache.rocketmq.client.consumer.MessageQueueListener;
@@ -125,6 +126,8 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
 
     private static final long PULL_TIME_DELAY_MILLS_ON_EXCEPTION = 3 * 1000;
 
+    private ConcurrentHashMap<String/* topic */, String/* subExpression */> 
topicToSubExpression = new ConcurrentHashMap<String, String>();
+
     private DefaultLitePullConsumer defaultLitePullConsumer;
 
     private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
@@ -529,6 +532,17 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
         }
     }
 
+    public synchronized void setSubExpressionForAssign(final String topic, 
final String subExpression) {
+        if (StringUtils.isBlank(subExpression)) {
+            throw new IllegalArgumentException("subExpression can not be null 
or empty.");
+        }
+        if (serviceState != ServiceState.CREATE_JUST) {
+            throw new IllegalStateException("setAssignTag only can be called 
before start.");
+        }
+        setSubscriptionType(SubscriptionType.ASSIGN);
+        topicToSubExpression.put(topic, subExpression);
+    }
+
     private void maybeAutoCommit() {
         long now = System.currentTimeMillis();
         if (now >= nextAutoCommitDeadline) {
@@ -848,7 +862,9 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
                     if (subscriptionType == SubscriptionType.SUBSCRIBE) {
                         subscriptionData = 
rebalanceImpl.getSubscriptionInner().get(topic);
                     } else {
-                        subscriptionData = 
FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL);
+                        String subExpression4Assign = 
topicToSubExpression.get(topic);
+                        subExpression4Assign = subExpression4Assign == null ? 
SubscriptionData.SUB_ALL : subExpression4Assign;
+                        subscriptionData = 
FilterAPI.buildSubscriptionData(topic, subExpression4Assign);
                     }
 
                     PullResult pullResult = pull(messageQueue, 
subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
index c452f30ff..0f0327f0f 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
@@ -125,6 +125,21 @@ public class DefaultLitePullConsumerTest {
         }
     }
 
+    @Test
+    public void testAssign_PollMessageWithTagSuccess() throws Exception {
+        DefaultLitePullConsumer litePullConsumer = 
createStartLitePullConsumerWithTag();
+        try {
+            MessageQueue messageQueue = createMessageQueue();
+            litePullConsumer.assign(Collections.singletonList(messageQueue));
+            List<MessageExt> result = litePullConsumer.poll();
+            assertThat(result.get(0).getTopic()).isEqualTo(topic);
+            assertThat(result.get(0).getTags()).isEqualTo("tagA");
+            assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'});
+        } finally {
+            litePullConsumer.shutdown();
+        }
+    }
+
     @Test
     public void testSubscribe_PollMessageSuccess() throws Exception {
         DefaultLitePullConsumer litePullConsumer = 
createSubscribeLitePullConsumer();
@@ -640,6 +655,65 @@ public class DefaultLitePullConsumerTest {
         doReturn(123L).when(offsetStore).readOffset(any(MessageQueue.class), 
any(ReadOffsetType.class));
     }
 
+    private void initDefaultLitePullConsumerWithTag(DefaultLitePullConsumer 
litePullConsumer) throws Exception {
+
+        Field field = 
DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl");
+        field.setAccessible(true);
+        litePullConsumerImpl = (DefaultLitePullConsumerImpl) 
field.get(litePullConsumer);
+        field = 
DefaultLitePullConsumerImpl.class.getDeclaredField("mQClientFactory");
+        field.setAccessible(true);
+        field.set(litePullConsumerImpl, mQClientFactory);
+
+        PullAPIWrapper pullAPIWrapper = 
litePullConsumerImpl.getPullAPIWrapper();
+        field = PullAPIWrapper.class.getDeclaredField("mQClientFactory");
+        field.setAccessible(true);
+        field.set(pullAPIWrapper, mQClientFactory);
+
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mQClientFactory, mQClientAPIImpl);
+
+        field = MQClientInstance.class.getDeclaredField("mQAdminImpl");
+        field.setAccessible(true);
+        field.set(mQClientFactory, mQAdminImpl);
+
+        field = 
DefaultLitePullConsumerImpl.class.getDeclaredField("rebalanceImpl");
+        field.setAccessible(true);
+        rebalanceImpl = (RebalanceImpl) field.get(litePullConsumerImpl);
+        field = RebalanceImpl.class.getDeclaredField("mQClientFactory");
+        field.setAccessible(true);
+        field.set(rebalanceImpl, mQClientFactory);
+
+        offsetStore = spy(litePullConsumerImpl.getOffsetStore());
+        field = 
DefaultLitePullConsumerImpl.class.getDeclaredField("offsetStore");
+        field.setAccessible(true);
+        field.set(litePullConsumerImpl, offsetStore);
+
+        when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), 
any(PullMessageRequestHeader.class),
+                anyLong(), any(CommunicationMode.class), 
nullable(PullCallback.class)))
+                .thenAnswer(new Answer<PullResult>() {
+                    @Override
+                    public PullResult answer(InvocationOnMock mock) throws 
Throwable {
+                        PullMessageRequestHeader requestHeader = 
mock.getArgument(1);
+                        MessageClientExt messageClientExt = new 
MessageClientExt();
+                        messageClientExt.setTopic(topic);
+                        messageClientExt.setTags("tagA");
+                        messageClientExt.setQueueId(0);
+                        messageClientExt.setMsgId("123");
+                        messageClientExt.setBody(new byte[] {'a'});
+                        messageClientExt.setOffsetMsgId("234");
+                        messageClientExt.setBornHost(new 
InetSocketAddress(8080));
+                        messageClientExt.setStoreHost(new 
InetSocketAddress(8080));
+                        PullResult pullResult = 
createPullResult(requestHeader, PullStatus.FOUND, 
Collections.singletonList(messageClientExt));
+                        return pullResult;
+                    }
+                });
+
+        when(mQClientFactory.findBrokerAddressInSubscribe(anyString(), 
anyLong(), anyBoolean())).thenReturn(new FindBrokerResult("127.0.0.1:10911", 
false));
+        
+        doReturn(123L).when(offsetStore).readOffset(any(MessageQueue.class), 
any(ReadOffsetType.class));
+    }
+
     private DefaultLitePullConsumer createSubscribeLitePullConsumer() throws 
Exception {
         DefaultLitePullConsumer litePullConsumer = new 
DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
         litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
@@ -659,6 +733,16 @@ public class DefaultLitePullConsumerTest {
         return litePullConsumer;
     }
 
+    private DefaultLitePullConsumer createStartLitePullConsumerWithTag() 
throws Exception {
+        DefaultLitePullConsumer litePullConsumer = new 
DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
+        litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
+        suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer);
+        litePullConsumer.setSubExpressionForAssign(topic, "tagA");
+        litePullConsumer.start();
+        initDefaultLitePullConsumerWithTag(litePullConsumer);
+        return litePullConsumer;
+    }
+
     private DefaultLitePullConsumer createNotStartLitePullConsumer() {
         DefaultLitePullConsumer litePullConsumer = new 
DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
         return litePullConsumer;
diff --git 
a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerAssignWithSubExpression.java
 
b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerAssignWithSubExpression.java
new file mode 100644
index 000000000..0ab106fa1
--- /dev/null
+++ 
b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerAssignWithSubExpression.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.simple;
+
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public class LitePullConsumerAssignWithSubExpression {
+
+    public static volatile boolean running = true;
+
+    public static void main(String[] args) throws Exception {
+        DefaultLitePullConsumer litePullConsumer = new 
DefaultLitePullConsumer("please_rename_unique_group_name");
+        litePullConsumer.setAutoCommit(false);
+        litePullConsumer.setSubExpressionForAssign("TopicTest", "TagA");
+        litePullConsumer.start();
+        Collection<MessageQueue> mqSet = 
litePullConsumer.fetchMessageQueues("TopicTest");
+        List<MessageQueue> list = new ArrayList<>(mqSet);
+        List<MessageQueue> assignList = new ArrayList<>();
+        for (int i = 0; i < list.size(); i++) {
+            assignList.add(list.get(i));
+        }
+        mqSet = litePullConsumer.fetchMessageQueues("TopicTest1");
+        list = new ArrayList<>(mqSet);
+        for (int i = 0; i < list.size(); i++) {
+            assignList.add(list.get(i));
+        }
+        litePullConsumer.assign(assignList);
+        litePullConsumer.seek(assignList.get(0), 10);
+        try {
+            while (running) {
+                List<MessageExt> messageExts = litePullConsumer.poll();
+                System.out.printf("%s %n", messageExts);
+                litePullConsumer.commitSync();
+            }
+        } finally {
+            litePullConsumer.shutdown();
+        }
+
+    }
+}

Reply via email to