This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4f5f705f16 [ISSUE #8780] Implement asynchronous storage of ack/ck 
messages in pop consume to enhance performance (#8727)
4f5f705f16 is described below

commit 4f5f705f16faeb7d491b25679d1c100f38264bb9
Author: rongtong <[email protected]>
AuthorDate: Wed Oct 16 13:39:27 2024 +0800

    [ISSUE #8780] Implement asynchronous storage of ack/ck messages in pop 
consume to enhance performance (#8727)
    
    * Pop consume asynchronization
    
    * Pass UTs and ITs
    
    * Pass the checkstyle
    
    * Fix LocalGrpcIT failing to pass
    
    * Fix the UT failing to pass
    
    * Simplify duplicate methods in EscapeBridge
---
 .../rocketmq/broker/failover/EscapeBridge.java     |  62 ++++++---
 .../broker/processor/AckMessageProcessor.java      |  39 ++++--
 .../processor/ChangeInvisibleTimeProcessor.java    | 149 +++++++++++++--------
 .../broker/processor/PopBufferMergeService.java    | 113 +++++++++++-----
 .../ChangeInvisibleTimeProcessorTest.java          |   3 +-
 .../org/apache/rocketmq/common/BrokerConfig.java   |  20 +++
 .../proxy/service/message/LocalMessageService.java |   8 +-
 .../service/message/LocalMessageServiceTest.java   |   5 +-
 .../rocketmq/test/base/IntegrationTestBase.java    |   2 +
 9 files changed, 275 insertions(+), 126 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java 
b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
index 762d917d64..dd37f42b2c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
@@ -137,7 +137,7 @@ public class EscapeBridge {
             brokerNameToSend = mqSelected.getBrokerName();
             if 
(this.brokerController.getBrokerConfig().getBrokerName().equals(brokerNameToSend))
 {
                 LOG.warn("putMessageToRemoteBroker failed, remote broker not 
found. Topic: {}, MsgId: {}, Broker: {}",
-                        messageExt.getTopic(), messageExt.getMsgId(), 
brokerNameToSend);
+                    messageExt.getTopic(), messageExt.getMsgId(), 
brokerNameToSend);
                 return null;
             }
         } else {
@@ -147,7 +147,7 @@ public class EscapeBridge {
         final String brokerAddrToSend = 
this.brokerController.getTopicRouteInfoManager().findBrokerAddressInPublish(brokerNameToSend);
         if (null == brokerAddrToSend) {
             LOG.warn("putMessageToRemoteBroker failed, remote broker address 
not found. Topic: {}, MsgId: {}, Broker: {}",
-                    messageExt.getTopic(), messageExt.getMsgId(), 
brokerNameToSend);
+                messageExt.getTopic(), messageExt.getMsgId(), 
brokerNameToSend);
             return null;
         }
 
@@ -197,7 +197,7 @@ public class EscapeBridge {
                     producerGroup, SEND_TIMEOUT);
 
                 return future.exceptionally(throwable -> null)
-                    .thenApplyAsync(sendResult -> 
transformSendResult2PutResult(sendResult), this.defaultAsyncSenderExecutor)
+                    .thenApplyAsync(this::transformSendResult2PutResult, 
this.defaultAsyncSenderExecutor)
                     .exceptionally(throwable -> 
transformSendResult2PutResult(null));
 
             } catch (Exception e) {
@@ -211,7 +211,6 @@ public class EscapeBridge {
         }
     }
 
-
     private String getProducerGroup(MessageExtBrokerInner messageExt) {
         if (null == messageExt) {
             return this.innerProducerGroupName;
@@ -223,12 +222,29 @@ public class EscapeBridge {
         return producerGroup;
     }
 
-
     public PutMessageResult putMessageToSpecificQueue(MessageExtBrokerInner 
messageExt) {
         BrokerController masterBroker = 
this.brokerController.peekMasterBroker();
         if (masterBroker != null) {
             return masterBroker.getMessageStore().putMessage(messageExt);
-        } else if 
(this.brokerController.getBrokerConfig().isEnableSlaveActingMaster()
+        }
+        try {
+            return 
asyncRemotePutMessageToSpecificQueue(messageExt).get(SEND_TIMEOUT, 
TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            LOG.error("Put message to specific queue error", e);
+            return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null, 
true);
+        }
+    }
+
+    public CompletableFuture<PutMessageResult> 
asyncPutMessageToSpecificQueue(MessageExtBrokerInner messageExt) {
+        BrokerController masterBroker = 
this.brokerController.peekMasterBroker();
+        if (masterBroker != null) {
+            return masterBroker.getMessageStore().asyncPutMessage(messageExt);
+        }
+        return asyncRemotePutMessageToSpecificQueue(messageExt);
+    }
+
+    public CompletableFuture<PutMessageResult> 
asyncRemotePutMessageToSpecificQueue(MessageExtBrokerInner messageExt) {
+        if (this.brokerController.getBrokerConfig().isEnableSlaveActingMaster()
             && this.brokerController.getBrokerConfig().isEnableRemoteEscape()) 
{
             try {
                 messageExt.setWaitStoreMsgOK(false);
@@ -237,7 +253,7 @@ public class EscapeBridge {
                 List<MessageQueue> mqs = 
topicPublishInfo.getMessageQueueList();
 
                 if (null == mqs || mqs.isEmpty()) {
-                    return new 
PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, null, true);
+                    return CompletableFuture.completedFuture(new 
PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, null, true));
                 }
 
                 String id = messageExt.getTopic() + messageExt.getStoreHost();
@@ -248,19 +264,17 @@ public class EscapeBridge {
 
                 String brokerNameToSend = mq.getBrokerName();
                 String brokerAddrToSend = 
this.brokerController.getTopicRouteInfoManager().findBrokerAddressInPublish(brokerNameToSend);
-                final SendResult sendResult = 
this.brokerController.getBrokerOuterAPI().sendMessageToSpecificBroker(
+                return 
this.brokerController.getBrokerOuterAPI().sendMessageToSpecificBrokerAsync(
                     brokerAddrToSend, brokerNameToSend,
-                    messageExt, this.getProducerGroup(messageExt), 
SEND_TIMEOUT);
-
-                return transformSendResult2PutResult(sendResult);
+                    messageExt, this.getProducerGroup(messageExt), 
SEND_TIMEOUT).thenCompose(sendResult -> 
CompletableFuture.completedFuture(transformSendResult2PutResult(sendResult)));
             } catch (Exception e) {
                 LOG.error("sendMessageInFailover to remote failed", e);
-                return new 
PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, null, true);
+                return CompletableFuture.completedFuture(new 
PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, null, true));
             }
         } else {
             LOG.warn("Put message to specific queue failed, 
enableSlaveActingMaster={}, enableRemoteEscape={}.",
                 
this.brokerController.getBrokerConfig().isEnableSlaveActingMaster(), 
this.brokerController.getBrokerConfig().isEnableRemoteEscape());
-            return new 
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
+            return CompletableFuture.completedFuture(new 
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
         }
     }
 
@@ -282,12 +296,14 @@ public class EscapeBridge {
         }
     }
 
-    public Triple<MessageExt, String, Boolean> getMessage(String topic, long 
offset, int queueId, String brokerName, boolean deCompressBody) {
+    public Triple<MessageExt, String, Boolean> getMessage(String topic, long 
offset, int queueId, String brokerName,
+        boolean deCompressBody) {
         return getMessageAsync(topic, offset, queueId, brokerName, 
deCompressBody).join();
     }
 
     // Triple<MessageExt, info, needRetry>, check info and retry if and only 
if MessageExt is null
-    public CompletableFuture<Triple<MessageExt, String, Boolean>> 
getMessageAsync(String topic, long offset, int queueId, String brokerName, 
boolean deCompressBody) {
+    public CompletableFuture<Triple<MessageExt, String, Boolean>> 
getMessageAsync(String topic, long offset,
+        int queueId, String brokerName, boolean deCompressBody) {
         MessageStore messageStore = 
brokerController.getMessageStoreByBrokerName(brokerName);
         if (messageStore != null) {
             return messageStore.getMessageAsync(innerConsumerGroupName, topic, 
queueId, offset, 1, null)
@@ -300,9 +316,9 @@ public class EscapeBridge {
                     if (list == null || list.isEmpty()) {
                         // OFFSET_FOUND_NULL returned by TieredMessageStore 
indicates exception occurred
                         boolean needRetry = 
GetMessageStatus.OFFSET_FOUND_NULL.equals(result.getStatus())
-                                && messageStore instanceof TieredMessageStore;
+                            && messageStore instanceof TieredMessageStore;
                         LOG.warn("Can not get msg , topic {}, offset {}, 
queueId {}, needRetry {}, result is {}",
-                                topic, offset, queueId, needRetry, result);
+                            topic, offset, queueId, needRetry, result);
                         return Triple.of(null, "Can not get msg", needRetry);
                     }
                     return Triple.of(list.get(0), "", false);
@@ -340,12 +356,14 @@ public class EscapeBridge {
         return foundList;
     }
 
-    protected Triple<MessageExt, String, Boolean> getMessageFromRemote(String 
topic, long offset, int queueId, String brokerName) {
+    protected Triple<MessageExt, String, Boolean> getMessageFromRemote(String 
topic, long offset, int queueId,
+        String brokerName) {
         return getMessageFromRemoteAsync(topic, offset, queueId, 
brokerName).join();
     }
 
     // Triple<MessageExt, info, needRetry>, check info and retry if and only 
if MessageExt is null
-    protected CompletableFuture<Triple<MessageExt, String, Boolean>> 
getMessageFromRemoteAsync(String topic, long offset, int queueId, String 
brokerName) {
+    protected CompletableFuture<Triple<MessageExt, String, Boolean>> 
getMessageFromRemoteAsync(String topic,
+        long offset, int queueId, String brokerName) {
         try {
             String brokerAddr = 
this.brokerController.getTopicRouteInfoManager().findBrokerAddressInSubscribe(brokerName,
 MixAll.MASTER_ID, false);
             if (null == brokerAddr) {
@@ -359,11 +377,11 @@ public class EscapeBridge {
             }
 
             return 
this.brokerController.getBrokerOuterAPI().pullMessageFromSpecificBrokerAsync(brokerName,
-                brokerAddr, this.innerConsumerGroupName, topic, queueId, 
offset, 1, DEFAULT_PULL_TIMEOUT_MILLIS)
+                    brokerAddr, this.innerConsumerGroupName, topic, queueId, 
offset, 1, DEFAULT_PULL_TIMEOUT_MILLIS)
                 .thenApply(pullResult -> {
                     if (pullResult.getLeft() != null
-                            && 
PullStatus.FOUND.equals(pullResult.getLeft().getPullStatus())
-                            && 
CollectionUtils.isNotEmpty(pullResult.getLeft().getMsgFoundList())) {
+                        && 
PullStatus.FOUND.equals(pullResult.getLeft().getPullStatus())
+                        && 
CollectionUtils.isNotEmpty(pullResult.getLeft().getMsgFoundList())) {
                         return 
Triple.of(pullResult.getLeft().getMsgFoundList().get(0), "", false);
                     }
                     return Triple.of(null, pullResult.getMiddle(), 
pullResult.getRight());
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 6f7b7e8a24..dc1b1b53a3 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -98,7 +98,7 @@ public class AckMessageProcessor implements 
NettyRequestProcessor {
 
     @Override
     public RemotingCommand processRequest(final ChannelHandlerContext ctx,
-                                          RemotingCommand request) throws 
RemotingCommandException {
+        RemotingCommand request) throws RemotingCommandException {
         return this.processRequest(ctx.channel(), request, true);
     }
 
@@ -108,7 +108,7 @@ public class AckMessageProcessor implements 
NettyRequestProcessor {
     }
 
     private RemotingCommand processRequest(final Channel channel, 
RemotingCommand request,
-                                           boolean brokerAllowSuspend) throws 
RemotingCommandException {
+        boolean brokerAllowSuspend) throws RemotingCommandException {
         AckMessageRequestHeader requestHeader;
         BatchAckMessageRequestBody reqBody = null;
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, null);
@@ -126,7 +126,7 @@ public class AckMessageProcessor implements 
NettyRequestProcessor {
 
             if (requestHeader.getQueueId() >= topicConfig.getReadQueueNums() 
|| requestHeader.getQueueId() < 0) {
                 String errorInfo = String.format("queueId[%d] is illegal, 
topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
-                        requestHeader.getQueueId(), requestHeader.getTopic(), 
topicConfig.getReadQueueNums(), channel.remoteAddress());
+                    requestHeader.getQueueId(), requestHeader.getTopic(), 
topicConfig.getReadQueueNums(), channel.remoteAddress());
                 POP_LOGGER.warn(errorInfo);
                 response.setCode(ResponseCode.MESSAGE_ILLEGAL);
                 response.setRemark(errorInfo);
@@ -137,7 +137,7 @@ public class AckMessageProcessor implements 
NettyRequestProcessor {
             long maxOffset = 
this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(),
 requestHeader.getQueueId());
             if (requestHeader.getOffset() < minOffset || 
requestHeader.getOffset() > maxOffset) {
                 String errorInfo = String.format("offset is illegal, 
key:%s@%d, commit:%d, store:%d~%d",
-                        requestHeader.getTopic(), requestHeader.getQueueId(), 
requestHeader.getOffset(), minOffset, maxOffset);
+                    requestHeader.getTopic(), requestHeader.getQueueId(), 
requestHeader.getOffset(), minOffset, maxOffset);
                 POP_LOGGER.warn(errorInfo);
                 response.setCode(ResponseCode.NO_MESSAGE);
                 response.setRemark(errorInfo);
@@ -165,7 +165,8 @@ public class AckMessageProcessor implements 
NettyRequestProcessor {
         return response;
     }
 
-    private void appendAck(final AckMessageRequestHeader requestHeader, final 
BatchAck batchAck, final RemotingCommand response, final Channel channel, 
String brokerName) {
+    private void appendAck(final AckMessageRequestHeader requestHeader, final 
BatchAck batchAck,
+        final RemotingCommand response, final Channel channel, String 
brokerName) {
         String[] extraInfo;
         String consumeGroup, topic;
         int qId, rqId;
@@ -268,18 +269,36 @@ public class AckMessageProcessor implements 
NettyRequestProcessor {
         msgInner.setDeliverTimeMs(popTime + invisibleTime);
         
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
 PopMessageProcessor.genAckUniqueId(ackMsg));
         
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
-        PutMessageResult putMessageResult = 
this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+        if (brokerController.getBrokerConfig().isAppendAckAsync()) {
+            int finalAckCount = ackCount;
+            
this.brokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(msgInner).thenAccept(putMessageResult
 -> {
+                handlePutMessageResult(putMessageResult, ackMsg, topic, 
consumeGroup, popTime, qId, finalAckCount);
+            }).exceptionally(throwable -> {
+                handlePutMessageResult(new 
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null, false),
+                    ackMsg, topic, consumeGroup, popTime, qId, finalAckCount);
+                POP_LOGGER.error("put ack msg error ", throwable);
+                return null;
+            });
+        } else {
+            PutMessageResult putMessageResult = 
this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+            handlePutMessageResult(putMessageResult, ackMsg, topic, 
consumeGroup, popTime, qId, ackCount);
+        }
+    }
+
+    private void handlePutMessageResult(PutMessageResult putMessageResult, 
AckMsg ackMsg, String topic,
+        String consumeGroup, long popTime, int qId, int ackCount) {
         if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
-                && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_DISK_TIMEOUT
-                && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
-                && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
+            && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_DISK_TIMEOUT
+            && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
+            && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
             POP_LOGGER.error("put ack msg error:" + putMessageResult);
         }
         PopMetricsManager.incPopReviveAckPutCount(ackMsg, 
putMessageResult.getPutMessageStatus());
         
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic,
 consumeGroup, popTime, qId, ackCount);
     }
 
-    protected void ackOrderly(String topic, String consumeGroup, int qId, long 
ackOffset, long popTime, long invisibleTime, Channel channel, RemotingCommand 
response) {
+    protected void ackOrderly(String topic, String consumeGroup, int qId, long 
ackOffset, long popTime,
+        long invisibleTime, Channel channel, RemotingCommand response) {
         String lockKey = topic + PopAckConstants.SPLIT + consumeGroup + 
PopAckConstants.SPLIT + qId;
         long oldOffset = 
this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, 
topic, qId);
         if (ackOffset < oldOffset) {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
index bdfffff096..af3b8ae6f0 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
@@ -19,6 +19,8 @@ package org.apache.rocketmq.broker.processor;
 import com.alibaba.fastjson.JSON;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.metrics.PopMetricsManager;
 import org.apache.rocketmq.common.PopAckConstants;
@@ -33,13 +35,13 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
 import 
org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
-import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
 import org.apache.rocketmq.store.pop.AckMsg;
 import org.apache.rocketmq.store.pop.PopCheckPoint;
@@ -67,6 +69,35 @@ public class ChangeInvisibleTimeProcessor implements 
NettyRequestProcessor {
 
     private RemotingCommand processRequest(final Channel channel, 
RemotingCommand request,
         boolean brokerAllowSuspend) throws RemotingCommandException {
+
+        CompletableFuture<RemotingCommand> responseFuture = 
processRequestAsync(channel, request, brokerAllowSuspend);
+
+        if (brokerController.getBrokerConfig().isAppendCkAsync() && 
brokerController.getBrokerConfig().isAppendAckAsync()) {
+            responseFuture.thenAccept(response -> doResponse(channel, request, 
response)).exceptionally(throwable -> {
+                RemotingCommand response = 
RemotingCommand.createResponseCommand(ChangeInvisibleTimeResponseHeader.class);
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setOpaque(request.getOpaque());
+                doResponse(channel, request, response);
+                POP_LOGGER.error("append checkpoint or ack origin failed", 
throwable);
+                return null;
+            });
+        } else {
+            RemotingCommand response;
+            try {
+                response = responseFuture.get(3000, TimeUnit.MILLISECONDS);
+            } catch (Exception e) {
+                response = 
RemotingCommand.createResponseCommand(ChangeInvisibleTimeResponseHeader.class);
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setOpaque(request.getOpaque());
+                POP_LOGGER.error("append checkpoint or ack origin failed", e);
+            }
+            return response;
+        }
+        return null;
+    }
+
+    public CompletableFuture<RemotingCommand> processRequestAsync(final 
Channel channel, RemotingCommand request,
+        boolean brokerAllowSuspend) throws RemotingCommandException {
         final ChangeInvisibleTimeRequestHeader requestHeader = 
(ChangeInvisibleTimeRequestHeader) 
request.decodeCommandCustomHeader(ChangeInvisibleTimeRequestHeader.class);
         RemotingCommand response = 
RemotingCommand.createResponseCommand(ChangeInvisibleTimeResponseHeader.class);
         response.setCode(ResponseCode.SUCCESS);
@@ -77,7 +108,7 @@ public class ChangeInvisibleTimeProcessor implements 
NettyRequestProcessor {
             POP_LOGGER.error("The topic {} not exist, consumer: {} ", 
requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
             response.setCode(ResponseCode.TOPIC_NOT_EXIST);
             response.setRemark(String.format("topic[%s] not exist, apply first 
please! %s", requestHeader.getTopic(), 
FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
-            return response;
+            return CompletableFuture.completedFuture(response);
         }
 
         if (requestHeader.getQueueId() >= topicConfig.getReadQueueNums() || 
requestHeader.getQueueId() < 0) {
@@ -86,46 +117,35 @@ public class ChangeInvisibleTimeProcessor implements 
NettyRequestProcessor {
             POP_LOGGER.warn(errorInfo);
             response.setCode(ResponseCode.MESSAGE_ILLEGAL);
             response.setRemark(errorInfo);
-            return response;
+            return CompletableFuture.completedFuture(response);
         }
         long minOffset = 
this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
 requestHeader.getQueueId());
         long maxOffset = 
this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(),
 requestHeader.getQueueId());
         if (requestHeader.getOffset() < minOffset || requestHeader.getOffset() 
> maxOffset) {
             response.setCode(ResponseCode.NO_MESSAGE);
-            return response;
+            return CompletableFuture.completedFuture(response);
         }
 
         String[] extraInfo = ExtraInfoUtil.split(requestHeader.getExtraInfo());
 
         if (ExtraInfoUtil.isOrder(extraInfo)) {
-            return processChangeInvisibleTimeForOrder(requestHeader, 
extraInfo, response, responseHeader);
+            return 
CompletableFuture.completedFuture(processChangeInvisibleTimeForOrder(requestHeader,
 extraInfo, response, responseHeader));
         }
 
         // add new ck
         long now = System.currentTimeMillis();
-        PutMessageResult ckResult = appendCheckPoint(requestHeader, 
ExtraInfoUtil.getReviveQid(extraInfo), requestHeader.getQueueId(), 
requestHeader.getOffset(), now, ExtraInfoUtil.getBrokerName(extraInfo));
-
-        if (ckResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
-            && ckResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_DISK_TIMEOUT
-            && ckResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
-            && ckResult.getPutMessageStatus() != 
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
-            POP_LOGGER.error("change Invisible, put new ck error: {}", 
ckResult);
-            response.setCode(ResponseCode.SYSTEM_ERROR);
-            return response;
-        }
-
-        // ack old msg.
-        try {
-            ackOrigin(requestHeader, extraInfo);
-        } catch (Throwable e) {
-            POP_LOGGER.error("change Invisible, put ack msg error: {}, {}", 
requestHeader.getExtraInfo(), e.getMessage());
-            // cancel new ck?
-        }
 
-        responseHeader.setInvisibleTime(requestHeader.getInvisibleTime());
-        responseHeader.setPopTime(now);
-        responseHeader.setReviveQid(ExtraInfoUtil.getReviveQid(extraInfo));
-        return response;
+        CompletableFuture<Boolean> futureResult = 
appendCheckPointThenAckOrigin(requestHeader, 
ExtraInfoUtil.getReviveQid(extraInfo), requestHeader.getQueueId(), 
requestHeader.getOffset(), now, extraInfo);
+        return futureResult.thenCompose(result -> {
+            if (result) {
+                
responseHeader.setInvisibleTime(requestHeader.getInvisibleTime());
+                responseHeader.setPopTime(now);
+                
responseHeader.setReviveQid(ExtraInfoUtil.getReviveQid(extraInfo));
+            } else {
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+            }
+            return CompletableFuture.completedFuture(response);
+        });
     }
 
     protected RemotingCommand 
processChangeInvisibleTimeForOrder(ChangeInvisibleTimeRequestHeader 
requestHeader,
@@ -158,7 +178,8 @@ public class ChangeInvisibleTimeProcessor implements 
NettyRequestProcessor {
         return response;
     }
 
-    private void ackOrigin(final ChangeInvisibleTimeRequestHeader 
requestHeader, String[] extraInfo) {
+    private CompletableFuture<Boolean> ackOrigin(final 
ChangeInvisibleTimeRequestHeader requestHeader,
+        String[] extraInfo) {
         MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
         AckMsg ackMsg = new AckMsg();
 
@@ -176,7 +197,7 @@ public class ChangeInvisibleTimeProcessor implements 
NettyRequestProcessor {
         
this.brokerController.getBrokerStatsManager().incGroupAckNums(requestHeader.getConsumerGroup(),
 requestHeader.getTopic(), 1);
 
         if 
(brokerController.getPopMessageProcessor().getPopBufferMergeService().addAk(rqId,
 ackMsg)) {
-            return;
+            return CompletableFuture.completedFuture(true);
         }
 
         msgInner.setTopic(reviveTopic);
@@ -189,18 +210,25 @@ public class ChangeInvisibleTimeProcessor implements 
NettyRequestProcessor {
         msgInner.setDeliverTimeMs(ExtraInfoUtil.getPopTime(extraInfo) + 
ExtraInfoUtil.getInvisibleTime(extraInfo));
         
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
 PopMessageProcessor.genAckUniqueId(ackMsg));
         
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
-        PutMessageResult putMessageResult = 
this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
-        if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
-            && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_DISK_TIMEOUT
-            && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
-            && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
-            POP_LOGGER.error("change Invisible, put ack msg fail: {}, {}", 
ackMsg, putMessageResult);
-        }
-        PopMetricsManager.incPopReviveAckPutCount(ackMsg, 
putMessageResult.getPutMessageStatus());
+        return 
this.brokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(msgInner).thenCompose(putMessageResult
 -> {
+            if (putMessageResult.getPutMessageStatus() != 
PutMessageStatus.PUT_OK
+                && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_DISK_TIMEOUT
+                && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
+                && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
+                POP_LOGGER.error("change Invisible, put ack msg fail: {}, {}", 
ackMsg, putMessageResult);
+            }
+            PopMetricsManager.incPopReviveAckPutCount(ackMsg, 
putMessageResult.getPutMessageStatus());
+            return CompletableFuture.completedFuture(true);
+        }).exceptionally(e -> {
+            POP_LOGGER.error("change Invisible, put ack msg error: {}, {}", 
requestHeader.getExtraInfo(), e.getMessage());
+            return false;
+        });
     }
 
-    private PutMessageResult appendCheckPoint(final 
ChangeInvisibleTimeRequestHeader requestHeader, int reviveQid,
-        int queueId, long offset, long popTime, String brokerName) {
+    private CompletableFuture<Boolean> appendCheckPointThenAckOrigin(
+        final ChangeInvisibleTimeRequestHeader requestHeader,
+        int reviveQid,
+        int queueId, long offset, long popTime, String[] extraInfo) {
         // add check point msg to revive log
         MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
         msgInner.setTopic(reviveTopic);
@@ -214,7 +242,7 @@ public class ChangeInvisibleTimeProcessor implements 
NettyRequestProcessor {
         ck.setTopic(requestHeader.getTopic());
         ck.setQueueId(queueId);
         ck.addDiff(0);
-        ck.setBrokerName(brokerName);
+        ck.setBrokerName(ExtraInfoUtil.getBrokerName(extraInfo));
 
         
msgInner.setBody(JSON.toJSONString(ck).getBytes(DataConverter.CHARSET_UTF8));
         msgInner.setQueueId(reviveQid);
@@ -225,21 +253,36 @@ public class ChangeInvisibleTimeProcessor implements 
NettyRequestProcessor {
         msgInner.setDeliverTimeMs(ck.getReviveTime() - 
PopAckConstants.ackTimeInterval);
         
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
 PopMessageProcessor.genCkUniqueId(ck));
         
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
-        PutMessageResult putMessageResult = 
this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
-
-        if (brokerController.getBrokerConfig().isEnablePopLog()) {
-            POP_LOGGER.info("change Invisible , appendCheckPoint, topic {}, 
queueId {},reviveId {}, cid {}, startOffset {}, rt {}, result {}", 
requestHeader.getTopic(), queueId, reviveQid, requestHeader.getConsumerGroup(), 
offset,
-                ck.getReviveTime(), putMessageResult);
-        }
+        return 
this.brokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(msgInner).thenCompose(putMessageResult
 -> {
+            if (brokerController.getBrokerConfig().isEnablePopLog()) {
+                POP_LOGGER.info("change Invisible, appendCheckPoint, topic {}, 
queueId {},reviveId {}, cid {}, startOffset {}, rt {}, result {}", 
requestHeader.getTopic(), queueId, reviveQid, requestHeader.getConsumerGroup(), 
offset,
+                    ck.getReviveTime(), putMessageResult);
+            }
 
-        if (putMessageResult != null) {
-            PopMetricsManager.incPopReviveCkPutCount(ck, 
putMessageResult.getPutMessageStatus());
-            if (putMessageResult.isOk()) {
-                
this.brokerController.getBrokerStatsManager().incBrokerCkNums(1);
-                
this.brokerController.getBrokerStatsManager().incGroupCkNums(requestHeader.getConsumerGroup(),
 requestHeader.getTopic(), 1);
+            if (putMessageResult != null) {
+                PopMetricsManager.incPopReviveCkPutCount(ck, 
putMessageResult.getPutMessageStatus());
+                if (putMessageResult.isOk()) {
+                    
this.brokerController.getBrokerStatsManager().incBrokerCkNums(1);
+                    
this.brokerController.getBrokerStatsManager().incGroupCkNums(requestHeader.getConsumerGroup(),
 requestHeader.getTopic(), 1);
+                }
             }
-        }
+            if (putMessageResult.getPutMessageStatus() != 
PutMessageStatus.PUT_OK
+                && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_DISK_TIMEOUT
+                && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
+                && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
+                POP_LOGGER.error("change invisible, put new ck error: {}", 
putMessageResult);
+                return CompletableFuture.completedFuture(false);
+            } else {
+                return ackOrigin(requestHeader, extraInfo);
+            }
+        }).exceptionally(throwable -> {
+            POP_LOGGER.error("change invisible, put new ck error", throwable);
+            return null;
+        });
+    }
 
-        return putMessageResult;
+    protected void doResponse(Channel channel, RemotingCommand request,
+        final RemotingCommand response) {
+        NettyRemotingAbstract.writeResponse(channel, request, response);
     }
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
index 8a85dd8fec..e05ab8ebea 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
@@ -216,7 +216,8 @@ public class PopBufferMergeService extends ServiceThread {
 
     private void scan() {
         long startTime = System.currentTimeMillis();
-        int count = 0, countCk = 0;
+        AtomicInteger count = new AtomicInteger(0);
+        int countCk = 0;
         Iterator<Map.Entry<String, PopCheckPointWrapper>> iterator = 
buffer.entrySet().iterator();
         while (iterator.hasNext()) {
             Map.Entry<String, PopCheckPointWrapper> entry = iterator.next();
@@ -257,14 +258,14 @@ public class PopBufferMergeService extends ServiceThread {
             } else if (pointWrapper.isJustOffset()) {
                 // just offset should be in store.
                 if (pointWrapper.getReviveQueueOffset() < 0) {
-                    putCkToStore(pointWrapper, false);
+                    putCkToStore(pointWrapper, 
this.brokerController.getBrokerConfig().isAppendCkAsync());
                     countCk++;
                 }
                 continue;
             } else if (removeCk) {
                 // put buffer ak to store
                 if (pointWrapper.getReviveQueueOffset() < 0) {
-                    putCkToStore(pointWrapper, false);
+                    putCkToStore(pointWrapper, 
this.brokerController.getBrokerConfig().isAppendCkAsync());
                     countCk++;
                 }
 
@@ -278,17 +279,12 @@ public class PopBufferMergeService extends ServiceThread {
                         for (byte i = 0; i < point.getNum(); i++) {
                             // reput buffer ak to store
                             if 
(DataConverter.getBit(pointWrapper.getBits().get(), i)
-                                    && 
!DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) {
+                                && 
!DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) {
                                 indexList.add(i);
                             }
                         }
                         if (indexList.size() > 0) {
-                            if (putBatchAckToStore(pointWrapper, indexList)) {
-                                count += indexList.size();
-                                for (Byte i : indexList) {
-                                    markBitCAS(pointWrapper.getToStoreBits(), 
i);
-                                }
-                            }
+                            putBatchAckToStore(pointWrapper, indexList, count);
                         }
                     } finally {
                         indexList.clear();
@@ -297,11 +293,8 @@ public class PopBufferMergeService extends ServiceThread {
                     for (byte i = 0; i < point.getNum(); i++) {
                         // reput buffer ak to store
                         if (DataConverter.getBit(pointWrapper.getBits().get(), 
i)
-                                && 
!DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) {
-                            if (putAckToStore(pointWrapper, i)) {
-                                count++;
-                                markBitCAS(pointWrapper.getToStoreBits(), i);
-                            }
+                            && 
!DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) {
+                            putAckToStore(pointWrapper, i, count);
                         }
                     }
                 }
@@ -312,7 +305,6 @@ public class PopBufferMergeService extends ServiceThread {
                     }
                     iterator.remove();
                     counter.decrementAndGet();
-                    continue;
                 }
             }
         }
@@ -323,13 +315,13 @@ public class PopBufferMergeService extends ServiceThread {
         if (eclipse > 
brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() - 1000) {
             POP_LOGGER.warn("[PopBuffer]scan stop, because eclipse too long, 
PopBufferEclipse={}, " +
                     "PopBufferToStoreAck={}, PopBufferToStoreCk={}, 
PopBufferSize={}, PopBufferOffsetSize={}",
-                eclipse, count, countCk, counter.get(), offsetBufferSize);
+                eclipse, count.get(), countCk, counter.get(), 
offsetBufferSize);
             this.serving = false;
         } else {
             if (scanTimes % countOfSecond1 == 0) {
                 POP_LOGGER.info("[PopBuffer]scan, PopBufferEclipse={}, " +
                         "PopBufferToStoreAck={}, PopBufferToStoreCk={}, 
PopBufferSize={}, PopBufferOffsetSize={}",
-                    eclipse, count, countCk, counter.get(), offsetBufferSize);
+                    eclipse, count.get(), countCk, counter.get(), 
offsetBufferSize);
             }
         }
         PopMetricsManager.recordPopBufferScanTimeConsume(eclipse);
@@ -429,7 +421,8 @@ public class PopBufferMergeService extends ServiceThread {
      * @param nextBeginOffset
      * @return
      */
-    public boolean addCkJustOffset(PopCheckPoint point, int reviveQueueId, 
long reviveQueueOffset, long nextBeginOffset) {
+    public boolean addCkJustOffset(PopCheckPoint point, int reviveQueueId, 
long reviveQueueOffset,
+        long nextBeginOffset) {
         PopCheckPointWrapper pointWrapper = new 
PopCheckPointWrapper(reviveQueueId, reviveQueueOffset, point, nextBeginOffset, 
true);
 
         if (this.buffer.containsKey(pointWrapper.getMergeKey())) {
@@ -439,7 +432,7 @@ public class PopBufferMergeService extends ServiceThread {
             return false;
         }
 
-        this.putCkToStore(pointWrapper, !checkQueueOk(pointWrapper));
+        this.putCkToStore(pointWrapper, checkQueueOk(pointWrapper));
 
         putOffsetQueue(pointWrapper);
         this.buffer.put(pointWrapper.getMergeKey(), pointWrapper);
@@ -447,7 +440,7 @@ public class PopBufferMergeService extends ServiceThread {
         if (brokerController.getBrokerConfig().isEnablePopLog()) {
             POP_LOGGER.info("[PopBuffer]add ck just offset, {}", pointWrapper);
         }
-        return  true;
+        return true;
     }
 
     public void addCkMock(String group, String topic, int queueId, long 
startOffset, long invisibleTime,
@@ -597,13 +590,32 @@ public class PopBufferMergeService extends ServiceThread {
         if (pointWrapper.getReviveQueueOffset() >= 0) {
             return;
         }
+
         MessageExtBrokerInner msgInner = 
popMessageProcessor.buildCkMsg(pointWrapper.getCk(), 
pointWrapper.getReviveQueueId());
-        PutMessageResult putMessageResult = 
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+
+        // Indicates that ck message is storing
+        pointWrapper.setReviveQueueOffset(Long.MAX_VALUE);
+        if (brokerController.getBrokerConfig().isAppendCkAsync() && 
runInCurrent) {
+            
brokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(msgInner).thenAccept(putMessageResult
 -> {
+                handleCkMessagePutResult(putMessageResult, pointWrapper);
+            }).exceptionally(throwable -> {
+                POP_LOGGER.error("[PopBuffer]put ck to store fail: {}", 
pointWrapper, throwable);
+                pointWrapper.setReviveQueueOffset(-1);
+                return null;
+            });
+        } else {
+            PutMessageResult putMessageResult = 
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+            handleCkMessagePutResult(putMessageResult, pointWrapper);
+        }
+    }
+
+    private void handleCkMessagePutResult(PutMessageResult putMessageResult, 
final PopCheckPointWrapper pointWrapper) {
         PopMetricsManager.incPopReviveCkPutCount(pointWrapper.getCk(), 
putMessageResult.getPutMessageStatus());
         if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
             && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_DISK_TIMEOUT
             && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
             && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
+            pointWrapper.setReviveQueueOffset(-1);
             POP_LOGGER.error("[PopBuffer]put ck to store fail: {}, {}", 
pointWrapper, putMessageResult);
             return;
         }
@@ -621,7 +633,7 @@ public class PopBufferMergeService extends ServiceThread {
         }
     }
 
-    private boolean putAckToStore(final PopCheckPointWrapper pointWrapper, 
byte msgIndex) {
+    private void putAckToStore(final PopCheckPointWrapper pointWrapper, byte 
msgIndex, AtomicInteger count) {
         PopCheckPoint point = pointWrapper.getCk();
         MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
         final AckMsg ackMsg = new AckMsg();
@@ -643,23 +655,39 @@ public class PopBufferMergeService extends ServiceThread {
         
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
 PopMessageProcessor.genAckUniqueId(ackMsg));
 
         
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
-        PutMessageResult putMessageResult = 
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+
+        if (brokerController.getBrokerConfig().isAppendAckAsync()) {
+            
brokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(msgInner).thenAccept(putMessageResult
 -> {
+                handleAckPutMessageResult(ackMsg, putMessageResult, 
pointWrapper, count, msgIndex);
+            }).exceptionally(throwable -> {
+                POP_LOGGER.error("[PopBuffer]put ack to store fail: {}, {}", 
pointWrapper, ackMsg, throwable);
+                return null;
+            });
+        } else {
+            PutMessageResult putMessageResult = 
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+            handleAckPutMessageResult(ackMsg, putMessageResult, pointWrapper, 
count, msgIndex);
+        }
+    }
+
+    private void handleAckPutMessageResult(AckMsg ackMsg, PutMessageResult 
putMessageResult,
+        PopCheckPointWrapper pointWrapper, AtomicInteger count, byte msgIndex) 
{
         PopMetricsManager.incPopReviveAckPutCount(ackMsg, 
putMessageResult.getPutMessageStatus());
         if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
             && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_DISK_TIMEOUT
             && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
             && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
             POP_LOGGER.error("[PopBuffer]put ack to store fail: {}, {}, {}", 
pointWrapper, ackMsg, putMessageResult);
-            return false;
+            return;
         }
         if (brokerController.getBrokerConfig().isEnablePopLog()) {
             POP_LOGGER.info("[PopBuffer]put ack to store ok: {}, {}, {}", 
pointWrapper, ackMsg, putMessageResult);
         }
-
-        return true;
+        count.incrementAndGet();
+        markBitCAS(pointWrapper.getToStoreBits(), msgIndex);
     }
 
-    private boolean putBatchAckToStore(final PopCheckPointWrapper 
pointWrapper, final List<Byte> msgIndexList) {
+    private void putBatchAckToStore(final PopCheckPointWrapper pointWrapper, 
final List<Byte> msgIndexList,
+        AtomicInteger count) {
         PopCheckPoint point = pointWrapper.getCk();
         MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
         final BatchAckMsg batchAckMsg = new BatchAckMsg();
@@ -683,19 +711,36 @@ public class PopBufferMergeService extends ServiceThread {
         
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
 PopMessageProcessor.genBatchAckUniqueId(batchAckMsg));
 
         
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
-        PutMessageResult putMessageResult = 
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+        if (brokerController.getBrokerConfig().isAppendAckAsync()) {
+            
brokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(msgInner).thenAccept(putMessageResult
 -> {
+                handleBatchAckPutMessageResult(batchAckMsg, putMessageResult, 
pointWrapper, count, msgIndexList);
+            }).exceptionally(throwable -> {
+                POP_LOGGER.error("[PopBuffer]put batchAckMsg to store fail: 
{}, {}", pointWrapper, batchAckMsg, throwable);
+                return null;
+            });
+        } else {
+            PutMessageResult putMessageResult = 
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+            handleBatchAckPutMessageResult(batchAckMsg, putMessageResult, 
pointWrapper, count, msgIndexList);
+        }
+    }
+
+    private void handleBatchAckPutMessageResult(BatchAckMsg batchAckMsg, 
PutMessageResult putMessageResult,
+        PopCheckPointWrapper pointWrapper, AtomicInteger count, List<Byte> 
msgIndexList) {
         if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
-                && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_DISK_TIMEOUT
-                && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
-                && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
+            && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_DISK_TIMEOUT
+            && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
+            && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
             POP_LOGGER.error("[PopBuffer]put batch ack to store fail: {}, {}, 
{}", pointWrapper, batchAckMsg, putMessageResult);
-            return false;
+            return;
         }
         if (brokerController.getBrokerConfig().isEnablePopLog()) {
             POP_LOGGER.info("[PopBuffer]put batch ack to store ok: {}, {}, 
{}", pointWrapper, batchAckMsg, putMessageResult);
         }
 
-        return true;
+        count.addAndGet(msgIndexList.size());
+        for (Byte i : msgIndexList) {
+            markBitCAS(pointWrapper.getToStoreBits(), i);
+        }
     }
 
     private boolean cancelCkTimer(final PopCheckPointWrapper pointWrapper) {
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
index ee11f046d0..a7aae7ee3d 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.processor;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import java.lang.reflect.Field;
+import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.net.Broker2Client;
@@ -108,7 +109,7 @@ public class ChangeInvisibleTimeProcessorTest {
 
     @Test
     public void testProcessRequest_Success() throws RemotingCommandException, 
InterruptedException, RemotingTimeoutException, RemotingSendRequestException {
-        
when(escapeBridge.putMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenReturn(new
 PutMessageResult(PutMessageStatus.PUT_OK, new 
AppendMessageResult(AppendMessageStatus.PUT_OK)));
+        
when(escapeBridge.asyncPutMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenReturn(CompletableFuture.completedFuture(new
 PutMessageResult(PutMessageStatus.PUT_OK, new 
AppendMessageResult(AppendMessageStatus.PUT_OK))));
         int queueId = 0;
         long queueOffset = 0;
         long popTime = System.currentTimeMillis() - 1_000;
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java 
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 2123e9b339..2acfdd69a5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -427,6 +427,10 @@ public class BrokerConfig extends BrokerIdentity {
     // if false, will still rewrite ck after max times 17
     private boolean skipWhenCKRePutReachMaxTimes = false;
 
+    private boolean appendAckAsync = false;
+
+    private boolean appendCkAsync = false;
+
     public String getConfigBlackList() {
         return configBlackList;
     }
@@ -1859,4 +1863,20 @@ public class BrokerConfig extends BrokerIdentity {
     public void setUpdateNameServerAddrPeriod(int updateNameServerAddrPeriod) {
         this.updateNameServerAddrPeriod = updateNameServerAddrPeriod;
     }
+
+    public boolean isAppendAckAsync() {
+        return appendAckAsync;
+    }
+
+    public void setAppendAckAsync(boolean appendAckAsync) {
+        this.appendAckAsync = appendAckAsync;
+    }
+
+    public boolean isAppendCkAsync() {
+        return appendCkAsync;
+    }
+
+    public void setAppendCkAsync(boolean appendCkAsync) {
+        this.appendCkAsync = appendCkAsync;
+    }
 }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
index 6b2ba02f7c..a8088a95d0 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
@@ -176,7 +176,8 @@ public class LocalMessageService implements MessageService {
     }
 
     @Override
-    public CompletableFuture<Void> endTransactionOneway(ProxyContext ctx, 
String brokerName, EndTransactionRequestHeader requestHeader,
+    public CompletableFuture<Void> endTransactionOneway(ProxyContext ctx, 
String brokerName,
+        EndTransactionRequestHeader requestHeader,
         long timeoutMillis) {
         CompletableFuture<Void> future = new CompletableFuture<>();
         SimpleChannel channel = channelManager.createChannel(ctx);
@@ -310,9 +311,8 @@ public class LocalMessageService implements MessageService {
         RemotingCommand command = 
LocalRemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME,
 requestHeader, ctx.getLanguage());
         CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
         try {
-            RemotingCommand response = 
brokerController.getChangeInvisibleTimeProcessor()
-                .processRequest(channelHandlerContext, command);
-            future.complete(response);
+            future = brokerController.getChangeInvisibleTimeProcessor()
+                .processRequestAsync(channelHandlerContext.channel(), command, 
true);
         } catch (Exception e) {
             log.error("Fail to process changeInvisibleTime command", e);
             future.completeExceptionally(e);
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java
index 3e3d37086b..f7a656d768 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.proxy.service.message;
 
+import io.netty.channel.Channel;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
@@ -370,11 +371,11 @@ public class LocalMessageServiceTest extends 
InitConfigTest {
         responseHeader.setReviveQid(newReviveQueueId);
         responseHeader.setInvisibleTime(newInvisibleTime);
         responseHeader.setPopTime(newPopTime);
-        
Mockito.when(changeInvisibleTimeProcessorMock.processRequest(Mockito.any(SimpleChannelHandlerContext.class),
 Mockito.argThat(argument -> {
+        
Mockito.when(changeInvisibleTimeProcessorMock.processRequestAsync(Mockito.any(Channel.class),
 Mockito.argThat(argument -> {
             boolean first = argument.getCode() == 
RequestCode.CHANGE_MESSAGE_INVISIBLETIME;
             boolean second = argument.readCustomHeader() instanceof 
ChangeInvisibleTimeRequestHeader;
             return first && second;
-        }))).thenReturn(remotingCommand);
+        }), 
Mockito.any(Boolean.class))).thenReturn(CompletableFuture.completedFuture(remotingCommand));
         ChangeInvisibleTimeRequestHeader requestHeader = new 
ChangeInvisibleTimeRequestHeader();
         CompletableFuture<AckResult> future = 
localMessageService.changeInvisibleTime(proxyContext, handle, messageId,
             requestHeader, 1000L);
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java 
b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
index 2217936929..fde991ad13 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
@@ -136,6 +136,8 @@ public class IntegrationTestBase {
         brokerConfig.setNamesrvAddr(nsAddr);
         brokerConfig.setEnablePropertyFilter(true);
         brokerConfig.setEnableCalcFilterBitMap(true);
+        brokerConfig.setAppendAckAsync(true);
+        brokerConfig.setAppendCkAsync(true);
         storeConfig.setEnableConsumeQueueExt(true);
         brokerConfig.setLoadBalancePollNameServerInterval(500);
         storeConfig.setStorePathRootDir(baseDir);

Reply via email to