This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 4f5f705f16 [ISSUE #8780] Implement asynchronous storage of ack/ck
messages in pop consume to enhance performance (#8727)
4f5f705f16 is described below
commit 4f5f705f16faeb7d491b25679d1c100f38264bb9
Author: rongtong <[email protected]>
AuthorDate: Wed Oct 16 13:39:27 2024 +0800
[ISSUE #8780] Implement asynchronous storage of ack/ck messages in pop
consume to enhance performance (#8727)
* Pop consume asynchronization
* Pass UTs and ITs
* Pass the checkstyle
* Fix LocalGrpcIT failing to pass
* Fix the UT failing to pass
* Simplify duplicate methods in EscapeBridge
---
.../rocketmq/broker/failover/EscapeBridge.java | 62 ++++++---
.../broker/processor/AckMessageProcessor.java | 39 ++++--
.../processor/ChangeInvisibleTimeProcessor.java | 149 +++++++++++++--------
.../broker/processor/PopBufferMergeService.java | 113 +++++++++++-----
.../ChangeInvisibleTimeProcessorTest.java | 3 +-
.../org/apache/rocketmq/common/BrokerConfig.java | 20 +++
.../proxy/service/message/LocalMessageService.java | 8 +-
.../service/message/LocalMessageServiceTest.java | 5 +-
.../rocketmq/test/base/IntegrationTestBase.java | 2 +
9 files changed, 275 insertions(+), 126 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
index 762d917d64..dd37f42b2c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
@@ -137,7 +137,7 @@ public class EscapeBridge {
brokerNameToSend = mqSelected.getBrokerName();
if
(this.brokerController.getBrokerConfig().getBrokerName().equals(brokerNameToSend))
{
LOG.warn("putMessageToRemoteBroker failed, remote broker not
found. Topic: {}, MsgId: {}, Broker: {}",
- messageExt.getTopic(), messageExt.getMsgId(),
brokerNameToSend);
+ messageExt.getTopic(), messageExt.getMsgId(),
brokerNameToSend);
return null;
}
} else {
@@ -147,7 +147,7 @@ public class EscapeBridge {
final String brokerAddrToSend =
this.brokerController.getTopicRouteInfoManager().findBrokerAddressInPublish(brokerNameToSend);
if (null == brokerAddrToSend) {
LOG.warn("putMessageToRemoteBroker failed, remote broker address
not found. Topic: {}, MsgId: {}, Broker: {}",
- messageExt.getTopic(), messageExt.getMsgId(),
brokerNameToSend);
+ messageExt.getTopic(), messageExt.getMsgId(),
brokerNameToSend);
return null;
}
@@ -197,7 +197,7 @@ public class EscapeBridge {
producerGroup, SEND_TIMEOUT);
return future.exceptionally(throwable -> null)
- .thenApplyAsync(sendResult ->
transformSendResult2PutResult(sendResult), this.defaultAsyncSenderExecutor)
+ .thenApplyAsync(this::transformSendResult2PutResult,
this.defaultAsyncSenderExecutor)
.exceptionally(throwable ->
transformSendResult2PutResult(null));
} catch (Exception e) {
@@ -211,7 +211,6 @@ public class EscapeBridge {
}
}
-
private String getProducerGroup(MessageExtBrokerInner messageExt) {
if (null == messageExt) {
return this.innerProducerGroupName;
@@ -223,12 +222,29 @@ public class EscapeBridge {
return producerGroup;
}
-
public PutMessageResult putMessageToSpecificQueue(MessageExtBrokerInner
messageExt) {
BrokerController masterBroker =
this.brokerController.peekMasterBroker();
if (masterBroker != null) {
return masterBroker.getMessageStore().putMessage(messageExt);
- } else if
(this.brokerController.getBrokerConfig().isEnableSlaveActingMaster()
+ }
+ try {
+ return
asyncRemotePutMessageToSpecificQueue(messageExt).get(SEND_TIMEOUT,
TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ LOG.error("Put message to specific queue error", e);
+ return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null,
true);
+ }
+ }
+
+ public CompletableFuture<PutMessageResult>
asyncPutMessageToSpecificQueue(MessageExtBrokerInner messageExt) {
+ BrokerController masterBroker =
this.brokerController.peekMasterBroker();
+ if (masterBroker != null) {
+ return masterBroker.getMessageStore().asyncPutMessage(messageExt);
+ }
+ return asyncRemotePutMessageToSpecificQueue(messageExt);
+ }
+
+ public CompletableFuture<PutMessageResult>
asyncRemotePutMessageToSpecificQueue(MessageExtBrokerInner messageExt) {
+ if (this.brokerController.getBrokerConfig().isEnableSlaveActingMaster()
&& this.brokerController.getBrokerConfig().isEnableRemoteEscape())
{
try {
messageExt.setWaitStoreMsgOK(false);
@@ -237,7 +253,7 @@ public class EscapeBridge {
List<MessageQueue> mqs =
topicPublishInfo.getMessageQueueList();
if (null == mqs || mqs.isEmpty()) {
- return new
PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, null, true);
+ return CompletableFuture.completedFuture(new
PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, null, true));
}
String id = messageExt.getTopic() + messageExt.getStoreHost();
@@ -248,19 +264,17 @@ public class EscapeBridge {
String brokerNameToSend = mq.getBrokerName();
String brokerAddrToSend =
this.brokerController.getTopicRouteInfoManager().findBrokerAddressInPublish(brokerNameToSend);
- final SendResult sendResult =
this.brokerController.getBrokerOuterAPI().sendMessageToSpecificBroker(
+ return
this.brokerController.getBrokerOuterAPI().sendMessageToSpecificBrokerAsync(
brokerAddrToSend, brokerNameToSend,
- messageExt, this.getProducerGroup(messageExt),
SEND_TIMEOUT);
-
- return transformSendResult2PutResult(sendResult);
+ messageExt, this.getProducerGroup(messageExt),
SEND_TIMEOUT).thenCompose(sendResult ->
CompletableFuture.completedFuture(transformSendResult2PutResult(sendResult)));
} catch (Exception e) {
LOG.error("sendMessageInFailover to remote failed", e);
- return new
PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, null, true);
+ return CompletableFuture.completedFuture(new
PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, null, true));
}
} else {
LOG.warn("Put message to specific queue failed,
enableSlaveActingMaster={}, enableRemoteEscape={}.",
this.brokerController.getBrokerConfig().isEnableSlaveActingMaster(),
this.brokerController.getBrokerConfig().isEnableRemoteEscape());
- return new
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
+ return CompletableFuture.completedFuture(new
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
}
}
@@ -282,12 +296,14 @@ public class EscapeBridge {
}
}
- public Triple<MessageExt, String, Boolean> getMessage(String topic, long
offset, int queueId, String brokerName, boolean deCompressBody) {
+ public Triple<MessageExt, String, Boolean> getMessage(String topic, long
offset, int queueId, String brokerName,
+ boolean deCompressBody) {
return getMessageAsync(topic, offset, queueId, brokerName,
deCompressBody).join();
}
// Triple<MessageExt, info, needRetry>, check info and retry if and only
if MessageExt is null
- public CompletableFuture<Triple<MessageExt, String, Boolean>>
getMessageAsync(String topic, long offset, int queueId, String brokerName,
boolean deCompressBody) {
+ public CompletableFuture<Triple<MessageExt, String, Boolean>>
getMessageAsync(String topic, long offset,
+ int queueId, String brokerName, boolean deCompressBody) {
MessageStore messageStore =
brokerController.getMessageStoreByBrokerName(brokerName);
if (messageStore != null) {
return messageStore.getMessageAsync(innerConsumerGroupName, topic,
queueId, offset, 1, null)
@@ -300,9 +316,9 @@ public class EscapeBridge {
if (list == null || list.isEmpty()) {
// OFFSET_FOUND_NULL returned by TieredMessageStore
indicates exception occurred
boolean needRetry =
GetMessageStatus.OFFSET_FOUND_NULL.equals(result.getStatus())
- && messageStore instanceof TieredMessageStore;
+ && messageStore instanceof TieredMessageStore;
LOG.warn("Can not get msg , topic {}, offset {},
queueId {}, needRetry {}, result is {}",
- topic, offset, queueId, needRetry, result);
+ topic, offset, queueId, needRetry, result);
return Triple.of(null, "Can not get msg", needRetry);
}
return Triple.of(list.get(0), "", false);
@@ -340,12 +356,14 @@ public class EscapeBridge {
return foundList;
}
- protected Triple<MessageExt, String, Boolean> getMessageFromRemote(String
topic, long offset, int queueId, String brokerName) {
+ protected Triple<MessageExt, String, Boolean> getMessageFromRemote(String
topic, long offset, int queueId,
+ String brokerName) {
return getMessageFromRemoteAsync(topic, offset, queueId,
brokerName).join();
}
// Triple<MessageExt, info, needRetry>, check info and retry if and only
if MessageExt is null
- protected CompletableFuture<Triple<MessageExt, String, Boolean>>
getMessageFromRemoteAsync(String topic, long offset, int queueId, String
brokerName) {
+ protected CompletableFuture<Triple<MessageExt, String, Boolean>>
getMessageFromRemoteAsync(String topic,
+ long offset, int queueId, String brokerName) {
try {
String brokerAddr =
this.brokerController.getTopicRouteInfoManager().findBrokerAddressInSubscribe(brokerName,
MixAll.MASTER_ID, false);
if (null == brokerAddr) {
@@ -359,11 +377,11 @@ public class EscapeBridge {
}
return
this.brokerController.getBrokerOuterAPI().pullMessageFromSpecificBrokerAsync(brokerName,
- brokerAddr, this.innerConsumerGroupName, topic, queueId,
offset, 1, DEFAULT_PULL_TIMEOUT_MILLIS)
+ brokerAddr, this.innerConsumerGroupName, topic, queueId,
offset, 1, DEFAULT_PULL_TIMEOUT_MILLIS)
.thenApply(pullResult -> {
if (pullResult.getLeft() != null
- &&
PullStatus.FOUND.equals(pullResult.getLeft().getPullStatus())
- &&
CollectionUtils.isNotEmpty(pullResult.getLeft().getMsgFoundList())) {
+ &&
PullStatus.FOUND.equals(pullResult.getLeft().getPullStatus())
+ &&
CollectionUtils.isNotEmpty(pullResult.getLeft().getMsgFoundList())) {
return
Triple.of(pullResult.getLeft().getMsgFoundList().get(0), "", false);
}
return Triple.of(null, pullResult.getMiddle(),
pullResult.getRight());
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 6f7b7e8a24..dc1b1b53a3 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -98,7 +98,7 @@ public class AckMessageProcessor implements
NettyRequestProcessor {
@Override
public RemotingCommand processRequest(final ChannelHandlerContext ctx,
- RemotingCommand request) throws
RemotingCommandException {
+ RemotingCommand request) throws RemotingCommandException {
return this.processRequest(ctx.channel(), request, true);
}
@@ -108,7 +108,7 @@ public class AckMessageProcessor implements
NettyRequestProcessor {
}
private RemotingCommand processRequest(final Channel channel,
RemotingCommand request,
- boolean brokerAllowSuspend) throws
RemotingCommandException {
+ boolean brokerAllowSuspend) throws RemotingCommandException {
AckMessageRequestHeader requestHeader;
BatchAckMessageRequestBody reqBody = null;
final RemotingCommand response =
RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, null);
@@ -126,7 +126,7 @@ public class AckMessageProcessor implements
NettyRequestProcessor {
if (requestHeader.getQueueId() >= topicConfig.getReadQueueNums()
|| requestHeader.getQueueId() < 0) {
String errorInfo = String.format("queueId[%d] is illegal,
topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
- requestHeader.getQueueId(), requestHeader.getTopic(),
topicConfig.getReadQueueNums(), channel.remoteAddress());
+ requestHeader.getQueueId(), requestHeader.getTopic(),
topicConfig.getReadQueueNums(), channel.remoteAddress());
POP_LOGGER.warn(errorInfo);
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response.setRemark(errorInfo);
@@ -137,7 +137,7 @@ public class AckMessageProcessor implements
NettyRequestProcessor {
long maxOffset =
this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(),
requestHeader.getQueueId());
if (requestHeader.getOffset() < minOffset ||
requestHeader.getOffset() > maxOffset) {
String errorInfo = String.format("offset is illegal,
key:%s@%d, commit:%d, store:%d~%d",
- requestHeader.getTopic(), requestHeader.getQueueId(),
requestHeader.getOffset(), minOffset, maxOffset);
+ requestHeader.getTopic(), requestHeader.getQueueId(),
requestHeader.getOffset(), minOffset, maxOffset);
POP_LOGGER.warn(errorInfo);
response.setCode(ResponseCode.NO_MESSAGE);
response.setRemark(errorInfo);
@@ -165,7 +165,8 @@ public class AckMessageProcessor implements
NettyRequestProcessor {
return response;
}
- private void appendAck(final AckMessageRequestHeader requestHeader, final
BatchAck batchAck, final RemotingCommand response, final Channel channel,
String brokerName) {
+ private void appendAck(final AckMessageRequestHeader requestHeader, final
BatchAck batchAck,
+ final RemotingCommand response, final Channel channel, String
brokerName) {
String[] extraInfo;
String consumeGroup, topic;
int qId, rqId;
@@ -268,18 +269,36 @@ public class AckMessageProcessor implements
NettyRequestProcessor {
msgInner.setDeliverTimeMs(popTime + invisibleTime);
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
PopMessageProcessor.genAckUniqueId(ackMsg));
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
- PutMessageResult putMessageResult =
this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+ if (brokerController.getBrokerConfig().isAppendAckAsync()) {
+ int finalAckCount = ackCount;
+
this.brokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(msgInner).thenAccept(putMessageResult
-> {
+ handlePutMessageResult(putMessageResult, ackMsg, topic,
consumeGroup, popTime, qId, finalAckCount);
+ }).exceptionally(throwable -> {
+ handlePutMessageResult(new
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null, false),
+ ackMsg, topic, consumeGroup, popTime, qId, finalAckCount);
+ POP_LOGGER.error("put ack msg error ", throwable);
+ return null;
+ });
+ } else {
+ PutMessageResult putMessageResult =
this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+ handlePutMessageResult(putMessageResult, ackMsg, topic,
consumeGroup, popTime, qId, ackCount);
+ }
+ }
+
+ private void handlePutMessageResult(PutMessageResult putMessageResult,
AckMsg ackMsg, String topic,
+ String consumeGroup, long popTime, int qId, int ackCount) {
if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
- && putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_DISK_TIMEOUT
- && putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
- && putMessageResult.getPutMessageStatus() !=
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
+ && putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_DISK_TIMEOUT
+ && putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
+ && putMessageResult.getPutMessageStatus() !=
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
POP_LOGGER.error("put ack msg error:" + putMessageResult);
}
PopMetricsManager.incPopReviveAckPutCount(ackMsg,
putMessageResult.getPutMessageStatus());
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic,
consumeGroup, popTime, qId, ackCount);
}
- protected void ackOrderly(String topic, String consumeGroup, int qId, long
ackOffset, long popTime, long invisibleTime, Channel channel, RemotingCommand
response) {
+ protected void ackOrderly(String topic, String consumeGroup, int qId, long
ackOffset, long popTime,
+ long invisibleTime, Channel channel, RemotingCommand response) {
String lockKey = topic + PopAckConstants.SPLIT + consumeGroup +
PopAckConstants.SPLIT + qId;
long oldOffset =
this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup,
topic, qId);
if (ackOffset < oldOffset) {
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
index bdfffff096..af3b8ae6f0 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
@@ -19,6 +19,8 @@ package org.apache.rocketmq.broker.processor;
import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.PopMetricsManager;
import org.apache.rocketmq.common.PopAckConstants;
@@ -33,13 +35,13 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import
org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
-import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;
@@ -67,6 +69,35 @@ public class ChangeInvisibleTimeProcessor implements
NettyRequestProcessor {
private RemotingCommand processRequest(final Channel channel,
RemotingCommand request,
boolean brokerAllowSuspend) throws RemotingCommandException {
+
+ CompletableFuture<RemotingCommand> responseFuture =
processRequestAsync(channel, request, brokerAllowSuspend);
+
+ if (brokerController.getBrokerConfig().isAppendCkAsync() &&
brokerController.getBrokerConfig().isAppendAckAsync()) {
+ responseFuture.thenAccept(response -> doResponse(channel, request,
response)).exceptionally(throwable -> {
+ RemotingCommand response =
RemotingCommand.createResponseCommand(ChangeInvisibleTimeResponseHeader.class);
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setOpaque(request.getOpaque());
+ doResponse(channel, request, response);
+ POP_LOGGER.error("append checkpoint or ack origin failed",
throwable);
+ return null;
+ });
+ } else {
+ RemotingCommand response;
+ try {
+ response = responseFuture.get(3000, TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ response =
RemotingCommand.createResponseCommand(ChangeInvisibleTimeResponseHeader.class);
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setOpaque(request.getOpaque());
+ POP_LOGGER.error("append checkpoint or ack origin failed", e);
+ }
+ return response;
+ }
+ return null;
+ }
+
+ public CompletableFuture<RemotingCommand> processRequestAsync(final
Channel channel, RemotingCommand request,
+ boolean brokerAllowSuspend) throws RemotingCommandException {
final ChangeInvisibleTimeRequestHeader requestHeader =
(ChangeInvisibleTimeRequestHeader)
request.decodeCommandCustomHeader(ChangeInvisibleTimeRequestHeader.class);
RemotingCommand response =
RemotingCommand.createResponseCommand(ChangeInvisibleTimeResponseHeader.class);
response.setCode(ResponseCode.SUCCESS);
@@ -77,7 +108,7 @@ public class ChangeInvisibleTimeProcessor implements
NettyRequestProcessor {
POP_LOGGER.error("The topic {} not exist, consumer: {} ",
requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark(String.format("topic[%s] not exist, apply first
please! %s", requestHeader.getTopic(),
FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
- return response;
+ return CompletableFuture.completedFuture(response);
}
if (requestHeader.getQueueId() >= topicConfig.getReadQueueNums() ||
requestHeader.getQueueId() < 0) {
@@ -86,46 +117,35 @@ public class ChangeInvisibleTimeProcessor implements
NettyRequestProcessor {
POP_LOGGER.warn(errorInfo);
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response.setRemark(errorInfo);
- return response;
+ return CompletableFuture.completedFuture(response);
}
long minOffset =
this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
requestHeader.getQueueId());
long maxOffset =
this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(),
requestHeader.getQueueId());
if (requestHeader.getOffset() < minOffset || requestHeader.getOffset()
> maxOffset) {
response.setCode(ResponseCode.NO_MESSAGE);
- return response;
+ return CompletableFuture.completedFuture(response);
}
String[] extraInfo = ExtraInfoUtil.split(requestHeader.getExtraInfo());
if (ExtraInfoUtil.isOrder(extraInfo)) {
- return processChangeInvisibleTimeForOrder(requestHeader,
extraInfo, response, responseHeader);
+ return
CompletableFuture.completedFuture(processChangeInvisibleTimeForOrder(requestHeader,
extraInfo, response, responseHeader));
}
// add new ck
long now = System.currentTimeMillis();
- PutMessageResult ckResult = appendCheckPoint(requestHeader,
ExtraInfoUtil.getReviveQid(extraInfo), requestHeader.getQueueId(),
requestHeader.getOffset(), now, ExtraInfoUtil.getBrokerName(extraInfo));
-
- if (ckResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
- && ckResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_DISK_TIMEOUT
- && ckResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
- && ckResult.getPutMessageStatus() !=
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
- POP_LOGGER.error("change Invisible, put new ck error: {}",
ckResult);
- response.setCode(ResponseCode.SYSTEM_ERROR);
- return response;
- }
-
- // ack old msg.
- try {
- ackOrigin(requestHeader, extraInfo);
- } catch (Throwable e) {
- POP_LOGGER.error("change Invisible, put ack msg error: {}, {}",
requestHeader.getExtraInfo(), e.getMessage());
- // cancel new ck?
- }
- responseHeader.setInvisibleTime(requestHeader.getInvisibleTime());
- responseHeader.setPopTime(now);
- responseHeader.setReviveQid(ExtraInfoUtil.getReviveQid(extraInfo));
- return response;
+ CompletableFuture<Boolean> futureResult =
appendCheckPointThenAckOrigin(requestHeader,
ExtraInfoUtil.getReviveQid(extraInfo), requestHeader.getQueueId(),
requestHeader.getOffset(), now, extraInfo);
+ return futureResult.thenCompose(result -> {
+ if (result) {
+
responseHeader.setInvisibleTime(requestHeader.getInvisibleTime());
+ responseHeader.setPopTime(now);
+
responseHeader.setReviveQid(ExtraInfoUtil.getReviveQid(extraInfo));
+ } else {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ }
+ return CompletableFuture.completedFuture(response);
+ });
}
protected RemotingCommand
processChangeInvisibleTimeForOrder(ChangeInvisibleTimeRequestHeader
requestHeader,
@@ -158,7 +178,8 @@ public class ChangeInvisibleTimeProcessor implements
NettyRequestProcessor {
return response;
}
- private void ackOrigin(final ChangeInvisibleTimeRequestHeader
requestHeader, String[] extraInfo) {
+ private CompletableFuture<Boolean> ackOrigin(final
ChangeInvisibleTimeRequestHeader requestHeader,
+ String[] extraInfo) {
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
AckMsg ackMsg = new AckMsg();
@@ -176,7 +197,7 @@ public class ChangeInvisibleTimeProcessor implements
NettyRequestProcessor {
this.brokerController.getBrokerStatsManager().incGroupAckNums(requestHeader.getConsumerGroup(),
requestHeader.getTopic(), 1);
if
(brokerController.getPopMessageProcessor().getPopBufferMergeService().addAk(rqId,
ackMsg)) {
- return;
+ return CompletableFuture.completedFuture(true);
}
msgInner.setTopic(reviveTopic);
@@ -189,18 +210,25 @@ public class ChangeInvisibleTimeProcessor implements
NettyRequestProcessor {
msgInner.setDeliverTimeMs(ExtraInfoUtil.getPopTime(extraInfo) +
ExtraInfoUtil.getInvisibleTime(extraInfo));
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
PopMessageProcessor.genAckUniqueId(ackMsg));
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
- PutMessageResult putMessageResult =
this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
- if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
- && putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_DISK_TIMEOUT
- && putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
- && putMessageResult.getPutMessageStatus() !=
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
- POP_LOGGER.error("change Invisible, put ack msg fail: {}, {}",
ackMsg, putMessageResult);
- }
- PopMetricsManager.incPopReviveAckPutCount(ackMsg,
putMessageResult.getPutMessageStatus());
+ return
this.brokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(msgInner).thenCompose(putMessageResult
-> {
+ if (putMessageResult.getPutMessageStatus() !=
PutMessageStatus.PUT_OK
+ && putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_DISK_TIMEOUT
+ && putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
+ && putMessageResult.getPutMessageStatus() !=
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
+ POP_LOGGER.error("change Invisible, put ack msg fail: {}, {}",
ackMsg, putMessageResult);
+ }
+ PopMetricsManager.incPopReviveAckPutCount(ackMsg,
putMessageResult.getPutMessageStatus());
+ return CompletableFuture.completedFuture(true);
+ }).exceptionally(e -> {
+ POP_LOGGER.error("change Invisible, put ack msg error: {}, {}",
requestHeader.getExtraInfo(), e.getMessage());
+ return false;
+ });
}
- private PutMessageResult appendCheckPoint(final
ChangeInvisibleTimeRequestHeader requestHeader, int reviveQid,
- int queueId, long offset, long popTime, String brokerName) {
+ private CompletableFuture<Boolean> appendCheckPointThenAckOrigin(
+ final ChangeInvisibleTimeRequestHeader requestHeader,
+ int reviveQid,
+ int queueId, long offset, long popTime, String[] extraInfo) {
// add check point msg to revive log
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(reviveTopic);
@@ -214,7 +242,7 @@ public class ChangeInvisibleTimeProcessor implements
NettyRequestProcessor {
ck.setTopic(requestHeader.getTopic());
ck.setQueueId(queueId);
ck.addDiff(0);
- ck.setBrokerName(brokerName);
+ ck.setBrokerName(ExtraInfoUtil.getBrokerName(extraInfo));
msgInner.setBody(JSON.toJSONString(ck).getBytes(DataConverter.CHARSET_UTF8));
msgInner.setQueueId(reviveQid);
@@ -225,21 +253,36 @@ public class ChangeInvisibleTimeProcessor implements
NettyRequestProcessor {
msgInner.setDeliverTimeMs(ck.getReviveTime() -
PopAckConstants.ackTimeInterval);
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
PopMessageProcessor.genCkUniqueId(ck));
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
- PutMessageResult putMessageResult =
this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
-
- if (brokerController.getBrokerConfig().isEnablePopLog()) {
- POP_LOGGER.info("change Invisible , appendCheckPoint, topic {},
queueId {},reviveId {}, cid {}, startOffset {}, rt {}, result {}",
requestHeader.getTopic(), queueId, reviveQid, requestHeader.getConsumerGroup(),
offset,
- ck.getReviveTime(), putMessageResult);
- }
+ return
this.brokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(msgInner).thenCompose(putMessageResult
-> {
+ if (brokerController.getBrokerConfig().isEnablePopLog()) {
+ POP_LOGGER.info("change Invisible, appendCheckPoint, topic {},
queueId {},reviveId {}, cid {}, startOffset {}, rt {}, result {}",
requestHeader.getTopic(), queueId, reviveQid, requestHeader.getConsumerGroup(),
offset,
+ ck.getReviveTime(), putMessageResult);
+ }
- if (putMessageResult != null) {
- PopMetricsManager.incPopReviveCkPutCount(ck,
putMessageResult.getPutMessageStatus());
- if (putMessageResult.isOk()) {
-
this.brokerController.getBrokerStatsManager().incBrokerCkNums(1);
-
this.brokerController.getBrokerStatsManager().incGroupCkNums(requestHeader.getConsumerGroup(),
requestHeader.getTopic(), 1);
+ if (putMessageResult != null) {
+ PopMetricsManager.incPopReviveCkPutCount(ck,
putMessageResult.getPutMessageStatus());
+ if (putMessageResult.isOk()) {
+
this.brokerController.getBrokerStatsManager().incBrokerCkNums(1);
+
this.brokerController.getBrokerStatsManager().incGroupCkNums(requestHeader.getConsumerGroup(),
requestHeader.getTopic(), 1);
+ }
}
- }
+ if (putMessageResult.getPutMessageStatus() !=
PutMessageStatus.PUT_OK
+ && putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_DISK_TIMEOUT
+ && putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
+ && putMessageResult.getPutMessageStatus() !=
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
+ POP_LOGGER.error("change invisible, put new ck error: {}",
putMessageResult);
+ return CompletableFuture.completedFuture(false);
+ } else {
+ return ackOrigin(requestHeader, extraInfo);
+ }
+ }).exceptionally(throwable -> {
+ POP_LOGGER.error("change invisible, put new ck error", throwable);
+ return null;
+ });
+ }
- return putMessageResult;
+ protected void doResponse(Channel channel, RemotingCommand request,
+ final RemotingCommand response) {
+ NettyRemotingAbstract.writeResponse(channel, request, response);
}
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
index 8a85dd8fec..e05ab8ebea 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
@@ -216,7 +216,8 @@ public class PopBufferMergeService extends ServiceThread {
private void scan() {
long startTime = System.currentTimeMillis();
- int count = 0, countCk = 0;
+ AtomicInteger count = new AtomicInteger(0);
+ int countCk = 0;
Iterator<Map.Entry<String, PopCheckPointWrapper>> iterator =
buffer.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, PopCheckPointWrapper> entry = iterator.next();
@@ -257,14 +258,14 @@ public class PopBufferMergeService extends ServiceThread {
} else if (pointWrapper.isJustOffset()) {
// just offset should be in store.
if (pointWrapper.getReviveQueueOffset() < 0) {
- putCkToStore(pointWrapper, false);
+ putCkToStore(pointWrapper,
this.brokerController.getBrokerConfig().isAppendCkAsync());
countCk++;
}
continue;
} else if (removeCk) {
// put buffer ak to store
if (pointWrapper.getReviveQueueOffset() < 0) {
- putCkToStore(pointWrapper, false);
+ putCkToStore(pointWrapper,
this.brokerController.getBrokerConfig().isAppendCkAsync());
countCk++;
}
@@ -278,17 +279,12 @@ public class PopBufferMergeService extends ServiceThread {
for (byte i = 0; i < point.getNum(); i++) {
// reput buffer ak to store
if
(DataConverter.getBit(pointWrapper.getBits().get(), i)
- &&
!DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) {
+ &&
!DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) {
indexList.add(i);
}
}
if (indexList.size() > 0) {
- if (putBatchAckToStore(pointWrapper, indexList)) {
- count += indexList.size();
- for (Byte i : indexList) {
- markBitCAS(pointWrapper.getToStoreBits(),
i);
- }
- }
+ putBatchAckToStore(pointWrapper, indexList, count);
}
} finally {
indexList.clear();
@@ -297,11 +293,8 @@ public class PopBufferMergeService extends ServiceThread {
for (byte i = 0; i < point.getNum(); i++) {
// reput buffer ak to store
if (DataConverter.getBit(pointWrapper.getBits().get(),
i)
- &&
!DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) {
- if (putAckToStore(pointWrapper, i)) {
- count++;
- markBitCAS(pointWrapper.getToStoreBits(), i);
- }
+ &&
!DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) {
+ putAckToStore(pointWrapper, i, count);
}
}
}
@@ -312,7 +305,6 @@ public class PopBufferMergeService extends ServiceThread {
}
iterator.remove();
counter.decrementAndGet();
- continue;
}
}
}
@@ -323,13 +315,13 @@ public class PopBufferMergeService extends ServiceThread {
if (eclipse >
brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() - 1000) {
POP_LOGGER.warn("[PopBuffer]scan stop, because eclipse too long,
PopBufferEclipse={}, " +
"PopBufferToStoreAck={}, PopBufferToStoreCk={},
PopBufferSize={}, PopBufferOffsetSize={}",
- eclipse, count, countCk, counter.get(), offsetBufferSize);
+ eclipse, count.get(), countCk, counter.get(),
offsetBufferSize);
this.serving = false;
} else {
if (scanTimes % countOfSecond1 == 0) {
POP_LOGGER.info("[PopBuffer]scan, PopBufferEclipse={}, " +
"PopBufferToStoreAck={}, PopBufferToStoreCk={},
PopBufferSize={}, PopBufferOffsetSize={}",
- eclipse, count, countCk, counter.get(), offsetBufferSize);
+ eclipse, count.get(), countCk, counter.get(),
offsetBufferSize);
}
}
PopMetricsManager.recordPopBufferScanTimeConsume(eclipse);
@@ -429,7 +421,8 @@ public class PopBufferMergeService extends ServiceThread {
* @param nextBeginOffset
* @return
*/
- public boolean addCkJustOffset(PopCheckPoint point, int reviveQueueId,
long reviveQueueOffset, long nextBeginOffset) {
+ public boolean addCkJustOffset(PopCheckPoint point, int reviveQueueId,
long reviveQueueOffset,
+ long nextBeginOffset) {
PopCheckPointWrapper pointWrapper = new
PopCheckPointWrapper(reviveQueueId, reviveQueueOffset, point, nextBeginOffset,
true);
if (this.buffer.containsKey(pointWrapper.getMergeKey())) {
@@ -439,7 +432,7 @@ public class PopBufferMergeService extends ServiceThread {
return false;
}
- this.putCkToStore(pointWrapper, !checkQueueOk(pointWrapper));
+ this.putCkToStore(pointWrapper, checkQueueOk(pointWrapper));
putOffsetQueue(pointWrapper);
this.buffer.put(pointWrapper.getMergeKey(), pointWrapper);
@@ -447,7 +440,7 @@ public class PopBufferMergeService extends ServiceThread {
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("[PopBuffer]add ck just offset, {}", pointWrapper);
}
- return true;
+ return true;
}
public void addCkMock(String group, String topic, int queueId, long
startOffset, long invisibleTime,
@@ -597,13 +590,32 @@ public class PopBufferMergeService extends ServiceThread {
if (pointWrapper.getReviveQueueOffset() >= 0) {
return;
}
+
MessageExtBrokerInner msgInner =
popMessageProcessor.buildCkMsg(pointWrapper.getCk(),
pointWrapper.getReviveQueueId());
- PutMessageResult putMessageResult =
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+
+ // Indicates that ck message is storing
+ pointWrapper.setReviveQueueOffset(Long.MAX_VALUE);
+ if (brokerController.getBrokerConfig().isAppendCkAsync() &&
runInCurrent) {
+
brokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(msgInner).thenAccept(putMessageResult
-> {
+ handleCkMessagePutResult(putMessageResult, pointWrapper);
+ }).exceptionally(throwable -> {
+ POP_LOGGER.error("[PopBuffer]put ck to store fail: {}",
pointWrapper, throwable);
+ pointWrapper.setReviveQueueOffset(-1);
+ return null;
+ });
+ } else {
+ PutMessageResult putMessageResult =
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+ handleCkMessagePutResult(putMessageResult, pointWrapper);
+ }
+ }
+
+ private void handleCkMessagePutResult(PutMessageResult putMessageResult,
final PopCheckPointWrapper pointWrapper) {
PopMetricsManager.incPopReviveCkPutCount(pointWrapper.getCk(),
putMessageResult.getPutMessageStatus());
if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
&& putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_DISK_TIMEOUT
&& putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
&& putMessageResult.getPutMessageStatus() !=
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
+ pointWrapper.setReviveQueueOffset(-1);
POP_LOGGER.error("[PopBuffer]put ck to store fail: {}, {}",
pointWrapper, putMessageResult);
return;
}
@@ -621,7 +633,7 @@ public class PopBufferMergeService extends ServiceThread {
}
}
- private boolean putAckToStore(final PopCheckPointWrapper pointWrapper,
byte msgIndex) {
+ private void putAckToStore(final PopCheckPointWrapper pointWrapper, byte
msgIndex, AtomicInteger count) {
PopCheckPoint point = pointWrapper.getCk();
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
final AckMsg ackMsg = new AckMsg();
@@ -643,23 +655,39 @@ public class PopBufferMergeService extends ServiceThread {
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
PopMessageProcessor.genAckUniqueId(ackMsg));
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
- PutMessageResult putMessageResult =
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+
+ if (brokerController.getBrokerConfig().isAppendAckAsync()) {
+
brokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(msgInner).thenAccept(putMessageResult
-> {
+ handleAckPutMessageResult(ackMsg, putMessageResult,
pointWrapper, count, msgIndex);
+ }).exceptionally(throwable -> {
+ POP_LOGGER.error("[PopBuffer]put ack to store fail: {}, {}",
pointWrapper, ackMsg, throwable);
+ return null;
+ });
+ } else {
+ PutMessageResult putMessageResult =
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+ handleAckPutMessageResult(ackMsg, putMessageResult, pointWrapper,
count, msgIndex);
+ }
+ }
+
+ private void handleAckPutMessageResult(AckMsg ackMsg, PutMessageResult
putMessageResult,
+ PopCheckPointWrapper pointWrapper, AtomicInteger count, byte msgIndex)
{
PopMetricsManager.incPopReviveAckPutCount(ackMsg,
putMessageResult.getPutMessageStatus());
if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
&& putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_DISK_TIMEOUT
&& putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
&& putMessageResult.getPutMessageStatus() !=
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
POP_LOGGER.error("[PopBuffer]put ack to store fail: {}, {}, {}",
pointWrapper, ackMsg, putMessageResult);
- return false;
+ return;
}
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("[PopBuffer]put ack to store ok: {}, {}, {}",
pointWrapper, ackMsg, putMessageResult);
}
-
- return true;
+ count.incrementAndGet();
+ markBitCAS(pointWrapper.getToStoreBits(), msgIndex);
}
- private boolean putBatchAckToStore(final PopCheckPointWrapper
pointWrapper, final List<Byte> msgIndexList) {
+ private void putBatchAckToStore(final PopCheckPointWrapper pointWrapper,
final List<Byte> msgIndexList,
+ AtomicInteger count) {
PopCheckPoint point = pointWrapper.getCk();
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
final BatchAckMsg batchAckMsg = new BatchAckMsg();
@@ -683,19 +711,36 @@ public class PopBufferMergeService extends ServiceThread {
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
PopMessageProcessor.genBatchAckUniqueId(batchAckMsg));
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
- PutMessageResult putMessageResult =
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+ if (brokerController.getBrokerConfig().isAppendAckAsync()) {
+
brokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(msgInner).thenAccept(putMessageResult
-> {
+ handleBatchAckPutMessageResult(batchAckMsg, putMessageResult,
pointWrapper, count, msgIndexList);
+ }).exceptionally(throwable -> {
+ POP_LOGGER.error("[PopBuffer]put batchAckMsg to store fail:
{}, {}", pointWrapper, batchAckMsg, throwable);
+ return null;
+ });
+ } else {
+ PutMessageResult putMessageResult =
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+ handleBatchAckPutMessageResult(batchAckMsg, putMessageResult,
pointWrapper, count, msgIndexList);
+ }
+ }
+
+ private void handleBatchAckPutMessageResult(BatchAckMsg batchAckMsg,
PutMessageResult putMessageResult,
+ PopCheckPointWrapper pointWrapper, AtomicInteger count, List<Byte>
msgIndexList) {
if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
- && putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_DISK_TIMEOUT
- && putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
- && putMessageResult.getPutMessageStatus() !=
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
+ && putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_DISK_TIMEOUT
+ && putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
+ && putMessageResult.getPutMessageStatus() !=
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
POP_LOGGER.error("[PopBuffer]put batch ack to store fail: {}, {},
{}", pointWrapper, batchAckMsg, putMessageResult);
- return false;
+ return;
}
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("[PopBuffer]put batch ack to store ok: {}, {},
{}", pointWrapper, batchAckMsg, putMessageResult);
}
- return true;
+ count.addAndGet(msgIndexList.size());
+ for (Byte i : msgIndexList) {
+ markBitCAS(pointWrapper.getToStoreBits(), i);
+ }
}
private boolean cancelCkTimer(final PopCheckPointWrapper pointWrapper) {
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
index ee11f046d0..a7aae7ee3d 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.processor;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.lang.reflect.Field;
+import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.net.Broker2Client;
@@ -108,7 +109,7 @@ public class ChangeInvisibleTimeProcessorTest {
@Test
public void testProcessRequest_Success() throws RemotingCommandException,
InterruptedException, RemotingTimeoutException, RemotingSendRequestException {
-
when(escapeBridge.putMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenReturn(new
PutMessageResult(PutMessageStatus.PUT_OK, new
AppendMessageResult(AppendMessageStatus.PUT_OK)));
+
when(escapeBridge.asyncPutMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenReturn(CompletableFuture.completedFuture(new
PutMessageResult(PutMessageStatus.PUT_OK, new
AppendMessageResult(AppendMessageStatus.PUT_OK))));
int queueId = 0;
long queueOffset = 0;
long popTime = System.currentTimeMillis() - 1_000;
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 2123e9b339..2acfdd69a5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -427,6 +427,10 @@ public class BrokerConfig extends BrokerIdentity {
// if false, will still rewrite ck after max times 17
private boolean skipWhenCKRePutReachMaxTimes = false;
+ private boolean appendAckAsync = false;
+
+ private boolean appendCkAsync = false;
+
public String getConfigBlackList() {
return configBlackList;
}
@@ -1859,4 +1863,20 @@ public class BrokerConfig extends BrokerIdentity {
public void setUpdateNameServerAddrPeriod(int updateNameServerAddrPeriod) {
this.updateNameServerAddrPeriod = updateNameServerAddrPeriod;
}
+
+ public boolean isAppendAckAsync() {
+ return appendAckAsync;
+ }
+
+ public void setAppendAckAsync(boolean appendAckAsync) {
+ this.appendAckAsync = appendAckAsync;
+ }
+
+ public boolean isAppendCkAsync() {
+ return appendCkAsync;
+ }
+
+ public void setAppendCkAsync(boolean appendCkAsync) {
+ this.appendCkAsync = appendCkAsync;
+ }
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
index 6b2ba02f7c..a8088a95d0 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
@@ -176,7 +176,8 @@ public class LocalMessageService implements MessageService {
}
@Override
- public CompletableFuture<Void> endTransactionOneway(ProxyContext ctx,
String brokerName, EndTransactionRequestHeader requestHeader,
+ public CompletableFuture<Void> endTransactionOneway(ProxyContext ctx,
String brokerName,
+ EndTransactionRequestHeader requestHeader,
long timeoutMillis) {
CompletableFuture<Void> future = new CompletableFuture<>();
SimpleChannel channel = channelManager.createChannel(ctx);
@@ -310,9 +311,8 @@ public class LocalMessageService implements MessageService {
RemotingCommand command =
LocalRemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME,
requestHeader, ctx.getLanguage());
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {
- RemotingCommand response =
brokerController.getChangeInvisibleTimeProcessor()
- .processRequest(channelHandlerContext, command);
- future.complete(response);
+ future = brokerController.getChangeInvisibleTimeProcessor()
+ .processRequestAsync(channelHandlerContext.channel(), command,
true);
} catch (Exception e) {
log.error("Fail to process changeInvisibleTime command", e);
future.completeExceptionally(e);
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java
index 3e3d37086b..f7a656d768 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.proxy.service.message;
+import io.netty.channel.Channel;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@@ -370,11 +371,11 @@ public class LocalMessageServiceTest extends
InitConfigTest {
responseHeader.setReviveQid(newReviveQueueId);
responseHeader.setInvisibleTime(newInvisibleTime);
responseHeader.setPopTime(newPopTime);
-
Mockito.when(changeInvisibleTimeProcessorMock.processRequest(Mockito.any(SimpleChannelHandlerContext.class),
Mockito.argThat(argument -> {
+
Mockito.when(changeInvisibleTimeProcessorMock.processRequestAsync(Mockito.any(Channel.class),
Mockito.argThat(argument -> {
boolean first = argument.getCode() ==
RequestCode.CHANGE_MESSAGE_INVISIBLETIME;
boolean second = argument.readCustomHeader() instanceof
ChangeInvisibleTimeRequestHeader;
return first && second;
- }))).thenReturn(remotingCommand);
+ }),
Mockito.any(Boolean.class))).thenReturn(CompletableFuture.completedFuture(remotingCommand));
ChangeInvisibleTimeRequestHeader requestHeader = new
ChangeInvisibleTimeRequestHeader();
CompletableFuture<AckResult> future =
localMessageService.changeInvisibleTime(proxyContext, handle, messageId,
requestHeader, 1000L);
diff --git
a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
index 2217936929..fde991ad13 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
@@ -136,6 +136,8 @@ public class IntegrationTestBase {
brokerConfig.setNamesrvAddr(nsAddr);
brokerConfig.setEnablePropertyFilter(true);
brokerConfig.setEnableCalcFilterBitMap(true);
+ brokerConfig.setAppendAckAsync(true);
+ brokerConfig.setAppendCkAsync(true);
storeConfig.setEnableConsumeQueueExt(true);
brokerConfig.setLoadBalancePollNameServerInterval(500);
storeConfig.setStorePathRootDir(baseDir);