This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch 4.9.x in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit d98a8fad9e0f763ba60fe5f71611a50adb770ae2 Author: lizhiboo <[email protected]> AuthorDate: Mon Aug 15 18:02:57 2022 +0800 [ISSUE #4776]Support tag in litepullconsumer assign mode (#4775) * support tag for litepullconsumer at assign type * support tag for litepullconsumer at assign mode * with namespace * fix typo * add test case * fix * code style (cherry picked from commit dd87d90b29993cbfd67ee5d80279f5483b35d130) --- .../client/consumer/DefaultLitePullConsumer.java | 5 ++ .../rocketmq/client/consumer/LitePullConsumer.java | 9 +++ .../impl/consumer/DefaultLitePullConsumerImpl.java | 18 ++++- .../consumer/DefaultLitePullConsumerTest.java | 84 ++++++++++++++++++++++ .../LitePullConsumerAssignWithSubExpression.java | 60 ++++++++++++++++ 5 files changed, 175 insertions(+), 1 deletion(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java index 7799166f2..6b8d1b4ae 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java @@ -271,6 +271,11 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues)); } + @Override + public void setSubExpressionForAssign(final String topic, final String subExpresion) { + defaultLitePullConsumerImpl.setSubExpressionForAssign(withNamespace(topic), subExpresion); + } + @Override public List<MessageExt> poll() { return defaultLitePullConsumerImpl.poll(this.getPollTimeoutMillis()); diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java index 089df516f..8bca31c78 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java @@ -75,6 +75,15 @@ public interface LitePullConsumer { */ void assign(Collection<MessageQueue> messageQueues); + /** + * Set topic subExpression for assign mode. This interface does not allow be call after start(). Default value is * if not set. + * assignment and will replace the previous assignment (if there is one). + * + * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if + * * null or * expression,meaning subscribe all + */ + void setSubExpressionForAssign(final String topic, final String subExpression); + /** * Fetch data for the topics or partitions specified using assign API * diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index a4eba4662..f3cd7d5b8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -35,6 +35,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; import org.apache.rocketmq.client.consumer.MessageQueueListener; @@ -125,6 +126,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { private static final long PULL_TIME_DELAY_MILLS_ON_EXCEPTION = 3 * 1000; + private ConcurrentHashMap<String/* topic */, String/* subExpression */> topicToSubExpression = new ConcurrentHashMap<String, String>(); + private DefaultLitePullConsumer defaultLitePullConsumer; private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable = @@ -529,6 +532,17 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } } + public synchronized void setSubExpressionForAssign(final String topic, final String subExpression) { + if (StringUtils.isBlank(subExpression)) { + throw new IllegalArgumentException("subExpression can not be null or empty."); + } + if (serviceState != ServiceState.CREATE_JUST) { + throw new IllegalStateException("setAssignTag only can be called before start."); + } + setSubscriptionType(SubscriptionType.ASSIGN); + topicToSubExpression.put(topic, subExpression); + } + private void maybeAutoCommit() { long now = System.currentTimeMillis(); if (now >= nextAutoCommitDeadline) { @@ -848,7 +862,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { if (subscriptionType == SubscriptionType.SUBSCRIBE) { subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic); } else { - subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL); + String subExpression4Assign = topicToSubExpression.get(topic); + subExpression4Assign = subExpression4Assign == null ? SubscriptionData.SUB_ALL : subExpression4Assign; + subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression4Assign); } PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize()); diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java index c452f30ff..0f0327f0f 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java @@ -125,6 +125,21 @@ public class DefaultLitePullConsumerTest { } } + @Test + public void testAssign_PollMessageWithTagSuccess() throws Exception { + DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumerWithTag(); + try { + MessageQueue messageQueue = createMessageQueue(); + litePullConsumer.assign(Collections.singletonList(messageQueue)); + List<MessageExt> result = litePullConsumer.poll(); + assertThat(result.get(0).getTopic()).isEqualTo(topic); + assertThat(result.get(0).getTags()).isEqualTo("tagA"); + assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'}); + } finally { + litePullConsumer.shutdown(); + } + } + @Test public void testSubscribe_PollMessageSuccess() throws Exception { DefaultLitePullConsumer litePullConsumer = createSubscribeLitePullConsumer(); @@ -640,6 +655,65 @@ public class DefaultLitePullConsumerTest { doReturn(123L).when(offsetStore).readOffset(any(MessageQueue.class), any(ReadOffsetType.class)); } + private void initDefaultLitePullConsumerWithTag(DefaultLitePullConsumer litePullConsumer) throws Exception { + + Field field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl"); + field.setAccessible(true); + litePullConsumerImpl = (DefaultLitePullConsumerImpl) field.get(litePullConsumer); + field = DefaultLitePullConsumerImpl.class.getDeclaredField("mQClientFactory"); + field.setAccessible(true); + field.set(litePullConsumerImpl, mQClientFactory); + + PullAPIWrapper pullAPIWrapper = litePullConsumerImpl.getPullAPIWrapper(); + field = PullAPIWrapper.class.getDeclaredField("mQClientFactory"); + field.setAccessible(true); + field.set(pullAPIWrapper, mQClientFactory); + + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mQClientFactory, mQClientAPIImpl); + + field = MQClientInstance.class.getDeclaredField("mQAdminImpl"); + field.setAccessible(true); + field.set(mQClientFactory, mQAdminImpl); + + field = DefaultLitePullConsumerImpl.class.getDeclaredField("rebalanceImpl"); + field.setAccessible(true); + rebalanceImpl = (RebalanceImpl) field.get(litePullConsumerImpl); + field = RebalanceImpl.class.getDeclaredField("mQClientFactory"); + field.setAccessible(true); + field.set(rebalanceImpl, mQClientFactory); + + offsetStore = spy(litePullConsumerImpl.getOffsetStore()); + field = DefaultLitePullConsumerImpl.class.getDeclaredField("offsetStore"); + field.setAccessible(true); + field.set(litePullConsumerImpl, offsetStore); + + when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class), + anyLong(), any(CommunicationMode.class), nullable(PullCallback.class))) + .thenAnswer(new Answer<PullResult>() { + @Override + public PullResult answer(InvocationOnMock mock) throws Throwable { + PullMessageRequestHeader requestHeader = mock.getArgument(1); + MessageClientExt messageClientExt = new MessageClientExt(); + messageClientExt.setTopic(topic); + messageClientExt.setTags("tagA"); + messageClientExt.setQueueId(0); + messageClientExt.setMsgId("123"); + messageClientExt.setBody(new byte[] {'a'}); + messageClientExt.setOffsetMsgId("234"); + messageClientExt.setBornHost(new InetSocketAddress(8080)); + messageClientExt.setStoreHost(new InetSocketAddress(8080)); + PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(messageClientExt)); + return pullResult; + } + }); + + when(mQClientFactory.findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean())).thenReturn(new FindBrokerResult("127.0.0.1:10911", false)); + + doReturn(123L).when(offsetStore).readOffset(any(MessageQueue.class), any(ReadOffsetType.class)); + } + private DefaultLitePullConsumer createSubscribeLitePullConsumer() throws Exception { DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); litePullConsumer.setNamesrvAddr("127.0.0.1:9876"); @@ -659,6 +733,16 @@ public class DefaultLitePullConsumerTest { return litePullConsumer; } + private DefaultLitePullConsumer createStartLitePullConsumerWithTag() throws Exception { + DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); + litePullConsumer.setNamesrvAddr("127.0.0.1:9876"); + suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer); + litePullConsumer.setSubExpressionForAssign(topic, "tagA"); + litePullConsumer.start(); + initDefaultLitePullConsumerWithTag(litePullConsumer); + return litePullConsumer; + } + private DefaultLitePullConsumer createNotStartLitePullConsumer() { DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); return litePullConsumer; diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerAssignWithSubExpression.java b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerAssignWithSubExpression.java new file mode 100644 index 000000000..0ab106fa1 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerAssignWithSubExpression.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.simple; + +import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +public class LitePullConsumerAssignWithSubExpression { + + public static volatile boolean running = true; + + public static void main(String[] args) throws Exception { + DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name"); + litePullConsumer.setAutoCommit(false); + litePullConsumer.setSubExpressionForAssign("TopicTest", "TagA"); + litePullConsumer.start(); + Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("TopicTest"); + List<MessageQueue> list = new ArrayList<>(mqSet); + List<MessageQueue> assignList = new ArrayList<>(); + for (int i = 0; i < list.size(); i++) { + assignList.add(list.get(i)); + } + mqSet = litePullConsumer.fetchMessageQueues("TopicTest1"); + list = new ArrayList<>(mqSet); + for (int i = 0; i < list.size(); i++) { + assignList.add(list.get(i)); + } + litePullConsumer.assign(assignList); + litePullConsumer.seek(assignList.get(0), 10); + try { + while (running) { + List<MessageExt> messageExts = litePullConsumer.poll(); + System.out.printf("%s %n", messageExts); + litePullConsumer.commitSync(); + } + } finally { + litePullConsumer.shutdown(); + } + + } +}
