This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new aab646ce83 [ISSUE #8460] Improve the pop revive process when reading
biz messages from a remote broker (#8475)
aab646ce83 is described below
commit aab646ce83930454d8b5956779aa28a80e326e24
Author: imzs <[email protected]>
AuthorDate: Fri Aug 2 14:43:21 2024 +0800
[ISSUE #8460] Improve the pop revive process when reading biz messages from
a remote broker (#8475)
* [ISSUE #8460] part1: add extra information to the call chain of remote
message reading
* [ISSUE #8460] part2: add exponential backoff and a termination condition
for the CK rewrite, and fix checkstyle
* [ISSUE #8460] exclude BrokerOuterAPITest from the Bazel test targets: it
passes locally but fails under Bazel.
---
broker/BUILD.bazel | 1 +
.../rocketmq/broker/failover/EscapeBridge.java | 44 +++---
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 13 +-
.../broker/processor/PopReviveService.java | 41 +++--
.../apache/rocketmq/broker/BrokerOuterAPITest.java | 173 ++++++++++++++++++++-
.../rocketmq/broker/failover/EscapeBridgeTest.java | 150 +++++++++++++++++-
.../broker/processor/PopReviveServiceTest.java | 166 +++++++++++++++++++-
.../apache/rocketmq/store/pop/PopCheckPoint.java | 23 ++-
8 files changed, 554 insertions(+), 57 deletions(-)
diff --git a/broker/BUILD.bazel b/broker/BUILD.bazel
index 0dbc85f945..66e621e930 100644
--- a/broker/BUILD.bazel
+++ b/broker/BUILD.bazel
@@ -100,6 +100,7 @@ GenTestRules(
exclude_tests = [
# These tests are extremely slow and flaky, exclude them before
they are properly fixed.
"src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest",
+ "src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest",
],
deps = [
":tests",
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
index ededaf2c65..7df49f8c47 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
@@ -25,7 +25,9 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Triple;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.client.consumer.PullStatus;
@@ -34,7 +36,6 @@ import
org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageConst;
@@ -47,7 +48,6 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.store.GetMessageResult;
-import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
@@ -263,34 +263,29 @@ public class EscapeBridge {
}
}
- public Pair<GetMessageStatus, MessageExt> getMessage(String topic, long
offset, int queueId, String brokerName, boolean deCompressBody) {
+ public Triple<MessageExt, String, Boolean> getMessage(String topic, long
offset, int queueId, String brokerName, boolean deCompressBody) {
return getMessageAsync(topic, offset, queueId, brokerName,
deCompressBody).join();
}
- public CompletableFuture<Pair<GetMessageStatus, MessageExt>>
getMessageAsync(String topic, long offset, int queueId, String brokerName,
boolean deCompressBody) {
+ // Triple<MessageExt, info, needRetry>, check info and retry if and only
if MessageExt is null
+ public CompletableFuture<Triple<MessageExt, String, Boolean>>
getMessageAsync(String topic, long offset, int queueId, String brokerName,
boolean deCompressBody) {
MessageStore messageStore =
brokerController.getMessageStoreByBrokerName(brokerName);
if (messageStore != null) {
return messageStore.getMessageAsync(innerConsumerGroupName, topic,
queueId, offset, 1, null)
.thenApply(result -> {
if (result == null) {
LOG.warn("getMessageResult is null ,
innerConsumerGroupName {}, topic {}, offset {}, queueId {}",
innerConsumerGroupName, topic, offset, queueId);
- return new
Pair<>(GetMessageStatus.MESSAGE_WAS_REMOVING, null);
+ return Triple.of(null, "getMessageResult is null",
false); // local store, so no retry
}
List<MessageExt> list = decodeMsgList(result,
deCompressBody);
if (list == null || list.isEmpty()) {
LOG.warn("Can not get msg , topic {}, offset {},
queueId {}, result is {}", topic, offset, queueId, result);
- return new Pair<>(result.getStatus(), null);
+ return Triple.of(null, "Can not get msg", false); //
local store, so no retry
}
- return new Pair<>(result.getStatus(), list.get(0));
+ return Triple.of(list.get(0), "", false);
});
} else {
- return getMessageFromRemoteAsync(topic, offset, queueId,
brokerName)
- .thenApply(msg -> {
- if (msg == null) {
- return new
Pair<>(GetMessageStatus.MESSAGE_WAS_REMOVING, null);
- }
- return new Pair<>(GetMessageStatus.FOUND, msg);
- });
+ return getMessageFromRemoteAsync(topic, offset, queueId,
brokerName);
}
}
@@ -322,11 +317,12 @@ public class EscapeBridge {
return foundList;
}
- protected MessageExt getMessageFromRemote(String topic, long offset, int
queueId, String brokerName) {
+ protected Triple<MessageExt, String, Boolean> getMessageFromRemote(String
topic, long offset, int queueId, String brokerName) {
return getMessageFromRemoteAsync(topic, offset, queueId,
brokerName).join();
}
- protected CompletableFuture<MessageExt> getMessageFromRemoteAsync(String
topic, long offset, int queueId, String brokerName) {
+ // Triple<MessageExt, info, needRetry>, check info and retry if and only
if MessageExt is null
+ protected CompletableFuture<Triple<MessageExt, String, Boolean>>
getMessageFromRemoteAsync(String topic, long offset, int queueId, String
brokerName) {
try {
String brokerAddr =
this.brokerController.getTopicRouteInfoManager().findBrokerAddressInSubscribe(brokerName,
MixAll.MASTER_ID, false);
if (null == brokerAddr) {
@@ -334,23 +330,25 @@ public class EscapeBridge {
brokerAddr =
this.brokerController.getTopicRouteInfoManager().findBrokerAddressInSubscribe(brokerName,
MixAll.MASTER_ID, false);
if (null == brokerAddr) {
- LOG.warn("can't find broker address for topic {}", topic);
- return CompletableFuture.completedFuture(null);
+ LOG.warn("can't find broker address for topic {}, {}",
topic, brokerName);
+ return CompletableFuture.completedFuture(Triple.of(null,
"brokerAddress not found", true)); // maybe offline temporarily, so need retry
}
}
return
this.brokerController.getBrokerOuterAPI().pullMessageFromSpecificBrokerAsync(brokerName,
brokerAddr, this.innerConsumerGroupName, topic, queueId,
offset, 1, DEFAULT_PULL_TIMEOUT_MILLIS)
.thenApply(pullResult -> {
- if (pullResult.getPullStatus().equals(PullStatus.FOUND) &&
!pullResult.getMsgFoundList().isEmpty()) {
- return pullResult.getMsgFoundList().get(0);
+ if (pullResult.getLeft() != null
+ &&
PullStatus.FOUND.equals(pullResult.getLeft().getPullStatus())
+ &&
CollectionUtils.isNotEmpty(pullResult.getLeft().getMsgFoundList())) {
+ return
Triple.of(pullResult.getLeft().getMsgFoundList().get(0), "", false);
}
- return null;
+ return Triple.of(null, pullResult.getMiddle(),
pullResult.getRight());
});
} catch (Exception e) {
- LOG.error("Get message from remote failed.", e);
+ LOG.error("Get message from remote failed. {}, {}, {}, {}", topic,
offset, queueId, brokerName, e);
}
- return CompletableFuture.completedFuture(null);
+ return CompletableFuture.completedFuture(Triple.of(null, "Get message
from remote failed", true)); // need retry
}
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index d5c80ce2ec..83edd88408 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -31,6 +31,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Triple;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.auth.config.AuthConfig;
@@ -1378,7 +1379,8 @@ public class BrokerOuterAPI {
});
}
- public CompletableFuture<PullResult>
pullMessageFromSpecificBrokerAsync(String brokerName, String brokerAddr,
+ // Triple<PullResult, info, needRetry>, should check info and retry if and
only if PullResult is null
+ public CompletableFuture<Triple<PullResult, String, Boolean>>
pullMessageFromSpecificBrokerAsync(String brokerName, String brokerAddr,
String consumerGroup, String topic, int queueId, long offset,
int maxNums, long timeoutMillis) throws RemotingException,
InterruptedException {
PullMessageRequestHeader requestHeader = new
PullMessageRequestHeader();
@@ -1397,7 +1399,7 @@ public class BrokerOuterAPI {
requestHeader.setBrokerName(brokerName);
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
- CompletableFuture<PullResult> pullResultFuture = new
CompletableFuture<>();
+ CompletableFuture<Triple<PullResult, String, Boolean>>
pullResultFuture = new CompletableFuture<>();
this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis,
new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
@@ -1409,15 +1411,16 @@ public class BrokerOuterAPI {
try {
PullResultExt pullResultExt =
processPullResponse(response, brokerAddr);
processPullResult(pullResultExt, brokerName, queueId);
- pullResultFuture.complete(pullResultExt);
+ pullResultFuture.complete(Triple.of(pullResultExt,
pullResultExt.getPullStatus().name(), false)); // found or not found really, so
no retry
} catch (Exception e) {
- pullResultFuture.complete(new
PullResult(PullStatus.NO_MATCHED_MSG, -1, -1, -1, new ArrayList<>()));
+ // retry when NO_PERMISSION, SUBSCRIPTION_GROUP_NOT_EXIST
etc. even when TOPIC_NOT_EXIST
+ pullResultFuture.complete(Triple.of(null, "Response Code:"
+ response.getCode(), true));
}
}
@Override
public void operationFail(Throwable throwable) {
- pullResultFuture.complete(new
PullResult(PullStatus.NO_MATCHED_MSG, -1, -1, -1, new ArrayList<>()));
+ pullResultFuture.complete(Triple.of(null,
throwable.getMessage(), true));
}
});
return pullResultFuture;
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index 8074af23bf..114d094600 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import org.apache.commons.lang3.tuple.Triple;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.metrics.PopMetricsManager;
@@ -34,6 +35,7 @@ import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PopAckConstants;
import org.apache.rocketmq.common.ServiceThread;
@@ -51,7 +53,6 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.AppendMessageStatus;
import org.apache.rocketmq.store.GetMessageResult;
-import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.BatchAckMsg;
@@ -63,6 +64,7 @@ import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOP
public class PopReviveService extends ServiceThread {
private static final Logger POP_LOGGER =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
+ private final int[] ckRewriteIntervalsInSeconds = new int[] { 10, 20, 30,
60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200 };
private int queueId;
private BrokerController brokerController;
@@ -196,7 +198,8 @@ public class PopReviveService extends ServiceThread {
|| pullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL &&
offset == pullResult.getMaxOffset();
}
- private CompletableFuture<Pair<GetMessageStatus, MessageExt>>
getBizMessage(String topic, long offset, int queueId,
+ // Triple<MessageExt, info, needRetry>
+ private CompletableFuture<Triple<MessageExt, String, Boolean>>
getBizMessage(String topic, long offset, int queueId,
String brokerName) {
return this.brokerController.getEscapeBridge().getMessageAsync(topic,
offset, queueId, brokerName, false);
}
@@ -491,6 +494,8 @@ public class PopReviveService extends ServiceThread {
PopCheckPoint oldCK = inflightReviveRequestMap.firstKey();
rePutCK(oldCK, pair);
inflightReviveRequestMap.remove(oldCK);
+ POP_LOGGER.warn("stay too long, remove from
reviveRequestMap, {}, {}, {}, {}", popCheckPoint.getTopic(),
+ popCheckPoint.getBrokerName(),
popCheckPoint.getQueueId(), popCheckPoint.getStartOffset());
}
}
@@ -524,22 +529,12 @@ public class PopReviveService extends ServiceThread {
// retry msg
long msgOffset = popCheckPoint.ackOffsetByIndex((byte) j);
CompletableFuture<Pair<Long, Boolean>> future =
getBizMessage(popCheckPoint.getTopic(), msgOffset, popCheckPoint.getQueueId(),
popCheckPoint.getBrokerName())
- .thenApply(resultPair -> {
- GetMessageStatus getMessageStatus =
resultPair.getObject1();
- MessageExt message = resultPair.getObject2();
+ .thenApply(rst -> {
+ MessageExt message = rst.getLeft();
if (message == null) {
- POP_LOGGER.debug("reviveQueueId={}, can not get biz
msg topic is {}, offset is {}, then continue",
- queueId, popCheckPoint.getTopic(), msgOffset);
- switch (getMessageStatus) {
- case MESSAGE_WAS_REMOVING:
- case OFFSET_TOO_SMALL:
- case NO_MATCHED_LOGIC_QUEUE:
- case NO_MESSAGE_IN_QUEUE:
- return new Pair<>(msgOffset, true);
- default:
- return new Pair<>(msgOffset, false);
-
- }
+ POP_LOGGER.info("reviveQueueId={}, can not get biz
msg, topic:{}, qid:{}, offset:{}, brokerName:{}, info:{}, retry:{}, then
continue",
+ queueId, popCheckPoint.getTopic(),
popCheckPoint.getQueueId(), msgOffset, popCheckPoint.getBrokerName(),
UtilAll.frontStringAtLeast(rst.getMiddle(), 60), rst.getRight());
+ return new Pair<>(msgOffset, !rst.getRight()); //
Pair.object2 means OK or not, Triple.right value means needRetry
}
boolean result = reviveRetry(popCheckPoint, message);
return new Pair<>(msgOffset, result);
@@ -572,6 +567,13 @@ public class PopReviveService extends ServiceThread {
}
private void rePutCK(PopCheckPoint oldCK, Pair<Long, Boolean> pair) {
+ int rePutTimes = oldCK.parseRePutTimes();
+ if (rePutTimes >= ckRewriteIntervalsInSeconds.length) {
+ POP_LOGGER.warn("rePut CK reach max times, drop it. {}, {}, {},
{}-{}, {}, {}", oldCK.getTopic(), oldCK.getCId(),
+ oldCK.getBrokerName(), oldCK.getQueueId(),
pair.getObject1(), oldCK.getPopTime(), oldCK.getInvisibleTime());
+ return;
+ }
+
PopCheckPoint newCk = new PopCheckPoint();
newCk.setBitMap(0);
newCk.setNum((byte) 1);
@@ -583,6 +585,11 @@ public class PopReviveService extends ServiceThread {
newCk.setQueueId(oldCK.getQueueId());
newCk.setBrokerName(oldCK.getBrokerName());
newCk.addDiff(0);
+ newCk.setRePutTimes(String.valueOf(rePutTimes + 1)); // always
increment even if removed from reviveRequestMap
+ if (oldCK.getReviveTime() <= System.currentTimeMillis()) {
+ // never expect an ACK matched in the future, we just use it to
rewrite CK and try to revive retry message next time
+ newCk.setInvisibleTime(oldCK.getInvisibleTime() +
ckRewriteIntervalsInSeconds[rePutTimes] * 1000);
+ }
MessageExtBrokerInner ckMsg =
brokerController.getPopMessageProcessor().buildCkMsg(newCk, queueId);
brokerController.getMessageStore().putMessage(ckMsg);
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
index 8f89c14ae9..440ebf813b 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
@@ -20,29 +20,43 @@ package org.apache.rocketmq.broker;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import io.netty.channel.DefaultChannelPromise;
+import io.netty.util.concurrent.DefaultEventExecutor;
+import org.apache.commons.lang3.tuple.Triple;
import org.apache.rocketmq.auth.config.AuthConfig;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.BrokerIdentity;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.remoting.protocol.header.PullMessageResponseHeader;
import
org.apache.rocketmq.remoting.protocol.header.namesrv.QueryDataVersionResponseHeader;
import
org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerResponseHeader;
import org.apache.rocketmq.remoting.protocol.namesrv.RegisterBrokerResult;
@@ -56,9 +70,12 @@ import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Spy;
+import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
-import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -69,9 +86,11 @@ import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.AdditionalMatchers.or;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.when;
-@RunWith(MockitoJUnitRunner.class)
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(NettyRemotingClient.class)
public class BrokerOuterAPITest {
@Mock
private ChannelHandlerContext handlerContext;
@@ -251,4 +270,154 @@ public class BrokerOuterAPITest {
});
Assert.assertTrue(result.get());
}
+
+ @Test
+ public void testPullMessageFromSpecificBrokerAsync_createChannel_null()
throws Exception {
+ NettyRemotingClient mockClient = PowerMockito.spy(new
NettyRemotingClient(new NettyClientConfig()));
+ PowerMockito.when(mockClient, "getAndCreateChannelAsync",
any()).thenReturn(null);
+ BrokerOuterAPI api = new BrokerOuterAPI(new NettyClientConfig(), new
AuthConfig());
+ Field field = BrokerOuterAPI.class.getDeclaredField("remotingClient");
+ field.setAccessible(true);
+ field.set(api, mockClient);
+
+ Triple<PullResult, String, Boolean> rst =
api.pullMessageFromSpecificBrokerAsync("", "", "", "", 1, 1, 1, 3000L).join();
+ Assert.assertNull(rst.getLeft());
+ Assert.assertTrue(rst.getMiddle().contains("connect"));
+ Assert.assertTrue(rst.getRight()); // need retry
+ }
+
+ @Test
+ public void
testPullMessageFromSpecificBrokerAsync_createChannel_future_notSuccess() throws
Exception {
+ NettyRemotingClient mockClient = PowerMockito.spy(new
NettyRemotingClient(new NettyClientConfig()));
+ DefaultChannelPromise promise = PowerMockito.spy(new
DefaultChannelPromise(PowerMockito.mock(Channel.class), new
DefaultEventExecutor()));
+ PowerMockito.when(mockClient, "getAndCreateChannelAsync",
any()).thenReturn(promise);
+ BrokerOuterAPI api = new BrokerOuterAPI(new NettyClientConfig(), new
AuthConfig());
+ Field field = BrokerOuterAPI.class.getDeclaredField("remotingClient");
+ field.setAccessible(true);
+ field.set(api, mockClient);
+
+ promise.tryFailure(new Throwable());
+ Triple<PullResult, String, Boolean> rst
+ = api.pullMessageFromSpecificBrokerAsync("", "", "", "", 1, 1,
1, 3000L).join();
+ Assert.assertNull(rst.getLeft());
+ Assert.assertTrue(rst.getMiddle().contains("connect"));
+ Assert.assertTrue(rst.getRight()); // need retry
+ }
+
+ // skip other future status test
+
+ @Test
+ public void testPullMessageFromSpecificBrokerAsync_timeout() throws
Exception {
+ Channel channel = Mockito.mock(Channel.class);
+ when(channel.isActive()).thenReturn(true);
+ NettyRemotingClient mockClient = PowerMockito.spy(new
NettyRemotingClient(new NettyClientConfig()));
+ DefaultChannelPromise promise = PowerMockito.spy(new
DefaultChannelPromise(PowerMockito.mock(Channel.class), new
DefaultEventExecutor()));
+ PowerMockito.when(mockClient, "getAndCreateChannelAsync",
any()).thenReturn(promise);
+ when(promise.channel()).thenReturn(channel);
+ BrokerOuterAPI api = new BrokerOuterAPI(new NettyClientConfig(), new
AuthConfig());
+ Field field = BrokerOuterAPI.class.getDeclaredField("remotingClient");
+ field.setAccessible(true);
+ field.set(api, mockClient);
+
+ CompletableFuture<ResponseFuture> future = new CompletableFuture<>();
+ doReturn(future).when(mockClient).invokeImpl(any(Channel.class),
any(RemotingCommand.class), anyLong());
+ promise.trySuccess(null);
+ future.completeExceptionally(new RemotingTimeoutException("wait
response on the channel timeout"));
+ Triple<PullResult, String, Boolean> rst =
api.pullMessageFromSpecificBrokerAsync("", "", "", "", 1, 1, 1, 3000L).join();
+ Assert.assertNull(rst.getLeft());
+ Assert.assertTrue(rst.getMiddle().contains("timeout"));
+ Assert.assertTrue(rst.getRight()); // need retry
+ }
+
+ @Test
+ public void
testPullMessageFromSpecificBrokerAsync_brokerReturn_pullStatusCode() throws
Exception {
+ Channel channel = Mockito.mock(Channel.class);
+ when(channel.isActive()).thenReturn(true);
+ NettyRemotingClient mockClient = PowerMockito.spy(new
NettyRemotingClient(new NettyClientConfig()));
+ DefaultChannelPromise promise = PowerMockito.spy(new
DefaultChannelPromise(PowerMockito.mock(Channel.class), new
DefaultEventExecutor()));
+ PowerMockito.when(mockClient, "getAndCreateChannelAsync",
any()).thenReturn(promise);
+ when(promise.channel()).thenReturn(channel);
+ BrokerOuterAPI api = new BrokerOuterAPI(new NettyClientConfig(), new
AuthConfig());
+ Field field = BrokerOuterAPI.class.getDeclaredField("remotingClient");
+ field.setAccessible(true);
+ field.set(api, mockClient);
+
+ int[] respCodes = new int[] {ResponseCode.SUCCESS,
ResponseCode.PULL_NOT_FOUND, ResponseCode.PULL_RETRY_IMMEDIATELY,
ResponseCode.PULL_OFFSET_MOVED};
+ PullStatus[] respStatus = new PullStatus[] {PullStatus.FOUND,
PullStatus.NO_NEW_MSG, PullStatus.NO_MATCHED_MSG, PullStatus.OFFSET_ILLEGAL};
+ for (int i = 0; i < respCodes.length; i++) {
+ CompletableFuture<ResponseFuture> future = new
CompletableFuture<>();
+ doReturn(future).when(mockClient).invokeImpl(any(Channel.class),
any(RemotingCommand.class), anyLong());
+ RemotingCommand response = mockPullMessageResponse(respCodes[i]);
+ ResponseFuture responseFuture = new ResponseFuture(channel, 0,
null, 1000,
+ resp -> { }, new SemaphoreReleaseOnlyOnce(new
Semaphore(1)));
+ responseFuture.setResponseCommand(response);
+ promise.trySuccess(null);
+ future.complete(responseFuture);
+
+ Triple<PullResult, String, Boolean> rst =
api.pullMessageFromSpecificBrokerAsync("", "", "", "", 1, 1, 1, 3000L).join();
+ Assert.assertEquals(respStatus[i], rst.getLeft().getPullStatus());
+ if (ResponseCode.SUCCESS == respCodes[i]) {
+ Assert.assertEquals(1, rst.getLeft().getMsgFoundList().size());
+ } else {
+ Assert.assertNull(rst.getLeft().getMsgFoundList());
+ }
+ Assert.assertEquals(respStatus[i].name(), rst.getMiddle());
+ Assert.assertFalse(rst.getRight()); // no retry
+ }
+ }
+
+ @Test
+ public void
testPullMessageFromSpecificBrokerAsync_brokerReturn_allOtherResponseCode()
throws Exception {
+ Channel channel = Mockito.mock(Channel.class);
+ when(channel.isActive()).thenReturn(true);
+ NettyRemotingClient mockClient = PowerMockito.spy(new
NettyRemotingClient(new NettyClientConfig()));
+ DefaultChannelPromise promise = PowerMockito.spy(new
DefaultChannelPromise(PowerMockito.mock(Channel.class), new
DefaultEventExecutor()));
+ PowerMockito.when(mockClient, "getAndCreateChannelAsync",
any()).thenReturn(promise);
+ when(promise.channel()).thenReturn(channel);
+ BrokerOuterAPI api = new BrokerOuterAPI(new NettyClientConfig(), new
AuthConfig());
+ Field field = BrokerOuterAPI.class.getDeclaredField("remotingClient");
+ field.setAccessible(true);
+ field.set(api, mockClient);
+
+ CompletableFuture<ResponseFuture> future = new CompletableFuture<>();
+ doReturn(future).when(mockClient).invokeImpl(any(Channel.class),
any(RemotingCommand.class), anyLong());
+ // test one code here, skip others
+ RemotingCommand response =
mockPullMessageResponse(ResponseCode.SUBSCRIPTION_NOT_EXIST);
+ ResponseFuture responseFuture = new ResponseFuture(channel, 0, null,
1000,
+ resp -> { }, new SemaphoreReleaseOnlyOnce(new Semaphore(1)));
+ responseFuture.setResponseCommand(response);
+ promise.trySuccess(null);
+ future.complete(responseFuture);
+
+ Triple<PullResult, String, Boolean> rst =
api.pullMessageFromSpecificBrokerAsync("", "", "", "", 1, 1, 1, 3000L).join();
+ Assert.assertNull(rst.getLeft());
+
Assert.assertTrue(rst.getMiddle().contains(ResponseCode.SUBSCRIPTION_NOT_EXIST
+ ""));
+ Assert.assertTrue(rst.getRight()); // need retry
+ }
+
+ private RemotingCommand mockPullMessageResponse(int responseCode) throws
Exception {
+ RemotingCommand response =
RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
+ response.setCode(responseCode);
+ if (responseCode == ResponseCode.SUCCESS) {
+ MessageExt msg = new MessageExt();
+ msg.setBody("HW".getBytes());
+ msg.setTopic("topic");
+ msg.setBornHost(new InetSocketAddress("127.0.0.1", 9000));
+ msg.setStoreHost(new InetSocketAddress("127.0.0.1", 9000));
+ byte[] encode = MessageDecoder.encode(msg, false);
+ response.setBody(encode);
+ }
+ PullMessageResponseHeader responseHeader = (PullMessageResponseHeader)
response.readCustomHeader();
+ responseHeader.setNextBeginOffset(0L);
+ responseHeader.setMaxOffset(0L);
+ responseHeader.setMinOffset(0L);
+ responseHeader.setOffsetDelta(0L);
+ responseHeader.setTopicSysFlag(0);
+ responseHeader.setGroupSysFlag(0);
+ responseHeader.setSuggestWhichBrokerId(0L);
+ responseHeader.setForbiddenType(0);
+ response.makeCustomHeaderToNet();
+ return response;
+ }
+
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java
index d7bd753d77..7ea06665c3 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java
@@ -17,10 +17,14 @@
package org.apache.rocketmq.broker.failover;
+import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Triple;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.broker.topic.TopicRouteInfoManager;
@@ -28,7 +32,10 @@ import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.PutMessageResult;
@@ -38,6 +45,7 @@ import org.apache.rocketmq.store.logfile.DefaultMappedFile;
import org.apache.rocketmq.store.logfile.MappedFile;
import org.assertj.core.api.Assertions;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -49,7 +57,6 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@@ -73,6 +80,12 @@ public class EscapeBridgeTest {
@Mock
private DefaultMQProducer defaultMQProducer;
+ @Mock
+ private TopicRouteInfoManager topicRouteInfoManager;
+
+ @Mock
+ private BrokerOuterAPI brokerOuterAPI;
+
private static final String BROKER_NAME = "broker_a";
private static final String TEST_TOPIC = "TEST_TOPIC";
@@ -92,14 +105,10 @@ public class EscapeBridgeTest {
when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
when(defaultMessageStore.getMessageAsync(anyString(), anyString(),
anyInt(), anyLong(), anyInt(),
any())).thenReturn(CompletableFuture.completedFuture(getMessageResult));
- TopicRouteInfoManager topicRouteInfoManager =
mock(TopicRouteInfoManager.class);
when(brokerController.getTopicRouteInfoManager()).thenReturn(topicRouteInfoManager);
when(topicRouteInfoManager.findBrokerAddressInSubscribe(anyString(),
anyLong(), anyBoolean())).thenReturn("");
- BrokerOuterAPI brokerOuterAPI = mock(BrokerOuterAPI.class);
when(brokerController.getBrokerOuterAPI()).thenReturn(brokerOuterAPI);
- when(brokerOuterAPI.pullMessageFromSpecificBrokerAsync(anyString(),
anyString(), anyString(), anyString(), anyInt(), anyLong(), anyInt(),
anyLong()))
- .thenReturn(CompletableFuture.completedFuture(new
PullResult(PullStatus.FOUND, -1, -1, -1, new ArrayList<>())));
brokerConfig.setEnableSlaveActingMaster(true);
brokerConfig.setEnableRemoteEscape(true);
@@ -179,6 +188,52 @@ public class EscapeBridgeTest {
Assertions.assertThatCode(() ->
escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME,
false)).doesNotThrowAnyException();
}
+ @Test // local store future completes with null: expect no message, the "getMessageResult is null" reason, and no retry
+ public void getMessageAsyncTest_localStore_getMessageAsync_null() {
+
when(brokerController.getMessageStoreByBrokerName(any())).thenReturn(defaultMessageStore);
+ when(defaultMessageStore.getMessageAsync(anyString(), anyString(),
anyInt(), anyLong(), anyInt(), any()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+ Triple<MessageExt, String, Boolean> rst =
escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME,
false).join();
+ Assert.assertNull(rst.getLeft());
+ Assert.assertEquals("getMessageResult is null", rst.getMiddle());
+ Assert.assertFalse(rst.getRight()); // no retry
+ }
+
+ @Test // local store returns a result built with zero messages: expect no message, "Can not get msg", and no retry
+ public void getMessageAsyncTest_localStore_decodeNothing() throws
Exception {
+
when(brokerController.getMessageStoreByBrokerName(any())).thenReturn(defaultMessageStore);
+ when(defaultMessageStore.getMessageAsync(anyString(), anyString(),
anyInt(), anyLong(), anyInt(), any()))
+
.thenReturn(CompletableFuture.completedFuture(mockGetMessageResult(0,
TEST_TOPIC, null)));
+ Triple<MessageExt, String, Boolean> rst =
escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME,
false).join();
+ Assert.assertNull(rst.getLeft());
+ Assert.assertEquals("Can not get msg", rst.getMiddle());
+ Assert.assertFalse(rst.getRight()); // no retry
+ }
+
+ @Test // local store returns two encoded messages: expect the first one back (queue offset 0, body "HW") and no retry
+ public void getMessageAsyncTest_localStore_message_found() throws
Exception {
+
when(brokerController.getMessageStoreByBrokerName(any())).thenReturn(defaultMessageStore);
+ when(defaultMessageStore.getMessageAsync(anyString(), anyString(),
anyInt(), anyLong(), anyInt(), any()))
+
.thenReturn(CompletableFuture.completedFuture(mockGetMessageResult(2,
TEST_TOPIC, "HW".getBytes())));
+ Triple<MessageExt, String, Boolean> rst =
escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME,
false).join();
+ Assert.assertNotNull(rst.getLeft());
+ Assert.assertEquals(0, rst.getLeft().getQueueOffset());
+ Assert.assertArrayEquals("HW".getBytes(),
rst.getLeft().getBody()); // reports the differing index on failure
+ Assert.assertFalse(rst.getRight());
+ }
+
+ @Test // no local store for the broker name and no broker address: expect "brokerAddress not found" with retry requested
+ public void getMessageAsyncTest_remoteStore_addressNotFound() throws
Exception {
+
when(brokerController.getMessageStoreByBrokerName(any())).thenReturn(null);
+
+ // just test address not found, since we have complete tests of
getMessageFromRemoteAsync()
+ when(topicRouteInfoManager.findBrokerAddressInSubscribe(anyString(),
anyLong(), anyBoolean())).thenReturn(null);
+ Triple<MessageExt, String, Boolean> rst =
escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME,
false).join();
+ Assert.assertNull(rst.getLeft());
+ Assert.assertEquals("brokerAddress not found", rst.getMiddle());
+ Assert.assertTrue(rst.getRight()); // need retry
+ }
+
@Test
public void getMessageFromRemoteTest() {
Assertions.assertThatCode(() ->
escapeBridge.getMessageFromRemote(TEST_TOPIC, 1, DEFAULT_QUEUE_ID,
BROKER_NAME)).doesNotThrowAnyException();
@@ -189,6 +244,54 @@ public class EscapeBridgeTest {
Assertions.assertThatCode(() ->
escapeBridge.getMessageFromRemoteAsync(TEST_TOPIC, 1, DEFAULT_QUEUE_ID,
BROKER_NAME)).doesNotThrowAnyException();
}
+ @Test // remote pull throws RemotingException: expect no message, "Get message from remote failed", and retry requested
+ public void getMessageFromRemoteAsyncTest_exception_caught() throws
Exception {
+ when(brokerOuterAPI.pullMessageFromSpecificBrokerAsync(anyString(),
anyString(), anyString(), anyString(), anyInt(), anyLong(), anyInt(),
anyLong()))
+ .thenThrow(new RemotingException("mock remoting exception"));
+ Triple<MessageExt, String, Boolean> rst =
escapeBridge.getMessageFromRemoteAsync(TEST_TOPIC, 1, DEFAULT_QUEUE_ID,
BROKER_NAME).join();
+ Assert.assertNull(rst.getLeft());
+ Assert.assertEquals("Get message from remote failed", rst.getMiddle());
+ Assert.assertTrue(rst.getRight()); // need retry
+ }
+
+ @Test // broker address lookup returns null: expect no message, "brokerAddress not found", and retry requested
+ public void getMessageFromRemoteAsyncTest_brokerAddressNotFound() throws
Exception {
+ when(topicRouteInfoManager.findBrokerAddressInSubscribe(anyString(),
anyLong(), anyBoolean())).thenReturn(null);
+ Triple<MessageExt, String, Boolean> rst =
escapeBridge.getMessageFromRemoteAsync(TEST_TOPIC, 1, DEFAULT_QUEUE_ID,
BROKER_NAME).join();
+ Assert.assertNull(rst.getLeft());
+ Assert.assertEquals("brokerAddress not found", rst.getMiddle());
+ Assert.assertTrue(rst.getRight()); // need retry
+ }
+
+ @Test // remote pull returns FOUND with one message: expect a non-null message, an empty reason, and no retry
+ public void getMessageFromRemoteAsyncTest_message_found() throws Exception
{
+ PullResult pullResult = new PullResult(PullStatus.FOUND, 1, 1, 1,
Arrays.asList(new MessageExt()));
+ when(brokerOuterAPI.pullMessageFromSpecificBrokerAsync(anyString(),
anyString(), anyString(), anyString(), anyInt(), anyLong(), anyInt(),
anyLong()))
+
.thenReturn(CompletableFuture.completedFuture(Triple.of(pullResult, "",
false))); // right value is ignored
+ Triple<MessageExt, String, Boolean> rst =
escapeBridge.getMessageFromRemoteAsync(TEST_TOPIC, 1, DEFAULT_QUEUE_ID,
BROKER_NAME).join();
+ Assert.assertNotNull(rst.getLeft());
+ Assert.assertTrue(StringUtils.isEmpty(rst.getMiddle()));
+ Assert.assertFalse(rst.getRight()); // no retry
+ }
+
+ @Test // two phases: a NO_MATCHED_MSG result, then a null pull result; each time the stubbed reason and retry flag are expected back
+ public void getMessageFromRemoteAsyncTest_message_notFound() throws
Exception {
+ PullResult pullResult = new PullResult(PullStatus.NO_MATCHED_MSG, 1,
1, 1, null);
+ when(brokerOuterAPI.pullMessageFromSpecificBrokerAsync(anyString(),
anyString(), anyString(), anyString(), anyInt(), anyLong(), anyInt(),
anyLong()))
+
.thenReturn(CompletableFuture.completedFuture(Triple.of(pullResult, "no msg",
false)));
+ Triple<MessageExt, String, Boolean> rst =
escapeBridge.getMessageFromRemoteAsync(TEST_TOPIC, 1, DEFAULT_QUEUE_ID,
BROKER_NAME).join();
+ Assert.assertNull(rst.getLeft());
+ Assert.assertEquals("no msg", rst.getMiddle());
+ Assert.assertFalse(rst.getRight()); // no retry
+
+ when(brokerOuterAPI.pullMessageFromSpecificBrokerAsync(anyString(),
anyString(), anyString(), anyString(), anyInt(), anyLong(), anyInt(),
anyLong()))
+ .thenReturn(CompletableFuture.completedFuture(Triple.of(null,
"other resp code", true)));
+ rst = escapeBridge.getMessageFromRemoteAsync(TEST_TOPIC, 1,
DEFAULT_QUEUE_ID, BROKER_NAME).join();
+ Assert.assertNull(rst.getLeft());
+ Assert.assertEquals("other resp code", rst.getMiddle());
+ Assert.assertTrue(rst.getRight()); // need retry
+ }
+
@Test
public void decodeMsgListTest() {
ByteBuffer byteBuffer = ByteBuffer.allocate(10);
@@ -199,4 +302,39 @@ public class EscapeBridgeTest {
Assertions.assertThatCode(() ->
escapeBridge.decodeMsgList(getMessageResult, false)).doesNotThrowAnyException();
}
+ @Test // encodes one message into the shared getMessageResult and expects decodeMsgList to return it with the same body
+ public void decodeMsgListTest_messageNotNull() throws Exception {
+ MessageExt msg = new MessageExt();
+ msg.setBody("HW".getBytes());
+ msg.setTopic("topic");
+ msg.setBornHost(new InetSocketAddress("127.0.0.1", 9000));
+ msg.setStoreHost(new InetSocketAddress("127.0.0.1", 9000));
+ ByteBuffer byteBuffer = ByteBuffer.wrap(MessageDecoder.encode(msg,
false));
+ SelectMappedBufferResult result = new SelectMappedBufferResult(0,
byteBuffer, 10, new DefaultMappedFile());
+
+
+ getMessageResult.addMessage(result);
+ getMessageResult.getMessageQueueOffset().add(0L);
+ List<MessageExt> list = escapeBridge.decodeMsgList(getMessageResult,
false); // skip deCompressBody test
+ Assert.assertEquals(1, list.size());
+ Assert.assertArrayEquals(msg.getBody(), list.get(0).getBody()); // reports the differing index on failure
+ }
+
+ private GetMessageResult mockGetMessageResult(int count, String topic,
byte[] body) throws Exception { // builds a GetMessageResult holding `count` encoded messages with queue offsets 0..count-1
+ GetMessageResult result = new GetMessageResult();
+ for (int i = 0; i < count; i++) {
+ MessageExt msg = new MessageExt();
+ msg.setBody(body);
+ msg.setTopic(topic);
+ msg.setBornHost(new InetSocketAddress("127.0.0.1", 9000));
+ msg.setStoreHost(new InetSocketAddress("127.0.0.1", 9000));
+ ByteBuffer byteBuffer = ByteBuffer.wrap(MessageDecoder.encode(msg,
false));
+ SelectMappedBufferResult bufferResult = new
SelectMappedBufferResult(0, byteBuffer, body.length, new DefaultMappedFile()); // NOTE(review): body.length requires a non-null body whenever count > 0
+
+ result.addMessage(bufferResult);
+ result.getMessageQueueOffset().add(i + 0L);
+ }
+ return result;
+ }
+
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
index 78b76264fe..d7ea97c550 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
@@ -20,12 +20,17 @@ import com.alibaba.fastjson.JSON;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.tuple.Triple;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.failover.EscapeBridge;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.PopAckConstants;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageConst;
@@ -36,9 +41,14 @@ import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.common.utils.NetworkUtil;
import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.store.AppendMessageResult;
+import org.apache.rocketmq.store.AppendMessageStatus;
import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;
import org.apache.rocketmq.store.timer.TimerMessageStore;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -50,19 +60,25 @@ import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.times;
@RunWith(MockitoJUnitRunner.Silent.class)
public class PopReviveServiceTest {
- private static final String REVIVE_TOPIC = PopAckConstants.REVIVE_TOPIC +
"test";
+ private static final String CLUSTER_NAME = "test";
+ private static final String REVIVE_TOPIC =
PopAckConstants.buildClusterReviveTopic(CLUSTER_NAME);
private static final int REVIVE_QUEUE_ID = 0;
private static final String GROUP = "group";
private static final String TOPIC = "topic";
private static final SocketAddress STORE_HOST =
NetworkUtil.string2SocketAddress("127.0.0.1:8080");
+ private static final Long INVISIBLE_TIME = 1000L;
@Mock
private MessageStore messageStore;
@@ -76,6 +92,9 @@ public class PopReviveServiceTest {
private SubscriptionGroupManager subscriptionGroupManager;
@Mock
private BrokerController brokerController;
+ @Mock
+ private EscapeBridge escapeBridge;
+ private PopMessageProcessor popMessageProcessor;
private BrokerConfig brokerConfig;
private PopReviveService popReviveService;
@@ -83,12 +102,14 @@ public class PopReviveServiceTest {
@Before
public void before() {
brokerConfig = new BrokerConfig();
-
+ brokerConfig.setBrokerClusterName(CLUSTER_NAME);
+ when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
when(brokerController.getMessageStore()).thenReturn(messageStore);
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
+ when(brokerController.getEscapeBridge()).thenReturn(escapeBridge);
when(messageStore.getTimerMessageStore()).thenReturn(timerMessageStore);
when(timerMessageStore.getDequeueBehind()).thenReturn(0L);
when(timerMessageStore.getEnqueueBehind()).thenReturn(0L);
@@ -96,6 +117,9 @@ public class PopReviveServiceTest {
when(topicConfigManager.selectTopicConfig(anyString())).thenReturn(new
TopicConfig());
when(subscriptionGroupManager.findSubscriptionGroupConfig(anyString())).thenReturn(new
SubscriptionGroupConfig());
+ popMessageProcessor = new PopMessageProcessor(brokerController); // a
real one, not mock
+
when(brokerController.getPopMessageProcessor()).thenReturn(popMessageProcessor);
+
popReviveService = spy(new PopReviveService(brokerController,
REVIVE_TOPIC, REVIVE_QUEUE_ID));
popReviveService.setShouldRunPopRevive(true);
}
@@ -204,6 +228,141 @@ public class PopReviveServiceTest {
assertEquals(maxReviveOffset,
commitOffsetCaptor.getValue().longValue());
}
+ @Test // message is found and the retry-topic write returns PUT_OK: expect one retry write and no checkpoint rewrite
+ public void testReviveMsgFromCk_messageFound_writeRetryOK() throws
Throwable {
+ PopCheckPoint ck = buildPopCheckPoint(0, 0, 0);
+ PopReviveService.ConsumeReviveObj reviveObj = new
PopReviveService.ConsumeReviveObj();
+ reviveObj.map.put("", ck);
+ reviveObj.endTime = System.currentTimeMillis();
+ StringBuilder actualRetryTopic = new StringBuilder();
+
+ when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(),
anyString(), anyBoolean()))
+ .thenReturn(CompletableFuture.completedFuture(Triple.of(new
MessageExt(), "", false)));
+
when(escapeBridge.putMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenAnswer(invocation
-> {
+ MessageExtBrokerInner msg = invocation.getArgument(0);
+ actualRetryTopic.append(msg.getTopic());
+ return new PutMessageResult(PutMessageStatus.PUT_OK, new
AppendMessageResult(AppendMessageStatus.PUT_OK));
+ });
+
+ popReviveService.mergeAndRevive(reviveObj);
+ Assert.assertEquals(KeyBuilder.buildPopRetryTopic(TOPIC, GROUP,
false), actualRetryTopic.toString());
+ verify(escapeBridge,
times(1)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); // write
retry
+ verify(messageStore,
times(0)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK
+ }
+
+ @Test // message is found but the retry-topic write fails: expect the checkpoint rewritten once to REVIVE_TOPIC with revive time INVISIBLE_TIME + 10s
+ public void testReviveMsgFromCk_messageFound_writeRetryFailed_rewriteCK()
throws Throwable {
+ PopCheckPoint ck = buildPopCheckPoint(0, 0, 0);
+ PopReviveService.ConsumeReviveObj reviveObj = new
PopReviveService.ConsumeReviveObj();
+ reviveObj.map.put("", ck);
+ reviveObj.endTime = System.currentTimeMillis();
+ StringBuilder actualRetryTopic = new StringBuilder();
+ StringBuilder actualReviveTopic = new StringBuilder();
+ AtomicLong actualInvisibleTime = new AtomicLong(0L);
+
+ when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(),
anyString(), anyBoolean()))
+ .thenReturn(CompletableFuture.completedFuture(Triple.of(new
MessageExt(), "", false)));
+
when(escapeBridge.putMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenAnswer(invocation
-> {
+ MessageExtBrokerInner msg = invocation.getArgument(0);
+ actualRetryTopic.append(msg.getTopic());
+ return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new
AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED));
+ });
+
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenAnswer(invocation
-> {
+ MessageExtBrokerInner msg = invocation.getArgument(0);
+ actualReviveTopic.append(msg.getTopic());
+ PopCheckPoint rewriteCK = JSON.parseObject(msg.getBody(),
PopCheckPoint.class);
+ actualInvisibleTime.set(rewriteCK.getReviveTime());
+ return new PutMessageResult(PutMessageStatus.PUT_OK, new
AppendMessageResult(AppendMessageStatus.PUT_OK));
+ });
+
+ popReviveService.mergeAndRevive(reviveObj);
+ Assert.assertEquals(KeyBuilder.buildPopRetryTopic(TOPIC, GROUP,
false), actualRetryTopic.toString());
+ Assert.assertEquals(REVIVE_TOPIC, actualReviveTopic.toString());
+ Assert.assertEquals(INVISIBLE_TIME + 10 * 1000L,
actualInvisibleTime.get()); // first interval is 10s
+ verify(escapeBridge,
times(1)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); // write
retry
+ verify(messageStore,
times(1)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK
+ }
+
+ @Test // checkpoint already carries rePutTimes "17" and the retry-topic write fails: expect one retry write attempt and no further checkpoint rewrite
+ public void
testReviveMsgFromCk_messageFound_writeRetryFailed_rewriteCK_end() throws
Throwable {
+ PopCheckPoint ck = buildPopCheckPoint(0, 0, 0);
+ ck.setRePutTimes("17");
+ PopReviveService.ConsumeReviveObj reviveObj = new
PopReviveService.ConsumeReviveObj();
+ reviveObj.map.put("", ck);
+ reviveObj.endTime = System.currentTimeMillis();
+ StringBuilder actualRetryTopic = new StringBuilder();
+
+ when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(),
anyString(), anyBoolean()))
+ .thenReturn(CompletableFuture.completedFuture(Triple.of(new
MessageExt(), "", false)));
+
when(escapeBridge.putMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenAnswer(invocation
-> {
+ MessageExtBrokerInner msg = invocation.getArgument(0);
+ actualRetryTopic.append(msg.getTopic());
+ return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new
AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED));
+ });
+
+ popReviveService.mergeAndRevive(reviveObj);
+ Assert.assertEquals(KeyBuilder.buildPopRetryTopic(TOPIC, GROUP,
false), actualRetryTopic.toString());
+ verify(escapeBridge,
times(1)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); // write
retry
+ verify(messageStore,
times(0)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK
+ }
+
+ @Test // message lookup yields null with the retry flag false: expect neither a retry write nor a checkpoint rewrite
+ public void testReviveMsgFromCk_messageNotFound_noRetry() throws Throwable
{
+ PopCheckPoint ck = buildPopCheckPoint(0, 0, 0);
+ PopReviveService.ConsumeReviveObj reviveObj = new
PopReviveService.ConsumeReviveObj();
+ reviveObj.map.put("", ck);
+ reviveObj.endTime = System.currentTimeMillis();
+
+ when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(),
anyString(), anyBoolean()))
+ .thenReturn(CompletableFuture.completedFuture(Triple.of(null,
"", false)));
+
+ popReviveService.mergeAndRevive(reviveObj);
+ verify(escapeBridge,
times(0)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); // write
retry
+ verify(messageStore,
times(0)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK
+ }
+
+ @Test // message lookup yields null with the retry flag true: expect no retry write and the checkpoint rewritten once to REVIVE_TOPIC with revive time INVISIBLE_TIME + 10s
+ public void testReviveMsgFromCk_messageNotFound_needRetry() throws
Throwable {
+ PopCheckPoint ck = buildPopCheckPoint(0, 0, 0);
+ PopReviveService.ConsumeReviveObj reviveObj = new
PopReviveService.ConsumeReviveObj();
+ reviveObj.map.put("", ck);
+ reviveObj.endTime = System.currentTimeMillis();
+ StringBuilder actualReviveTopic = new StringBuilder();
+ AtomicLong actualInvisibleTime = new AtomicLong(0L);
+
+ when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(),
anyString(), anyBoolean()))
+ .thenReturn(CompletableFuture.completedFuture(Triple.of(null,
"", true)));
+
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenAnswer(invocation
-> {
+ MessageExtBrokerInner msg = invocation.getArgument(0);
+ actualReviveTopic.append(msg.getTopic());
+ PopCheckPoint rewriteCK = JSON.parseObject(msg.getBody(),
PopCheckPoint.class);
+ actualInvisibleTime.set(rewriteCK.getReviveTime());
+ return new PutMessageResult(PutMessageStatus.PUT_OK, new
AppendMessageResult(AppendMessageStatus.PUT_OK));
+ });
+
+ popReviveService.mergeAndRevive(reviveObj);
+ Assert.assertEquals(REVIVE_TOPIC, actualReviveTopic.toString());
+ Assert.assertEquals(INVISIBLE_TIME + 10 * 1000L,
actualInvisibleTime.get()); // first interval is 10s
+ verify(escapeBridge,
times(0)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); // write
retry
+ verify(messageStore,
times(1)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK
+ }
+
+ @Test // checkpoint already carries rePutTimes "17" and lookup asks for retry: expect neither a retry write nor a checkpoint rewrite
+ public void testReviveMsgFromCk_messageNotFound_needRetry_end() throws
Throwable {
+ PopCheckPoint ck = buildPopCheckPoint(0, 0, 0);
+ ck.setRePutTimes("17");
+ PopReviveService.ConsumeReviveObj reviveObj = new
PopReviveService.ConsumeReviveObj();
+ reviveObj.map.put("", ck);
+ reviveObj.endTime = System.currentTimeMillis();
+
+ when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(),
anyString(), anyBoolean()))
+ .thenReturn(CompletableFuture.completedFuture(Triple.of(null,
"", true)));
+
+ popReviveService.mergeAndRevive(reviveObj);
+ verify(escapeBridge,
times(0)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); // write
retry
+ verify(messageStore,
times(0)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK
+ }
+
public static PopCheckPoint buildPopCheckPoint(long startOffset, long
popTime, long reviveOffset) {
PopCheckPoint ck = new PopCheckPoint();
ck.setStartOffset(startOffset);
@@ -214,7 +373,8 @@ public class PopReviveServiceTest {
ck.setNum((byte) 1);
ck.setBitMap(0);
ck.setReviveOffset(reviveOffset);
- ck.setInvisibleTime(1000);
+ ck.setInvisibleTime(INVISIBLE_TIME);
+ ck.setBrokerName("broker-a");
return ck;
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
index e041b66d9c..38e0a20752 100644
--- a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
+++ b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
@@ -43,6 +43,8 @@ public class PopCheckPoint implements
Comparable<PopCheckPoint> {
private List<Integer> queueOffsetDiff;
@JSONField(name = "bn")
String brokerName;
+ @JSONField(name = "rp")
+ String rePutTimes; // ck rePut times
public long getReviveOffset() {
return reviveOffset;
@@ -136,6 +138,14 @@ public class PopCheckPoint implements
Comparable<PopCheckPoint> {
this.brokerName = brokerName;
}
+ public String getRePutTimes() { // raw rePut counter as stored; null until a value is set
+ return rePutTimes;
+ }
+
+ public void setRePutTimes(String rePutTimes) { // stores the counter verbatim; no validation happens here
+ this.rePutTimes = rePutTimes;
+ }
+
public void addDiff(int diff) {
if (this.queueOffsetDiff == null) {
this.queueOffsetDiff = new ArrayList<>(8);
@@ -171,10 +181,21 @@ public class PopCheckPoint implements
Comparable<PopCheckPoint> {
return startOffset + queueOffsetDiff.get(index);
}
+ public int parseRePutTimes() { // numeric view of the string rePut counter
+ if (null == rePutTimes) {
+ return 0; // counter was never set
+ }
+ try {
+ return Integer.parseInt(rePutTimes);
+ } catch (NumberFormatException ignored) {
+ return Byte.MAX_VALUE; // unparsable counter: fall back to 127 instead of failing
+ }
+ }
+
@Override
public String toString() {
return "PopCheckPoint [topic=" + topic + ", cid=" + cid + ", queueId="
+ queueId + ", startOffset=" + startOffset + ", bitMap=" + bitMap + ", num=" +
num + ", reviveTime=" + getReviveTime()
- + ", reviveOffset=" + reviveOffset + ", diff=" + queueOffsetDiff +
", brokerName=" + brokerName + "]";
+ + ", reviveOffset=" + reviveOffset + ", diff=" + queueOffsetDiff +
", brokerName=" + brokerName + ", rePutTimes=" + rePutTimes + "]";
}
@Override