This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 35ac588fb [ISSUE #4979] Fix flaky integration test
NormalMessageSendAndRecvIT (#4980)
35ac588fb is described below
commit 35ac588fb55ebf6ac0d72e7c7ad8272898d1df0f
Author: Zhanhui Li <[email protected]>
AuthorDate: Mon Sep 5 14:27:01 2022 +0800
[ISSUE #4979] Fix flaky integration test NormalMessageSendAndRecvIT (#4980)
* Fix netty warning; Fix flaky integration test NormalMessageSendAndRecvIT;
Fix naming typo
* Change log level
* Use Awaitility to make everyone happy
* Do not submit tasks when executor has been shut down
---
.../broker/processor/PopReviveService.java | 16 +++++----
...ervice.java => BatchUnregistrationService.java} | 26 +++++++-------
.../namesrv/routeinfo/RouteInfoManager.java | 4 +--
.../remoting/netty/NettyRemotingAbstract.java | 2 +-
.../remoting/netty/NettyRemotingServer.java | 2 +-
.../test/smoke/NormalMessageSendAndRecvIT.java | 42 ++++++++++++++++------
6 files changed, 58 insertions(+), 34 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index 1a6c52ec3..8708b48a6 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -198,12 +198,11 @@ public class PopReviveService extends ServiceThread {
break;
case NO_MATCHED_MESSAGE:
pullStatus = PullStatus.NO_MATCHED_MSG;
- POP_LOGGER.warn("no matched message. GetMessageStatus={},
topic={}, groupId={}, requestOffset={}",
+ POP_LOGGER.debug("no matched message. GetMessageStatus={},
topic={}, groupId={}, requestOffset={}",
getMessageResult.getStatus(), topic, group, offset);
break;
case NO_MESSAGE_IN_QUEUE:
- pullStatus = PullStatus.NO_NEW_MSG;
- POP_LOGGER.warn("no new message. GetMessageStatus={},
topic={}, groupId={}, requestOffset={}",
+ POP_LOGGER.debug("no new message. GetMessageStatus={},
topic={}, groupId={}, requestOffset={}",
getMessageResult.getStatus(), topic, group, offset);
break;
case MESSAGE_WAS_REMOVING:
@@ -225,7 +224,11 @@ public class PopReviveService extends ServiceThread {
getMessageResult.getMaxOffset(), foundList);
} else {
- POP_LOGGER.error("get message from store return null. topic={},
groupId={}, requestOffset={}", topic, group, offset);
+ long maxQueueOffset =
brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
+ if (maxQueueOffset > offset) {
+ POP_LOGGER.error("get message from store return null.
topic={}, groupId={}, requestOffset={}, maxQueueOffset={}",
+ topic, group, offset, maxQueueOffset);
+ }
return null;
}
}
@@ -289,11 +292,12 @@ public class PopReviveService extends ServiceThread {
break;
}
noMsgCount++;
+ // Fixme: why sleep is useful here?
try {
Thread.sleep(100);
- } catch (Throwable e) {
+ } catch (Throwable ignore) {
}
- if (noMsgCount * 100 > 4 * PopAckConstants.SECOND) {
+ if (noMsgCount * 100L > 4 * PopAckConstants.SECOND) {
break;
} else {
continue;
diff --git
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BatchUnRegisterService.java
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BatchUnregistrationService.java
similarity index 70%
rename from
namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BatchUnRegisterService.java
rename to
namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BatchUnregistrationService.java
index 9e84272cb..5a4def305 100644
---
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BatchUnRegisterService.java
+++
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BatchUnregistrationService.java
@@ -29,17 +29,17 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
/**
- * BatchUnRegisterProcessor provides a mechanism to unregister broker in batch
manner, which can speed up broker offline
+ * BatchUnregistrationService provides a mechanism to unregister brokers in
batch manner, which speeds up broker-offline
* process.
*/
-public class BatchUnRegisterService extends ServiceThread {
+public class BatchUnregistrationService extends ServiceThread {
private final RouteInfoManager routeInfoManager;
- private BlockingQueue<UnRegisterBrokerRequestHeader> unRegisterQueue;
+ private BlockingQueue<UnRegisterBrokerRequestHeader> unregistrationQueue;
private static final InternalLogger log =
InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
- public BatchUnRegisterService(RouteInfoManager routeInfoManager,
NamesrvConfig namesrvConfig) {
+ public BatchUnregistrationService(RouteInfoManager routeInfoManager,
NamesrvConfig namesrvConfig) {
this.routeInfoManager = routeInfoManager;
- this.unRegisterQueue = new
LinkedBlockingQueue<>(namesrvConfig.getUnRegisterBrokerQueueCapacity());
+ this.unregistrationQueue = new
LinkedBlockingQueue<>(namesrvConfig.getUnRegisterBrokerQueueCapacity());
}
/**
@@ -49,26 +49,26 @@ public class BatchUnRegisterService extends ServiceThread {
* @return {@code true} if the request was added to this queue, else
{@code false}
*/
public boolean submit(UnRegisterBrokerRequestHeader unRegisterRequest) {
- return unRegisterQueue.offer(unRegisterRequest);
+ return unregistrationQueue.offer(unRegisterRequest);
}
@Override
public String getServiceName() {
- return BatchUnRegisterService.class.getName();
+ return BatchUnregistrationService.class.getName();
}
@Override
public void run() {
while (!this.isStopped()) {
try {
- final UnRegisterBrokerRequestHeader request =
unRegisterQueue.take();
- Set<UnRegisterBrokerRequestHeader> unRegisterRequests = new
HashSet<>();
- unRegisterQueue.drainTo(unRegisterRequests);
+ final UnRegisterBrokerRequestHeader request =
unregistrationQueue.take();
+ Set<UnRegisterBrokerRequestHeader> unregistrationRequests =
new HashSet<>();
+ unregistrationQueue.drainTo(unregistrationRequests);
// Add polled request
- unRegisterRequests.add(request);
+ unregistrationRequests.add(request);
- this.routeInfoManager.unRegisterBroker(unRegisterRequests);
+ this.routeInfoManager.unRegisterBroker(unregistrationRequests);
} catch (Throwable e) {
log.error("Handle unregister broker request failed", e);
}
@@ -77,6 +77,6 @@ public class BatchUnRegisterService extends ServiceThread {
// For test only
int queueLength() {
- return this.unRegisterQueue.size();
+ return this.unregistrationQueue.size();
}
}
diff --git
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 3facb3cf8..ace963ca9 100644
---
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -76,7 +76,7 @@ public class RouteInfoManager {
private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter
Server */> filterServerTable;
private final Map<String/* topic */, Map<String/*brokerName*/,
TopicQueueMappingInfo>> topicQueueMappingInfoTable;
- private final BatchUnRegisterService unRegisterService;
+ private final BatchUnregistrationService unRegisterService;
private final NamesrvController namesrvController;
private final NamesrvConfig namesrvConfig;
@@ -88,7 +88,7 @@ public class RouteInfoManager {
this.brokerLiveTable = new ConcurrentHashMap<>(256);
this.filterServerTable = new ConcurrentHashMap<>(256);
this.topicQueueMappingInfoTable = new ConcurrentHashMap<>(1024);
- this.unRegisterService = new BatchUnRegisterService(this,
namesrvConfig);
+ this.unRegisterService = new BatchUnregistrationService(this,
namesrvConfig);
this.namesrvConfig = namesrvConfig;
this.namesrvController = namesrvController;
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index b5569bb97..b287387a7 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -318,7 +318,7 @@ public abstract class NettyRemotingAbstract {
private void executeInvokeCallback(final ResponseFuture responseFuture) {
boolean runInThisThread = false;
ExecutorService executor = this.getCallbackExecutor();
- if (executor != null) {
+ if (executor != null && !executor.isShutdown()) {
try {
executor.submit(() -> {
try {
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 06bbae120..bdcfdc95e 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -219,7 +219,7 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
.channel(useEpoll() ? EpollServerSocketChannel.class :
NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
- .option(ChannelOption.SO_KEEPALIVE, false)
+ .childOption(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.localAddress(new
InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
diff --git
a/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
b/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
index 7fe49b1fa..8e6eec371 100644
---
a/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
+++
b/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
@@ -17,8 +17,10 @@
package org.apache.rocketmq.test.smoke;
+import java.time.Duration;
import java.util.List;
-import org.apache.log4j.Logger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageConst;
@@ -33,11 +35,14 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.awaitility.Awaitility;
import static com.google.common.truth.Truth.assertThat;
public class NormalMessageSendAndRecvIT extends BaseConf {
- private static Logger logger =
Logger.getLogger(NormalMessageSendAndRecvIT.class);
+ private static final Logger logger =
LoggerFactory.getLogger(NormalMessageSendAndRecvIT.class);
private RMQNormalConsumer consumer = null;
private RMQNormalProducer producer = null;
private String topic = null;
@@ -57,17 +62,32 @@ public class NormalMessageSendAndRecvIT extends BaseConf {
@After
public void tearDown() {
- super.shutdown();
+ BaseConf.shutdown();
}
@Test
public void testSynSendMessage() throws Exception {
+ AtomicReference<List<MessageQueue>> messageQueueList = new
AtomicReference<>();
+ AtomicReference<ConsumeStats> consumeStats = new AtomicReference<>();
+ Awaitility.await().atMost(Duration.ofSeconds(120))
+ .until(() -> {
+ try {
+
consumeStats.set(defaultMQAdminExt.examineConsumeStats(group));
+
messageQueueList.set(producer.getProducer().fetchPublishMessageQueues(topic));
+ return !messageQueueList.get().isEmpty() && null !=
consumeStats.get()
+ &&
consumeStats.get().getOffsetTable().keySet().containsAll(messageQueueList.get());
+ } catch (MQClientException e) {
+ logger.debug("Exception raised while checking producer and
consumer are started", e);
+ }
+ return false;
+ });
+
int msgSize = 10;
- List<MessageQueue> messageQueueList =
producer.getProducer().fetchPublishMessageQueues(topic);
- for (MessageQueue messageQueue: messageQueueList) {
+ for (MessageQueue messageQueue : messageQueueList.get()) {
producer.send(msgSize, messageQueue);
}
- Assert.assertEquals("Not all sent succeeded", msgSize *
messageQueueList.size(), producer.getAllUndupMsgBody().size());
+ Assert.assertEquals("Not all sent succeeded", msgSize *
messageQueueList.get().size(),
+ producer.getAllUndupMsgBody().size());
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(),
consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListener().getAllMsgBody()))
@@ -79,12 +99,12 @@ public class NormalMessageSendAndRecvIT extends BaseConf {
}
//shutdown to persist the offset
consumer.getConsumer().shutdown();
- ConsumeStats consumeStats =
defaultMQAdminExt.examineConsumeStats(group);
+ consumeStats.set(defaultMQAdminExt.examineConsumeStats(group));
//+1 for the retry topic
- for (MessageQueue messageQueue: messageQueueList) {
-
Assert.assertTrue(consumeStats.getOffsetTable().containsKey(messageQueue));
- Assert.assertEquals(msgSize,
consumeStats.getOffsetTable().get(messageQueue).getConsumerOffset());
- Assert.assertEquals(msgSize,
consumeStats.getOffsetTable().get(messageQueue).getBrokerOffset());
+ for (MessageQueue messageQueue : messageQueueList.get()) {
+
Assert.assertTrue(consumeStats.get().getOffsetTable().containsKey(messageQueue));
+ Assert.assertEquals(msgSize,
consumeStats.get().getOffsetTable().get(messageQueue).getConsumerOffset());
+ Assert.assertEquals(msgSize,
consumeStats.get().getOffsetTable().get(messageQueue).getBrokerOffset());
}
}