This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d5be91f  [ISSUE #3789] optimize: Tag the name of  consuming thread 
whith consumeGroup. (#3795)
d5be91f is described below

commit d5be91fa00136d8c2df37d83ea94dab37d630939
Author: 彭小漪 <[email protected]>
AuthorDate: Thu Jan 27 14:58:09 2022 +0800

    [ISSUE #3789] optimize: Tag the name of  consuming thread whith 
consumeGroup. (#3795)
    
    * It is useful for debug.
---
 .../ConsumeMessageConcurrentlyService.java         |   8 +-
 .../consumer/ConsumeMessageOrderlyService.java     |   8 +-
 .../ConsumeMessageConcurrentlyServiceTest.java     |  32 ++++
 .../consumer/ConsumeMessageOrderlyServiceTest.java | 177 +++++++++++++++++++++
 4 files changed, 223 insertions(+), 2 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
index 537dbee..384f3f1 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
@@ -70,13 +70,19 @@ public class ConsumeMessageConcurrentlyService implements 
ConsumeMessageService
         this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
         this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
 
+        String consumeThreadPrefix = null;
+        if (consumerGroup.length() > 100) {
+            consumeThreadPrefix = new 
StringBuilder("ConsumeMessageThread_").append(consumerGroup.substring(0, 
100)).append("_").toString();
+        } else {
+            consumeThreadPrefix = new 
StringBuilder("ConsumeMessageThread_").append(consumerGroup).append("_").toString();
+        }
         this.consumeExecutor = new ThreadPoolExecutor(
             this.defaultMQPushConsumer.getConsumeThreadMin(),
             this.defaultMQPushConsumer.getConsumeThreadMax(),
             1000 * 60,
             TimeUnit.MILLISECONDS,
             this.consumeRequestQueue,
-            new ThreadFactoryImpl("ConsumeMessageThread_"));
+            new ThreadFactoryImpl(consumeThreadPrefix));
 
         this.scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
         this.cleanExpireMsgExecutors = 
Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index 8d92b57..812e8ab 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -75,13 +75,19 @@ public class ConsumeMessageOrderlyService implements 
ConsumeMessageService {
         this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
         this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
 
+        String consumeThreadPrefix = null;
+        if (consumerGroup.length() > 100) {
+            consumeThreadPrefix = new 
StringBuilder("ConsumeMessageThread_").append(consumerGroup.substring(0, 
100)).append("_").toString();
+        } else {
+            consumeThreadPrefix = new 
StringBuilder("ConsumeMessageThread_").append(consumerGroup).append("_").toString();
+        }
         this.consumeExecutor = new ThreadPoolExecutor(
             this.defaultMQPushConsumer.getConsumeThreadMin(),
             this.defaultMQPushConsumer.getConsumeThreadMax(),
             1000 * 60,
             TimeUnit.MILLISECONDS,
             this.consumeRequestQueue,
-            new ThreadFactoryImpl("ConsumeMessageThread_"));
+            new ThreadFactoryImpl(consumeThreadPrefix));
 
         this.scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
     }
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
index 7badc3b..c12f2fc 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
@@ -231,4 +231,36 @@ public class ConsumeMessageConcurrentlyServiceTest {
         }
         return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + 
messageExtList.size(), 123, 2048, messageExtList, 0, 
outputStream.toByteArray());
     }
