This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 35ac588fb [ISSUE #4979] Fix flaky integration test NormalMessageSendAndRecvIT (#4980)
35ac588fb is described below

commit 35ac588fb55ebf6ac0d72e7c7ad8272898d1df0f
Author: Zhanhui Li <[email protected]>
AuthorDate: Mon Sep 5 14:27:01 2022 +0800

    [ISSUE #4979] Fix flaky integration test NormalMessageSendAndRecvIT (#4980)
    
    * Fix netty warning; Fix flaky integration test NormalMessageSendAndRecvIT; Fix naming typo
    
    * Change log level
    
    * Use Awaitility to make everyone happy
    
    * Do not submit tasks when executor has been shut down
---
 .../broker/processor/PopReviveService.java         | 16 +++++----
 ...ervice.java => BatchUnregistrationService.java} | 26 +++++++-------
 .../namesrv/routeinfo/RouteInfoManager.java        |  4 +--
 .../remoting/netty/NettyRemotingAbstract.java      |  2 +-
 .../remoting/netty/NettyRemotingServer.java        |  2 +-
 .../test/smoke/NormalMessageSendAndRecvIT.java     | 42 ++++++++++++++++------
 6 files changed, 58 insertions(+), 34 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index 1a6c52ec3..8708b48a6 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -198,12 +198,11 @@ public class PopReviveService extends ServiceThread {
                     break;
                 case NO_MATCHED_MESSAGE:
                     pullStatus = PullStatus.NO_MATCHED_MSG;
-                    POP_LOGGER.warn("no matched message. GetMessageStatus={}, 
topic={}, groupId={}, requestOffset={}",
+                    POP_LOGGER.debug("no matched message. GetMessageStatus={}, 
topic={}, groupId={}, requestOffset={}",
                         getMessageResult.getStatus(), topic, group, offset);
                     break;
                 case NO_MESSAGE_IN_QUEUE:
-                    pullStatus = PullStatus.NO_NEW_MSG;
-                    POP_LOGGER.warn("no new message. GetMessageStatus={}, 
topic={}, groupId={}, requestOffset={}",
+                    POP_LOGGER.debug("no new message. GetMessageStatus={}, 
topic={}, groupId={}, requestOffset={}",
                         getMessageResult.getStatus(), topic, group, offset);
                     break;
                 case MESSAGE_WAS_REMOVING:
@@ -225,7 +224,11 @@ public class PopReviveService extends ServiceThread {
                 getMessageResult.getMaxOffset(), foundList);
 
         } else {
-            POP_LOGGER.error("get message from store return null. topic={}, 
groupId={}, requestOffset={}", topic, group, offset);
+            long maxQueueOffset = 
brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
+            if (maxQueueOffset > offset) {
+                POP_LOGGER.error("get message from store return null. 
topic={}, groupId={}, requestOffset={}, maxQueueOffset={}",
+                    topic, group, offset, maxQueueOffset);
+            }
             return null;
         }
     }
@@ -289,11 +292,12 @@ public class PopReviveService extends ServiceThread {
                     break;
                 }
                 noMsgCount++;
+                // Fixme: why sleep is useful here?
                 try {
                     Thread.sleep(100);
-                } catch (Throwable e) {
+                } catch (Throwable ignore) {
                 }
-                if (noMsgCount * 100 > 4 * PopAckConstants.SECOND) {
+                if (noMsgCount * 100L > 4 * PopAckConstants.SECOND) {
                     break;
                 } else {
                     continue;
diff --git 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BatchUnRegisterService.java
 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BatchUnregistrationService.java
similarity index 70%
rename from 
namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BatchUnRegisterService.java
rename to 
namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BatchUnregistrationService.java
index 9e84272cb..5a4def305 100644
--- 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BatchUnRegisterService.java
+++ 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BatchUnregistrationService.java
@@ -29,17 +29,17 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 
 /**
- * BatchUnRegisterProcessor provides a mechanism to unregister broker in batch 
manner, which can speed up broker offline
+ * BatchUnregistrationService provides a mechanism to unregister brokers in 
batch manner, which speeds up broker-offline
  * process.
  */
-public class BatchUnRegisterService extends ServiceThread {
+public class BatchUnregistrationService extends ServiceThread {
     private final RouteInfoManager routeInfoManager;
-    private BlockingQueue<UnRegisterBrokerRequestHeader> unRegisterQueue;
+    private BlockingQueue<UnRegisterBrokerRequestHeader> unregistrationQueue;
     private static final InternalLogger log = 
InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
 
-    public BatchUnRegisterService(RouteInfoManager routeInfoManager, 
NamesrvConfig namesrvConfig) {
+    public BatchUnregistrationService(RouteInfoManager routeInfoManager, 
NamesrvConfig namesrvConfig) {
         this.routeInfoManager = routeInfoManager;
-        this.unRegisterQueue = new 
LinkedBlockingQueue<>(namesrvConfig.getUnRegisterBrokerQueueCapacity());
+        this.unregistrationQueue = new 
LinkedBlockingQueue<>(namesrvConfig.getUnRegisterBrokerQueueCapacity());
     }
 
     /**
@@ -49,26 +49,26 @@ public class BatchUnRegisterService extends ServiceThread {
      * @return {@code true} if the request was added to this queue, else 
{@code false}
      */
     public boolean submit(UnRegisterBrokerRequestHeader unRegisterRequest) {
-        return unRegisterQueue.offer(unRegisterRequest);
+        return unregistrationQueue.offer(unRegisterRequest);
     }
 
     @Override
     public String getServiceName() {
-        return BatchUnRegisterService.class.getName();
+        return BatchUnregistrationService.class.getName();
     }
 
     @Override
     public void run() {
         while (!this.isStopped()) {
             try {
-                final UnRegisterBrokerRequestHeader request = 
unRegisterQueue.take();
-                Set<UnRegisterBrokerRequestHeader> unRegisterRequests = new 
HashSet<>();
-                unRegisterQueue.drainTo(unRegisterRequests);
+                final UnRegisterBrokerRequestHeader request = 
unregistrationQueue.take();
+                Set<UnRegisterBrokerRequestHeader> unregistrationRequests = 
new HashSet<>();
+                unregistrationQueue.drainTo(unregistrationRequests);
 
                 // Add polled request
-                unRegisterRequests.add(request);
+                unregistrationRequests.add(request);
 
-                this.routeInfoManager.unRegisterBroker(unRegisterRequests);
+                this.routeInfoManager.unRegisterBroker(unregistrationRequests);
             } catch (Throwable e) {
                 log.error("Handle unregister broker request failed", e);
             }
@@ -77,6 +77,6 @@ public class BatchUnRegisterService extends ServiceThread {
 
     // For test only
     int queueLength() {
-        return this.unRegisterQueue.size();
+        return this.unregistrationQueue.size();
     }
 }
diff --git 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 3facb3cf8..ace963ca9 100644
--- 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -76,7 +76,7 @@ public class RouteInfoManager {
     private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter 
Server */> filterServerTable;
     private final Map<String/* topic */, Map<String/*brokerName*/, 
TopicQueueMappingInfo>> topicQueueMappingInfoTable;
 
-    private final BatchUnRegisterService unRegisterService;
+    private final BatchUnregistrationService unRegisterService;
 
     private final NamesrvController namesrvController;
     private final NamesrvConfig namesrvConfig;
@@ -88,7 +88,7 @@ public class RouteInfoManager {
         this.brokerLiveTable = new ConcurrentHashMap<>(256);
         this.filterServerTable = new ConcurrentHashMap<>(256);
         this.topicQueueMappingInfoTable = new ConcurrentHashMap<>(1024);
-        this.unRegisterService = new BatchUnRegisterService(this, 
namesrvConfig);
+        this.unRegisterService = new BatchUnregistrationService(this, 
namesrvConfig);
         this.namesrvConfig = namesrvConfig;
         this.namesrvController = namesrvController;
     }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index b5569bb97..b287387a7 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -318,7 +318,7 @@ public abstract class NettyRemotingAbstract {
     private void executeInvokeCallback(final ResponseFuture responseFuture) {
         boolean runInThisThread = false;
         ExecutorService executor = this.getCallbackExecutor();
-        if (executor != null) {
+        if (executor != null && !executor.isShutdown()) {
             try {
                 executor.submit(() -> {
                     try {
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 06bbae120..bdcfdc95e 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -219,7 +219,7 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
                 .channel(useEpoll() ? EpollServerSocketChannel.class : 
NioServerSocketChannel.class)
                 .option(ChannelOption.SO_BACKLOG, 1024)
                 .option(ChannelOption.SO_REUSEADDR, true)
-                .option(ChannelOption.SO_KEEPALIVE, false)
+                .childOption(ChannelOption.SO_KEEPALIVE, false)
                 .childOption(ChannelOption.TCP_NODELAY, true)
                 .localAddress(new 
InetSocketAddress(this.nettyServerConfig.getListenPort()))
                 .childHandler(new ChannelInitializer<SocketChannel>() {
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
index 7fe49b1fa..8e6eec371 100644
--- 
a/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
+++ 
b/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
@@ -17,8 +17,10 @@
 
 package org.apache.rocketmq.test.smoke;
 
+import java.time.Duration;
 import java.util.List;
-import org.apache.log4j.Logger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.admin.ConsumeStats;
 import org.apache.rocketmq.common.message.MessageClientExt;
 import org.apache.rocketmq.common.message.MessageConst;
@@ -33,11 +35,14 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.awaitility.Awaitility;
 
 import static com.google.common.truth.Truth.assertThat;
 
 public class NormalMessageSendAndRecvIT extends BaseConf {
-    private static Logger logger = 
Logger.getLogger(NormalMessageSendAndRecvIT.class);
+    private static final Logger logger = 
LoggerFactory.getLogger(NormalMessageSendAndRecvIT.class);
     private RMQNormalConsumer consumer = null;
     private RMQNormalProducer producer = null;
     private String topic = null;
@@ -57,17 +62,32 @@ public class NormalMessageSendAndRecvIT extends BaseConf {
 
     @After
     public void tearDown() {
-        super.shutdown();
+        BaseConf.shutdown();
     }
 
     @Test
     public void testSynSendMessage() throws Exception {
+        AtomicReference<List<MessageQueue>> messageQueueList = new 
AtomicReference<>();
+        AtomicReference<ConsumeStats> consumeStats = new AtomicReference<>();
+        Awaitility.await().atMost(Duration.ofSeconds(120))
+            .until(() -> {
+                try {
+                    
consumeStats.set(defaultMQAdminExt.examineConsumeStats(group));
+                    
messageQueueList.set(producer.getProducer().fetchPublishMessageQueues(topic));
+                    return !messageQueueList.get().isEmpty() && null != 
consumeStats.get()
+                        && 
consumeStats.get().getOffsetTable().keySet().containsAll(messageQueueList.get());
+                } catch (MQClientException e) {
+                    logger.debug("Exception raised while checking producer and 
consumer are started", e);
+                }
+                return false;
+            });
+
         int msgSize = 10;
-        List<MessageQueue> messageQueueList = 
producer.getProducer().fetchPublishMessageQueues(topic);
-        for (MessageQueue messageQueue: messageQueueList) {
+        for (MessageQueue messageQueue : messageQueueList.get()) {
             producer.send(msgSize, messageQueue);
         }
-        Assert.assertEquals("Not all sent succeeded", msgSize * 
messageQueueList.size(), producer.getAllUndupMsgBody().size());
+        Assert.assertEquals("Not all sent succeeded", msgSize * 
messageQueueList.get().size(),
+            producer.getAllUndupMsgBody().size());
         consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 
consumeTime);
         assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
             consumer.getListener().getAllMsgBody()))
@@ -79,12 +99,12 @@ public class NormalMessageSendAndRecvIT extends BaseConf {
         }
         //shutdown to persist the offset
         consumer.getConsumer().shutdown();
-        ConsumeStats consumeStats = 
defaultMQAdminExt.examineConsumeStats(group);
+        consumeStats.set(defaultMQAdminExt.examineConsumeStats(group));
         //+1 for the retry topic
-        for (MessageQueue messageQueue: messageQueueList) {
-            
Assert.assertTrue(consumeStats.getOffsetTable().containsKey(messageQueue));
-            Assert.assertEquals(msgSize, 
consumeStats.getOffsetTable().get(messageQueue).getConsumerOffset());
-            Assert.assertEquals(msgSize, 
consumeStats.getOffsetTable().get(messageQueue).getBrokerOffset());
+        for (MessageQueue messageQueue : messageQueueList.get()) {
+            
Assert.assertTrue(consumeStats.get().getOffsetTable().containsKey(messageQueue));
+            Assert.assertEquals(msgSize, 
consumeStats.get().getOffsetTable().get(messageQueue).getConsumerOffset());
+            Assert.assertEquals(msgSize, 
consumeStats.get().getOffsetTable().get(messageQueue).getBrokerOffset());
         }
 
     }

Reply via email to