+
+    @Test
+    public void testConsumeThreadName() throws Exception {
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        final AtomicReference<String> consumeThreadName = new 
AtomicReference<String>();
+
+        StringBuilder consumeGroup2 = new StringBuilder();
+        for (int i = 0; i < 101; i++) {
+            consumeGroup2.append(i).append("#");
+        }
+        pushConsumer.setConsumerGroup(consumeGroup2.toString());
+        ConsumeMessageConcurrentlyService  normalServie = new 
ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), 
new MessageListenerConcurrently() {
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> 
msgs,
+                                                            
ConsumeConcurrentlyContext context) {
+                consumeThreadName.set(Thread.currentThread().getName());
+                countDownLatch.countDown();
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+        
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(normalServie);
+
+        PullMessageService pullMessageService = 
mQClientFactory.getPullMessageService();
+        pullMessageService.executePullRequestImmediately(createPullRequest());
+        countDownLatch.await();
+        System.out.println(consumeThreadName.get());
+        if (consumeGroup2.length() <= 100) {
+            
assertThat(consumeThreadName.get()).startsWith("ConsumeMessageThread_" + 
consumeGroup2 + "_");
+        } else {
+            
assertThat(consumeThreadName.get()).startsWith("ConsumeMessageThread_" + 
consumeGroup2.substring(0, 100) + "_");
+        }
+    }
 }
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyServiceTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyServiceTest.java
index 4cfa011..8ea1727 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyServiceTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyServiceTest.java
@@ -16,30 +16,146 @@
  */
 package org.apache.rocketmq.client.impl.consumer;
 
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.PullCallback;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.message.MessageClientExt;
+import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.body.CMResult;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.remoting.RPCHook;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 public class ConsumeMessageOrderlyServiceTest {
     private String consumerGroup;
     private String topic = "FooBar";
     private String brokerName = "BrokerA";
     private DefaultMQPushConsumer pushConsumer;
+    private MQClientInstance mQClientFactory;
 
+    @Mock
+    private MQClientAPIImpl mQClientAPIImpl;
+    private PullAPIWrapper pullAPIWrapper;
+    private RebalancePushImpl rebalancePushImpl;
     @Before
     public void init() throws Exception {
+        ConcurrentMap<String, MQClientInstance> factoryTable = 
(ConcurrentMap<String, MQClientInstance>) 
FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", 
true);
+        Collection<MQClientInstance> instances = factoryTable.values();
+        for (MQClientInstance instance : instances) {
+            instance.shutdown();
+        }
+        factoryTable.clear();
         consumerGroup = "FooBarGroup" + System.currentTimeMillis();
         pushConsumer = new DefaultMQPushConsumer(consumerGroup);
+
+        pushConsumer.setNamesrvAddr("127.0.0.1:9876");
+        pushConsumer.setPullInterval(60 * 1000);
+
+        pushConsumer.registerMessageListener(new MessageListenerOrderly() {
+
+            @Override
+            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, 
ConsumeOrderlyContext context) {
+                return ConsumeOrderlyStatus.SUCCESS;
+            }
+        });
+
+
+        DefaultMQPushConsumerImpl pushConsumerImpl = 
pushConsumer.getDefaultMQPushConsumerImpl();
+        rebalancePushImpl = spy(new 
RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
+        Field field = 
DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
+        field.setAccessible(true);
+        field.set(pushConsumerImpl, rebalancePushImpl);
+        pushConsumer.subscribe(topic, "*");
+
+        // suppress updateTopicRouteInfoFromNameServer
+        pushConsumer.changeInstanceNameToPID();
+        mQClientFactory = 
MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, 
(RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true));
+        mQClientFactory = spy(mQClientFactory);
+        field = 
DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
+        field.setAccessible(true);
+        field.set(pushConsumerImpl, mQClientFactory);
+        factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory);
+
+        mQClientAPIImpl = mock(MQClientAPIImpl.class);
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mQClientFactory, mQClientAPIImpl);
+
+        pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, 
consumerGroup, false));
+        field = 
DefaultMQPushConsumerImpl.class.getDeclaredField("pullAPIWrapper");
+        field.setAccessible(true);
+        field.set(pushConsumerImpl, pullAPIWrapper);
+
+        
pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory);
+
+        when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), 
any(PullMessageRequestHeader.class),
+                anyLong(), any(CommunicationMode.class), 
nullable(PullCallback.class)))
+                .thenAnswer(new Answer<PullResult>() {
+                    @Override
+                    public PullResult answer(InvocationOnMock mock) throws 
Throwable {
+                        PullMessageRequestHeader requestHeader = 
mock.getArgument(1);
+                        MessageClientExt messageClientExt = new 
MessageClientExt();
+                        messageClientExt.setTopic(topic);
+                        messageClientExt.setQueueId(0);
+                        messageClientExt.setMsgId("123");
+                        messageClientExt.setBody(new byte[] {'a'});
+                        messageClientExt.setOffsetMsgId("234");
+                        messageClientExt.setBornHost(new 
InetSocketAddress(8080));
+                        messageClientExt.setStoreHost(new 
InetSocketAddress(8080));
+                        PullResult pullResult = 
createPullResult(requestHeader, PullStatus.FOUND, 
Collections.<MessageExt>singletonList(messageClientExt));
+                        ((PullCallback) 
mock.getArgument(4)).onSuccess(pullResult);
+                        return pullResult;
+                    }
+                });
+
+        doReturn(new FindBrokerResult("127.0.0.1:10912", 
false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), 
anyLong(), anyBoolean());
+        
doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
+        Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
+        messageQueueSet.add(createPullRequest().getMessageQueue());
+        
pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, 
messageQueueSet);
+        pushConsumer.start();
     }
 
     @Test
@@ -83,4 +199,65 @@ public class ConsumeMessageOrderlyServiceTest {
         assertTrue(consumeMessageOrderlyService.consumeMessageDirectly(msg, 
brokerName).getConsumeResult().equals(CMResult.CR_THROW_EXCEPTION));
     }
 
+    @Test
+    public void testConsumeThreadName() throws Exception {
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        final AtomicReference<String> consumeThreadName = new 
AtomicReference<String>();
+
+        StringBuilder consumeGroup2 = new StringBuilder();
+        for (int i = 0; i < 101; i++) {
+            consumeGroup2.append(i).append("#");
+        }
+        pushConsumer.setConsumerGroup(consumeGroup2.toString());
+
+        MessageListenerOrderly listenerOrderly = new MessageListenerOrderly() {
+            @Override
+            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, 
ConsumeOrderlyContext context) {
+                consumeThreadName.set(Thread.currentThread().getName());
+                countDownLatch.countDown();
+                return ConsumeOrderlyStatus.SUCCESS;
+            }
+        };
+        ConsumeMessageOrderlyService consumeMessageOrderlyService = new 
ConsumeMessageOrderlyService(pushConsumer.getDefaultMQPushConsumerImpl(), 
listenerOrderly);
+        
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(consumeMessageOrderlyService);
+
+
+        PullMessageService pullMessageService = 
mQClientFactory.getPullMessageService();
+        pullMessageService.executePullRequestImmediately(createPullRequest());
+        countDownLatch.await();
+        System.out.println(consumeThreadName.get());
+        if (consumeGroup2.length() <= 100) {
+            
assertThat(consumeThreadName.get()).startsWith("ConsumeMessageThread_" + 
consumeGroup2 + "_");
+        } else {
+            
assertThat(consumeThreadName.get()).startsWith("ConsumeMessageThread_" + 
consumeGroup2.substring(0, 100) + "_");
+        }
+    }
+
+    private PullRequest createPullRequest() {
+        PullRequest pullRequest = new PullRequest();
+        pullRequest.setConsumerGroup(consumerGroup);
+        pullRequest.setNextOffset(1024);
+
+        MessageQueue messageQueue = new MessageQueue();
+        messageQueue.setBrokerName(brokerName);
+        messageQueue.setQueueId(0);
+        messageQueue.setTopic(topic);
+        pullRequest.setMessageQueue(messageQueue);
+        ProcessQueue processQueue = new ProcessQueue();
+        processQueue.setLocked(true);
+        processQueue.setLastLockTimestamp(System.currentTimeMillis());
+        pullRequest.setProcessQueue(processQueue);
+
+        return pullRequest;
+    }
+
+    private PullResultExt createPullResult(PullMessageRequestHeader 
requestHeader, PullStatus pullStatus,
+                                           List<MessageExt> messageExtList) 
throws Exception {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        for (MessageExt messageExt : messageExtList) {
+            outputStream.write(MessageDecoder.encode(messageExt, false));
+        }
+        return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + 
messageExtList.size(), 123, 2048, messageExtList, 0, 
outputStream.toByteArray());
+    }
+
 }

Reply via email to