iamzhoug37 closed pull request #280: [ISSUE #249] master-slave sync model 
performance improve
URL: https://github.com/apache/rocketmq/pull/280
 
 
   

This pull request was merged from a forked repository (a foreign pull
request). Because GitHub does not show the original diff of such a pull
request once it has been merged, the diff is reproduced below for the
sake of provenance:

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
index f6f8a80af..893f7fe62 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
@@ -27,7 +27,7 @@
 import org.apache.rocketmq.store.MessageExtBrokerInner;
 import org.apache.rocketmq.store.MessageFilter;
 import org.apache.rocketmq.store.MessageStore;
-import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageCallback;
 import org.apache.rocketmq.store.QueryMessageResult;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 
@@ -81,8 +81,8 @@ public void destroy() {
     }
 
     @Override
-    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
-        return next.putMessage(msg);
+    public void putMessage(MessageExtBrokerInner msg , PutMessageCallback 
putMessageCallback) {
+        next.putMessage(msg , putMessageCallback);
     }
 
     @Override
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index d69a78700..543a450af 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -106,6 +106,7 @@
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.apache.rocketmq.store.ConsumeQueue;
@@ -207,6 +208,12 @@ public RemotingCommand 
processRequest(ChannelHandlerContext ctx,
         return null;
     }
 
+    @Override
+    public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand 
request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws 
Exception {
+        RemotingCommand remotingCommand = processRequest(ctx , request) ;
+        remoteCommandResponseCallback.callback(remotingCommand);
+    }
+
     @Override
     public boolean rejectRequest() {
         return false;
@@ -1356,4 +1363,4 @@ private RemotingCommand 
queryConsumeQueue(ChannelHandlerContext ctx,
 
         return response;
     }
-}
+}
\ No newline at end of file
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
index 67807a863..15b6f47a6 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
@@ -38,6 +38,7 @@
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,6 +67,12 @@ public RemotingCommand processRequest(ChannelHandlerContext 
ctx, RemotingCommand
         return null;
     }
 
+    @Override
+    public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand 
request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws 
Exception {
+        RemotingCommand remotingCommand = processRequest(ctx , request) ;
+        remoteCommandResponseCallback.callback(remotingCommand);
+    }
+
     @Override
     public boolean rejectRequest() {
         return false;
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index bb427050d..e0fcdb7bd 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -33,6 +33,7 @@
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,6 +63,12 @@ public RemotingCommand processRequest(ChannelHandlerContext 
ctx, RemotingCommand
         return null;
     }
 
+    @Override
+    public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand 
request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws 
Exception {
+        RemotingCommand remotingCommand = processRequest(ctx , request) ;
+        remoteCommandResponseCallback.callback(remotingCommand);
+    }
+
     @Override
     public boolean rejectRequest() {
         return false;
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
index fee1420a9..9a8034352 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
@@ -30,9 +30,11 @@
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.MessageExtBrokerInner;
 import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.PutMessageCallback;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -144,7 +146,15 @@ public RemotingCommand 
processRequest(ChannelHandlerContext ctx,
             }
 
             final MessageStore messageStore = 
this.brokerController.getMessageStore();
-            final PutMessageResult putMessageResult = 
messageStore.putMessage(msgInner);
+            PutMessageCallback putMessageCallback = new PutMessageCallback() ;
+            messageStore.putMessage(msgInner , putMessageCallback);
+            try {
+                putMessageCallback.waitComplete();
+            }
+            catch (InterruptedException e) {
+                //ignore
+            }
+            final PutMessageResult putMessageResult = 
putMessageCallback.getPutMessageResult() ;
             if (putMessageResult != null) {
                 switch (putMessageResult.getPutMessageStatus()) {
                     // Success
@@ -197,6 +207,12 @@ public RemotingCommand 
processRequest(ChannelHandlerContext ctx,
         return response;
     }
 
+    @Override
+    public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand 
request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws 
Exception {
+        RemotingCommand remotingCommand = processRequest(ctx , request) ;
+        remoteCommandResponseCallback.callback(remotingCommand);
+    }
+
     @Override
     public boolean rejectRequest() {
         return false;
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java
index 199aa940d..f58ad0c18 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java
@@ -20,6 +20,7 @@
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +39,12 @@ public RemotingCommand processRequest(ChannelHandlerContext 
ctx, RemotingCommand
         return null;
     }
 
+    @Override
+    public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand 
request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws 
Exception {
+        RemotingCommand remotingCommand = processRequest(ctx , request) ;
+        remoteCommandResponseCallback.callback(remotingCommand);
+    }
+
     @Override
     public boolean rejectRequest() {
         return false;
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index a46cbff2e..a709f2d01 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -56,11 +56,12 @@
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.netty.RequestTask;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.MessageExtBrokerInner;
 import org.apache.rocketmq.store.MessageFilter;
-import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageCallback;
 import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import org.slf4j.Logger;
@@ -81,6 +82,12 @@ public RemotingCommand processRequest(final 
ChannelHandlerContext ctx,
         return this.processRequest(ctx.channel(), request, true);
     }
 
+    @Override
+    public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand 
request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws 
Exception {
+        RemotingCommand remotingCommand = processRequest(ctx , request) ;
+        remoteCommandResponseCallback.callback(remotingCommand);
+    }
+
     @Override
     public boolean rejectRequest() {
         return false;
@@ -524,7 +531,7 @@ private void generateOffsetMovedEvent(final 
OffsetMovedEvent event) {
 
             msgInner.setReconsumeTimes(0);
 
-            PutMessageResult putMessageResult = 
this.brokerController.getMessageStore().putMessage(msgInner);
+            this.brokerController.getMessageStore().putMessage(msgInner , new 
PutMessageCallback());
         } catch (Exception e) {
             log.warn(String.format("generateOffsetMovedEvent Exception, %s", 
event.toString()), e);
         }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
index e8f97d0af..ae204be28 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
@@ -32,6 +32,7 @@
 import org.apache.rocketmq.common.protocol.header.ViewMessageRequestHeader;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.QueryMessageResult;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
@@ -62,6 +63,12 @@ public RemotingCommand processRequest(ChannelHandlerContext 
ctx, RemotingCommand
         return null;
     }
 
+    @Override
+    public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand 
request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws 
Exception {
+        RemotingCommand remotingCommand = processRequest(ctx , request) ;
+        remoteCommandResponseCallback.callback(remotingCommand);
+    }
+
     @Override
     public boolean rejectRequest() {
         return false;
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageCallback.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageCallback.java
new file mode 100644
index 000000000..787e28221
--- /dev/null
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageCallback.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+/**
+ * SendMessageCallback is a asynchronous callback for processor process the 
response of the produce request
+ */
+public interface SendMessageCallback {
+    void callback(RemotingCommand response);
+}
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 227a23e6b..cb963871a 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -45,8 +45,10 @@
 import org.apache.rocketmq.common.sysflag.TopicSysFlag;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.PutMessageCallback;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.config.StorePathConfigHelper;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
@@ -60,31 +62,43 @@ public SendMessageProcessor(final BrokerController 
brokerController) {
     }
 
     @Override
-    public RemotingCommand processRequest(ChannelHandlerContext ctx,
-        RemotingCommand request) throws RemotingCommandException {
-        SendMessageContext mqtraceContext;
+    public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand 
request, final RemoteCommandResponseCallback remoteCommandResponseCallback) 
throws Exception {
+
+        final SendMessageContext mqtraceContext;
         switch (request.getCode()) {
             case RequestCode.CONSUMER_SEND_MSG_BACK:
-                return this.consumerSendMsgBack(ctx, request);
+                this.consumerSendMsgBack(ctx, request , 
remoteCommandResponseCallback);
+                return ;
             default:
                 SendMessageRequestHeader requestHeader = 
parseRequestHeader(request);
                 if (requestHeader == null) {
-                    return null;
+                    remoteCommandResponseCallback.callback(null);
+                    return;
                 }
 
                 mqtraceContext = buildMsgContext(ctx, requestHeader);
                 this.executeSendMessageHookBefore(ctx, request, 
mqtraceContext);
 
-                RemotingCommand response;
+                SendMessageCallback sendMessageCallback = new 
SendMessageCallback() {
+                    @Override
+                    public void callback(RemotingCommand response) {
+                        executeSendMessageHookAfter(response, mqtraceContext);
+                        remoteCommandResponseCallback.callback(response);
+                    }
+                } ;
                 if (requestHeader.isBatch()) {
-                    response = this.sendBatchMessage(ctx, request, 
mqtraceContext, requestHeader);
+                    this.sendBatchMessage(ctx, request, mqtraceContext, 
requestHeader , sendMessageCallback);
                 } else {
-                    response = this.sendMessage(ctx, request, mqtraceContext, 
requestHeader);
+                    this.sendMessage(ctx, request, mqtraceContext, 
requestHeader , sendMessageCallback);
                 }
-
-                this.executeSendMessageHookAfter(response, mqtraceContext);
-                return response;
         }
+
+    }
+
+    @Override
+    public RemotingCommand processRequest(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+        throw new RemotingCommandException("sendMessageProcessor not support 
processRequest" , new UnsupportedOperationException()) ;
     }
 
     @Override
@@ -93,7 +107,7 @@ public boolean rejectRequest() {
             
this.brokerController.getMessageStore().isTransientStorePoolDeficient();
     }
 
-    private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext 
ctx, final RemotingCommand request)
+    private void consumerSendMsgBack(final ChannelHandlerContext ctx, final 
RemotingCommand request , final RemoteCommandResponseCallback 
remoteCommandResponseCallback)
         throws RemotingCommandException {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
         final ConsumerSendMsgBackRequestHeader requestHeader =
@@ -117,19 +131,22 @@ private RemotingCommand consumerSendMsgBack(final 
ChannelHandlerContext ctx, fin
             response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
             response.setRemark("subscription group not exist, " + 
requestHeader.getGroup() + " "
                 + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
-            return response;
+            remoteCommandResponseCallback.callback(response);
+            return ;
         }
 
         if 
(!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission()))
 {
             response.setCode(ResponseCode.NO_PERMISSION);
             response.setRemark("the broker[" + 
this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is 
forbidden");
-            return response;
+            remoteCommandResponseCallback.callback(response);
+            return ;
         }
 
         if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
             response.setCode(ResponseCode.SUCCESS);
             response.setRemark(null);
-            return response;
+            remoteCommandResponseCallback.callback(response);
+            return ;
         }
 
         String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
@@ -147,20 +164,23 @@ private RemotingCommand consumerSendMsgBack(final 
ChannelHandlerContext ctx, fin
         if (null == topicConfig) {
             response.setCode(ResponseCode.SYSTEM_ERROR);
             response.setRemark("topic[" + newTopic + "] not exist");
-            return response;
+            remoteCommandResponseCallback.callback(response);
+            return ;
         }
 
         if (!PermName.isWriteable(topicConfig.getPerm())) {
             response.setCode(ResponseCode.NO_PERMISSION);
             response.setRemark(String.format("the topic[%s] sending message is 
forbidden", newTopic));
-            return response;
+            remoteCommandResponseCallback.callback(response);
+            return ;
         }
 
-        MessageExt msgExt = 
this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
+        final MessageExt msgExt = 
this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
         if (null == msgExt) {
             response.setCode(ResponseCode.SYSTEM_ERROR);
             response.setRemark("look message by offset failed, " + 
requestHeader.getOffset());
-            return response;
+            remoteCommandResponseCallback.callback(response);
+            return ;
         }
 
         final String retryTopic = 
msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
@@ -188,7 +208,8 @@ private RemotingCommand consumerSendMsgBack(final 
ChannelHandlerContext ctx, fin
             if (null == topicConfig) {
                 response.setCode(ResponseCode.SYSTEM_ERROR);
                 response.setRemark("topic[" + newTopic + "] not exist");
-                return response;
+                remoteCommandResponseCallback.callback(response);
+                return ;
             }
         } else {
             if (0 == delayLevel) {
@@ -216,34 +237,40 @@ private RemotingCommand consumerSendMsgBack(final 
ChannelHandlerContext ctx, fin
         String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
         MessageAccessor.setOriginMessageId(msgInner, 
UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
 
-        PutMessageResult putMessageResult = 
this.brokerController.getMessageStore().putMessage(msgInner);
-        if (putMessageResult != null) {
-            switch (putMessageResult.getPutMessageStatus()) {
-                case PUT_OK:
-                    String backTopic = msgExt.getTopic();
-                    String correctTopic = 
msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
-                    if (correctTopic != null) {
-                        backTopic = correctTopic;
+        this.brokerController.getMessageStore().putMessage(msgInner, new 
PutMessageCallback() {
+            @Override
+            public void doAction(PutMessageResult putMessageResult) {
+                if (putMessageResult != null) {
+                    switch (putMessageResult.getPutMessageStatus()) {
+                        case PUT_OK:
+                            String backTopic = msgExt.getTopic();
+                            String correctTopic = 
msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
+                            if (correctTopic != null) {
+                                backTopic = correctTopic;
+                            }
+
+                            
brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(),
 backTopic);
+
+                            response.setCode(ResponseCode.SUCCESS);
+                            response.setRemark(null);
+
+                            remoteCommandResponseCallback.callback(response);
+                            return ;
+                        default:
+                            break;
                     }
 
-                    
this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(),
 backTopic);
-
-                    response.setCode(ResponseCode.SUCCESS);
-                    response.setRemark(null);
+                    response.setCode(ResponseCode.SYSTEM_ERROR);
+                    
response.setRemark(putMessageResult.getPutMessageStatus().name());
+                    remoteCommandResponseCallback.callback(response);
+                    return ;
+                }
 
-                    return response;
-                default:
-                    break;
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("putMessageResult is null");
+                remoteCommandResponseCallback.callback(response);
             }
-
-            response.setCode(ResponseCode.SYSTEM_ERROR);
-            response.setRemark(putMessageResult.getPutMessageStatus().name());
-            return response;
-        }
-
-        response.setCode(ResponseCode.SYSTEM_ERROR);
-        response.setRemark("putMessageResult is null");
-        return response;
+        });
     }
 
     private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, 
RemotingCommand response,
@@ -290,10 +317,11 @@ private boolean 
handleRetryAndDLQ(SendMessageRequestHeader requestHeader, Remoti
         return true;
     }
 
-    private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
+    private void sendMessage(final ChannelHandlerContext ctx,
         final RemotingCommand request,
         final SendMessageContext sendMessageContext,
-        final SendMessageRequestHeader requestHeader) throws 
RemotingCommandException {
+        final SendMessageRequestHeader requestHeader,
+        final SendMessageCallback sendMessageCallback) throws 
RemotingCommandException {
 
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
         final SendMessageResponseHeader responseHeader = 
(SendMessageResponseHeader) response.readCustomHeader();
@@ -309,13 +337,15 @@ private RemotingCommand sendMessage(final 
ChannelHandlerContext ctx,
         if (this.brokerController.getMessageStore().now() < startTimstamp) {
             response.setCode(ResponseCode.SYSTEM_ERROR);
             response.setRemark(String.format("broker unable to service, until 
%s", UtilAll.timeMillisToHumanString2(startTimstamp)));
-            return response;
+            sendMessageCallback.callback(response);
+            return ;
         }
 
         response.setCode(-1);
         super.msgCheck(ctx, requestHeader, response);
         if (response.getCode() != -1) {
-            return response;
+            sendMessageCallback.callback(response);
+            return ;
         }
 
         final byte[] body = request.getBody();
@@ -327,12 +357,13 @@ private RemotingCommand sendMessage(final 
ChannelHandlerContext ctx,
             queueIdInt = Math.abs(this.random.nextInt() % 99999999) % 
topicConfig.getWriteQueueNums();
         }
 
-        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
+        final MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
         msgInner.setTopic(requestHeader.getTopic());
         msgInner.setQueueId(queueIdInt);
 
         if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, 
topicConfig)) {
-            return response;
+            sendMessageCallback.callback(response);
+            return ;
         }
 
         msgInner.setBody(body);
@@ -350,14 +381,20 @@ private RemotingCommand sendMessage(final 
ChannelHandlerContext ctx,
                 response.setCode(ResponseCode.NO_PERMISSION);
                 response.setRemark(
                     "the broker[" + 
this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction 
message is forbidden");
-                return response;
+                sendMessageCallback.callback(response);
+                return ;
             }
         }
 
-        PutMessageResult putMessageResult = 
this.brokerController.getMessageStore().putMessage(msgInner);
-
-        return handlePutMessageResult(putMessageResult, response, request, 
msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
-
+        final int innerQueueIdInt = queueIdInt ;
+        PutMessageCallback putMessageCallback = new PutMessageCallback() {
+            @Override
+            public void doAction(PutMessageResult putMessageResult) {
+                RemotingCommand remotingCommand = 
handlePutMessageResult(putMessageResult, response, request, msgInner, 
responseHeader, sendMessageContext, ctx, innerQueueIdInt);
+                sendMessageCallback.callback(remotingCommand);
+            }
+        } ;
+        this.brokerController.getMessageStore().putMessage(msgInner , 
putMessageCallback);
     }
 
     private RemotingCommand handlePutMessageResult(PutMessageResult 
putMessageResult, RemotingCommand response,
@@ -465,10 +502,11 @@ private RemotingCommand 
handlePutMessageResult(PutMessageResult putMessageResult
         return response;
     }
 
-    private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx,
+    private void sendBatchMessage(final ChannelHandlerContext ctx,
         final RemotingCommand request,
         final SendMessageContext sendMessageContext,
-        final SendMessageRequestHeader requestHeader) throws 
RemotingCommandException {
+        final SendMessageRequestHeader requestHeader ,
+        final SendMessageCallback sendMessageCallback) throws 
RemotingCommandException {
 
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
         final SendMessageResponseHeader responseHeader = 
(SendMessageResponseHeader) response.readCustomHeader();
@@ -484,13 +522,15 @@ private RemotingCommand sendBatchMessage(final 
ChannelHandlerContext ctx,
         if (this.brokerController.getMessageStore().now() < startTimstamp) {
             response.setCode(ResponseCode.SYSTEM_ERROR);
             response.setRemark(String.format("broker unable to service, until 
%s", UtilAll.timeMillisToHumanString2(startTimstamp)));
-            return response;
+            sendMessageCallback.callback(response);
+            return ;
         }
 
         response.setCode(-1);
         super.msgCheck(ctx, requestHeader, response);
         if (response.getCode() != -1) {
-            return response;
+            sendMessageCallback.callback(response);
+            return ;
         }
 
         int queueIdInt = requestHeader.getQueueId();
@@ -503,15 +543,17 @@ private RemotingCommand sendBatchMessage(final 
ChannelHandlerContext ctx,
         if (requestHeader.getTopic().length() > Byte.MAX_VALUE) {
             response.setCode(ResponseCode.MESSAGE_ILLEGAL);
             response.setRemark("message topic length too long " + 
requestHeader.getTopic().length());
-            return response;
+            sendMessageCallback.callback(response);
+            return ;
         }
 
         if (requestHeader.getTopic() != null && 
requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
             response.setCode(ResponseCode.MESSAGE_ILLEGAL);
             response.setRemark("batch request does not support retry group " + 
requestHeader.getTopic());
-            return response;
+            sendMessageCallback.callback(response);
+            return ;
         }
-        MessageExtBatch messageExtBatch = new MessageExtBatch();
+        final MessageExtBatch messageExtBatch = new MessageExtBatch();
         messageExtBatch.setTopic(requestHeader.getTopic());
         messageExtBatch.setQueueId(queueIdInt);
 
@@ -529,9 +571,15 @@ private RemotingCommand sendBatchMessage(final 
ChannelHandlerContext ctx,
         messageExtBatch.setStoreHost(this.getStoreHost());
         messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == 
null ? 0 : requestHeader.getReconsumeTimes());
 
-        PutMessageResult putMessageResult = 
this.brokerController.getMessageStore().putMessages(messageExtBatch);
-
-        return handlePutMessageResult(putMessageResult, response, request, 
messageExtBatch, responseHeader, sendMessageContext, ctx, queueIdInt);
+        final int innerQueueIdInt = queueIdInt ;
+        PutMessageCallback putMessageCallback = new PutMessageCallback() {
+            @Override
+            public void doAction(PutMessageResult putMessageResult) {
+                RemotingCommand remotingCommand = 
handlePutMessageResult(putMessageResult, response, request, messageExtBatch, 
responseHeader, sendMessageContext, ctx, innerQueueIdInt);
+                sendMessageCallback.callback(remotingCommand);
+            }
+        } ;
+        this.brokerController.getMessageStore().putMessages(messageExtBatch , 
putMessageCallback);
     }
 
     public boolean hasConsumeMessageHook() {
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
index e544d90a1..9e993f9e4 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
@@ -32,6 +32,7 @@
 import org.apache.rocketmq.store.MessageArrivingListener;
 import org.apache.rocketmq.store.MessageExtBrokerInner;
 import org.apache.rocketmq.store.MessageFilter;
+import org.apache.rocketmq.store.PutMessageCallback;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
@@ -184,7 +185,10 @@ public void dispatch(DispatchRequest request) {
                 msg.putUserProperty("a", String.valueOf(j * 10 + 5));
                 
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
 
-                PutMessageResult result = master.putMessage(msg);
+                PutMessageCallback putMessageCallback = new 
PutMessageCallback() ;
+                master.putMessage(msg , putMessageCallback);
+                putMessageCallback.waitComplete();
+                PutMessageResult result = 
putMessageCallback.getPutMessageResult() ;
 
                 msg.setMsgId(result.getAppendMessageResult().getMsgId());
 
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
index 7828e7a91..e522d7a3d 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
@@ -39,6 +39,7 @@
 import org.apache.rocketmq.store.AppendMessageStatus;
 import org.apache.rocketmq.store.MessageExtBrokerInner;
 import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.PutMessageCallback;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
@@ -84,13 +85,13 @@ public void init() {
 
     @Test
     public void testProcessRequest() throws RemotingCommandException {
-        
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new 
PutMessageResult(PutMessageStatus.PUT_OK, new 
AppendMessageResult(AppendMessageStatus.PUT_OK)));
+        when(putMessage()).thenReturn(new 
PutMessageResult(PutMessageStatus.PUT_OK, new 
AppendMessageResult(AppendMessageStatus.PUT_OK)));
         assertPutResult(ResponseCode.SUCCESS);
     }
 
     @Test
     public void testProcessRequest_WithHook() throws RemotingCommandException {
-        
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new 
PutMessageResult(PutMessageStatus.PUT_OK, new 
AppendMessageResult(AppendMessageStatus.PUT_OK)));
+        when(putMessage()).thenReturn(new 
PutMessageResult(PutMessageStatus.PUT_OK, new 
AppendMessageResult(AppendMessageStatus.PUT_OK)));
         List<SendMessageHook> sendMessageHookList = new ArrayList<>();
         final SendMessageContext[] sendMessageContext = new 
SendMessageContext[1];
         SendMessageHook sendMessageHook = new SendMessageHook() {
@@ -120,55 +121,55 @@ public void sendMessageAfter(SendMessageContext context) {
 
     @Test
     public void testProcessRequest_FlushTimeOut() throws 
RemotingCommandException {
-        
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new 
PutMessageResult(PutMessageStatus.FLUSH_DISK_TIMEOUT, new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
+        when(putMessage()).thenReturn(new 
PutMessageResult(PutMessageStatus.FLUSH_DISK_TIMEOUT, new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
         assertPutResult(ResponseCode.FLUSH_DISK_TIMEOUT);
     }
 
     @Test
     public void testProcessRequest_MessageIllegal() throws 
RemotingCommandException {
-        
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new 
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
+        when(putMessage()).thenReturn(new 
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
         assertPutResult(ResponseCode.MESSAGE_ILLEGAL);
     }
 
     @Test
     public void testProcessRequest_CreateMappedFileFailed() throws 
RemotingCommandException {
-        
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new 
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
+        when(putMessage()).thenReturn(new 
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
         assertPutResult(ResponseCode.SYSTEM_ERROR);
     }
 
     @Test
     public void testProcessRequest_FlushSlaveTimeout() throws 
RemotingCommandException {
-        
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new 
PutMessageResult(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
+        when(putMessage()).thenReturn(new 
PutMessageResult(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
         assertPutResult(ResponseCode.FLUSH_SLAVE_TIMEOUT);
     }
 
     @Test
     public void testProcessRequest_PageCacheBusy() throws 
RemotingCommandException {
-        
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new 
PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
+        when(putMessage()).thenReturn(new 
PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
         assertPutResult(ResponseCode.SYSTEM_ERROR);
     }
 
     @Test
     public void testProcessRequest_PropertiesTooLong() throws 
RemotingCommandException {
-        
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new 
PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
+        when(putMessage()).thenReturn(new 
PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
         assertPutResult(ResponseCode.MESSAGE_ILLEGAL);
     }
 
     @Test
     public void testProcessRequest_ServiceNotAvailable() throws 
RemotingCommandException {
-        
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new 
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
+        when(putMessage()).thenReturn(new 
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
         assertPutResult(ResponseCode.SERVICE_NOT_AVAILABLE);
     }
 
     @Test
     public void testProcessRequest_SlaveNotAvailable() throws 
RemotingCommandException {
-        
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new 
PutMessageResult(PutMessageStatus.SLAVE_NOT_AVAILABLE, new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
+        when(putMessage()).thenReturn(new 
PutMessageResult(PutMessageStatus.SLAVE_NOT_AVAILABLE, new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
         assertPutResult(ResponseCode.SLAVE_NOT_AVAILABLE);
     }
 
     @Test
     public void testProcessRequest_WithMsgBack() throws 
RemotingCommandException {
-        
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new 
PutMessageResult(PutMessageStatus.PUT_OK, new 
AppendMessageResult(AppendMessageStatus.PUT_OK)));
+        when(putMessage()).thenReturn(new 
PutMessageResult(PutMessageStatus.PUT_OK, new 
AppendMessageResult(AppendMessageStatus.PUT_OK)));
         final RemotingCommand request = 
createSendMsgBackCommand(RequestCode.CONSUMER_SEND_MSG_BACK);
 
         sendMessageProcessor = new SendMessageProcessor(brokerController);
@@ -177,6 +178,17 @@ public void testProcessRequest_WithMsgBack() throws 
RemotingCommandException {
         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
     }
 
+    private PutMessageResult putMessage(){
+        PutMessageCallback putMessageCallback = new PutMessageCallback();
+        messageStore.putMessage(any(MessageExtBrokerInner.class) , 
putMessageCallback) ;
+        try{
+            putMessageCallback.waitComplete();
+        }
+        catch (InterruptedException e){
+            e.printStackTrace();
+        }
+        return putMessageCallback.getPutMessageResult() ;
+    }
     private RemotingCommand createSendMsgCommand(int requestCode) {
         SendMessageRequestHeader requestHeader = new 
SendMessageRequestHeader();
         requestHeader.setProducerGroup(group);
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
index 69478cf32..f168c7401 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
@@ -43,6 +43,7 @@
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.slf4j.Logger;
 
@@ -78,6 +79,12 @@ public RemotingCommand processRequest(ChannelHandlerContext 
ctx,
         return null;
     }
 
+    @Override
+    public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand 
request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws 
Exception {
+        RemotingCommand remotingCommand = processRequest(ctx , request) ;
+        remoteCommandResponseCallback.callback(remotingCommand);
+    }
+
     @Override
     public boolean rejectRequest() {
         return false;
diff --git 
a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java
 
b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java
index e459b1aeb..0d9701c79 100644
--- 
a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java
+++ 
b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java
@@ -45,6 +45,7 @@
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.CommitLog;
 import org.slf4j.Logger;
@@ -78,6 +79,12 @@ public RemotingCommand processRequest(ChannelHandlerContext 
ctx, RemotingCommand
         return null;
     }
 
+    @Override
+    public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand 
request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws 
Exception {
+        RemotingCommand remotingCommand = processRequest(ctx , request) ;
+        remoteCommandResponseCallback.callback(remotingCommand);
+    }
+
     @Override
     public boolean rejectRequest() {
         return false;
diff --git 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
index f6611b683..2dd363c79 100644
--- 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
+++ 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
@@ -26,6 +26,7 @@
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.namesrv.NamesrvController;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.slf4j.Logger;
@@ -83,4 +84,10 @@ public RemotingCommand 
getRouteInfoByTopic(ChannelHandlerContext ctx,
             + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
         return response;
     }
+
+    @Override
+    public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand 
request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws 
Exception {
+        RemotingCommand remotingCommand = processRequest(ctx , request) ;
+        remoteCommandResponseCallback.callback(remotingCommand);
+    }
 }
diff --git 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index ed5b20b16..8ec562a64 100644
--- 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -49,6 +49,7 @@
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -120,6 +121,12 @@ public RemotingCommand 
processRequest(ChannelHandlerContext ctx,
         return null;
     }
 
+    @Override
+    public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand 
request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws 
Exception {
+        RemotingCommand remotingCommand = processRequest(ctx , request) ;
+        remoteCommandResponseCallback.callback(remotingCommand);
+    }
+
     @Override
     public boolean rejectRequest() {
         return false;
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 76752529a..7f6253731 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -45,6 +45,7 @@
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
 import org.slf4j.Logger;
@@ -170,31 +171,36 @@ public void processRequestCommand(final 
ChannelHandlerContext ctx, final Remotin
                 @Override
                 public void run() {
                     try {
-                        RPCHook rpcHook = 
NettyRemotingAbstract.this.getRPCHook();
+                        final RPCHook rpcHook = 
NettyRemotingAbstract.this.getRPCHook();
                         if (rpcHook != null) {
                             
rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), 
cmd);
                         }
 
-                        final RemotingCommand response = 
pair.getObject1().processRequest(ctx, cmd);
-                        if (rpcHook != null) {
-                            
rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), 
cmd, response);
-                        }
-
-                        if (!cmd.isOnewayRPC()) {
-                            if (response != null) {
-                                response.setOpaque(opaque);
-                                response.markResponseType();
-                                try {
-                                    ctx.writeAndFlush(response);
-                                } catch (Throwable e) {
-                                    log.error("process request over, but 
response failed", e);
-                                    log.error(cmd.toString());
-                                    log.error(response.toString());
+                        final RemoteCommandResponseCallback responseCallback = 
new RemoteCommandResponseCallback() {
+                            @Override
+                            public void callback(RemotingCommand response) {
+                                if (rpcHook != null) {
+                                    
rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), 
cmd, response);
                                 }
-                            } else {
 
+                                if (!cmd.isOnewayRPC()) {
+                                    if (response != null) {
+                                        response.setOpaque(opaque);
+                                        response.markResponseType();
+                                        try {
+                                            ctx.writeAndFlush(response);
+                                        } catch (Throwable e) {
+                                            log.error("process request over, 
but response failed", e);
+                                            log.error(cmd.toString());
+                                            log.error(response.toString());
+                                        }
+                                    } else {
+
+                                    }
+                                }
                             }
-                        }
+                        } ;
+                        pair.getObject1().asyncProcessRequest(ctx, cmd , 
responseCallback);
                     } catch (Throwable e) {
                         log.error("process request exception", e);
                         log.error(cmd.toString());
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java
index 040f76848..a6f05e93f 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.remoting.netty;
 
 import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 /**
@@ -26,5 +27,8 @@
     RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand 
request)
         throws Exception;
 
+    void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand 
request , RemoteCommandResponseCallback remoteCommandResponseCallback)
+            throws Exception;
+
     boolean rejectRequest();
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemoteCommandResponseCallback.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemoteCommandResponseCallback.java
new file mode 100644
index 000000000..c13d64ca0
--- /dev/null
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemoteCommandResponseCallback.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.protocol;
+
+/**
+ * RemoteCommandResponseCallback is an asynchronous callback for netty network 
io
+ */
+public interface RemoteCommandResponseCallback {
+    void callback(RemotingCommand response);
+}
diff --git 
a/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java 
b/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java
index 0ecfaaa5a..461496691 100644
--- 
a/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java
+++ 
b/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java
@@ -33,6 +33,7 @@
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.remoting.netty.ResponseFuture;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -55,6 +56,11 @@ public RemotingCommand processRequest(ChannelHandlerContext 
ctx, RemotingCommand
                 return request;
             }
 
+            @Override
+            public void asyncProcessRequest(ChannelHandlerContext ctx, 
RemotingCommand request, RemoteCommandResponseCallback 
remoteCommandResponseCallback) throws Exception {
+                throw new UnsupportedOperationException() ;
+            }
+
             @Override
             public boolean rejectRequest() {
                 return false;
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 03d98d319..0a443a417 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -62,6 +62,7 @@
 
     private volatile long beginTimeInLock = 0;
     private final PutMessageLock putMessageLock;
+    private final GroupCommitCallback groupCommitCallback ;
 
     public CommitLog(final DefaultMessageStore defaultMessageStore) {
         this.mappedFileQueue = new 
MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
@@ -84,6 +85,7 @@ protected MessageExtBatchEncoder initialValue() {
             }
         };
         this.putMessageLock = 
defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() 
? new PutMessageReentrantLock() : new PutMessageSpinLock();
+        this.groupCommitCallback = new GroupCommitCallback() ;
 
     }
 
@@ -518,7 +520,7 @@ public long getBeginTimeInLock() {
         return beginTimeInLock;
     }
 
-    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
+    public void putMessage(final MessageExtBrokerInner msg , final 
CommitLogPutMessageCallback commitLogPutMessageCallback) {
         // Set the storage time
         msg.setStoreTimestamp(System.currentTimeMillis());
         // Set the message body BODY CRC (consider the most appropriate setting
@@ -573,7 +575,8 @@ public PutMessageResult putMessage(final 
MessageExtBrokerInner msg) {
             if (null == mappedFile) {
                 log.error("create mapped file1 error, topic: " + 
msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                 beginTimeInLock = 0;
-                return new 
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
+                commitLogPutMessageCallback.callback(new 
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
+                return ;
             }
 
             result = mappedFile.appendMessage(msg, this.appendMessageCallback);
@@ -588,20 +591,24 @@ public PutMessageResult putMessage(final 
MessageExtBrokerInner msg) {
                         // XXX: warn and notify me
                         log.error("create mapped file2 error, topic: " + 
msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                         beginTimeInLock = 0;
-                        return new 
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
+                        commitLogPutMessageCallback.callback(new 
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
+                        return ;
                     }
                     result = mappedFile.appendMessage(msg, 
this.appendMessageCallback);
                     break;
                 case MESSAGE_SIZE_EXCEEDED:
                 case PROPERTIES_SIZE_EXCEEDED:
                     beginTimeInLock = 0;
-                    return new 
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
+                    commitLogPutMessageCallback.callback(new 
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
+                    return ;
                 case UNKNOWN_ERROR:
                     beginTimeInLock = 0;
-                    return new 
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
+                    commitLogPutMessageCallback.callback(new 
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
+                    return ;
                 default:
                     beginTimeInLock = 0;
-                    return new 
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
+                    commitLogPutMessageCallback.callback(new 
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
+                    return ;
             }
 
             eclipseTimeInLock = 
this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
@@ -625,9 +632,9 @@ public PutMessageResult putMessage(final 
MessageExtBrokerInner msg) {
         
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
 
         handleDiskFlush(result, putMessageResult, msg);
-        handleHA(result, putMessageResult, msg);
-
-        return putMessageResult;
+        if (!handleHA(result, putMessageResult, msg, 
commitLogPutMessageCallback)) {
+            commitLogPutMessageCallback.callback(putMessageResult);
+        }
     }
 
     public void handleDiskFlush(AppendMessageResult result, PutMessageResult 
putMessageResult, MessageExt messageExt) {
@@ -657,22 +664,25 @@ public void handleDiskFlush(AppendMessageResult result, 
PutMessageResult putMess
         }
     }
 
-    public void handleHA(AppendMessageResult result, PutMessageResult 
putMessageResult, MessageExt messageExt) {
+    /**
+     *
+     * @param result
+     * @param putMessageResult
+     * @param messageExt
+     * @param commitLogPutMessageCallback
+     * @return whether to wait for the slave; if false, the request should respond 
synchronously
+     */
+    public boolean handleHA(AppendMessageResult result, PutMessageResult 
putMessageResult, MessageExt messageExt, CommitLogPutMessageCallback 
commitLogPutMessageCallback) {
         if (BrokerRole.SYNC_MASTER == 
this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
             HAService service = this.defaultMessageStore.getHaService();
             if (messageExt.isWaitStoreMsgOK()) {
                 // Determine whether to wait
                 if (service.isSlaveOK(result.getWroteOffset() + 
result.getWroteBytes())) {
-                    GroupCommitRequest request = new 
GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
+
+                    GroupCommitRequest request = new 
GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes() , 
commitLogPutMessageCallback, putMessageResult, messageExt, groupCommitCallback) 
;
                     service.putRequest(request);
                     service.getWaitNotifyObject().wakeupAll();
-                    boolean flushOK =
-                        
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
-                    if (!flushOK) {
-                        log.error("do sync transfer other node, wait return, 
but failed, topic: " + messageExt.getTopic() + " tags: "
-                            + messageExt.getTags() + " client address: " + 
messageExt.getBornHostNameString());
-                        
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
-                    }
+                    return true;
                 }
                 // Slave problem
                 else {
@@ -682,9 +692,10 @@ public void handleHA(AppendMessageResult result, 
PutMessageResult putMessageResu
             }
         }
 
+        return false;
     }
 
-    public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) 
{
+    public void putMessages(final MessageExtBatch messageExtBatch , 
CommitLogPutMessageCallback commitLogPutMessageCallback) {
         messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
         AppendMessageResult result;
 
@@ -693,10 +704,12 @@ public PutMessageResult putMessages(final MessageExtBatch 
messageExtBatch) {
         final int tranType = 
MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
 
         if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
-            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, 
null);
+            commitLogPutMessageCallback.callback(new 
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+            return ;
         }
         if (messageExtBatch.getDelayTimeLevel() > 0) {
-            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, 
null);
+            commitLogPutMessageCallback.callback(new 
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+            return ;
         }
 
         long eclipseTimeInLock = 0;
@@ -723,7 +736,8 @@ public PutMessageResult putMessages(final MessageExtBatch 
messageExtBatch) {
             if (null == mappedFile) {
                 log.error("Create mapped file1 error, topic: {} clientAddr: 
{}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
                 beginTimeInLock = 0;
-                return new 
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
+                commitLogPutMessageCallback.callback(new 
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
+                return ;
             }
 
             result = mappedFile.appendMessages(messageExtBatch, 
this.appendMessageCallback);
@@ -738,20 +752,24 @@ public PutMessageResult putMessages(final MessageExtBatch 
messageExtBatch) {
                         // XXX: warn and notify me
                         log.error("Create mapped file2 error, topic: {} 
clientAddr: {}", messageExtBatch.getTopic(), 
messageExtBatch.getBornHostString());
                         beginTimeInLock = 0;
-                        return new 
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
+                        commitLogPutMessageCallback.callback(new 
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
+                        return ;
                     }
                     result = mappedFile.appendMessages(messageExtBatch, 
this.appendMessageCallback);
                     break;
                 case MESSAGE_SIZE_EXCEEDED:
                 case PROPERTIES_SIZE_EXCEEDED:
                     beginTimeInLock = 0;
-                    return new 
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
+                    commitLogPutMessageCallback.callback(new 
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
+                    return ;
                 case UNKNOWN_ERROR:
                     beginTimeInLock = 0;
-                    return new 
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
+                    commitLogPutMessageCallback.callback(new 
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
+                    return ;
                 default:
                     beginTimeInLock = 0;
-                    return new 
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
+                    commitLogPutMessageCallback.callback(new 
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
+                    return ;
             }
 
             eclipseTimeInLock = 
this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
@@ -776,9 +794,9 @@ public PutMessageResult putMessages(final MessageExtBatch 
messageExtBatch) {
 
         handleDiskFlush(result, putMessageResult, messageExtBatch);
 
-        handleHA(result, putMessageResult, messageExtBatch);
-
-        return putMessageResult;
+        if (!handleHA(result, putMessageResult, messageExtBatch, 
commitLogPutMessageCallback)) {
+            commitLogPutMessageCallback.callback(putMessageResult);
+        }
     }
 
     /**
@@ -1027,10 +1045,21 @@ public long getJointime() {
         private final long nextOffset;
         private final CountDownLatch countDownLatch = new CountDownLatch(1);
         private volatile boolean flushOK = false;
+        private CommitLogPutMessageCallback commitLogPutMessageCallback ;
+        private PutMessageResult putMessageResult ;
+        private MessageExt messageExt ;
+        private GroupCommitCallback groupCommitCallback ;
 
         public GroupCommitRequest(long nextOffset) {
             this.nextOffset = nextOffset;
         }
+        public GroupCommitRequest(long nextOffset, CommitLogPutMessageCallback 
commitLogPutMessageCallback, PutMessageResult putMessageResult, MessageExt 
messageExt, GroupCommitCallback groupCommitCallback) {
+            this.nextOffset = nextOffset;
+            this.commitLogPutMessageCallback = commitLogPutMessageCallback;
+            this.putMessageResult = putMessageResult;
+            this.messageExt = messageExt;
+            this.groupCommitCallback = groupCommitCallback ;
+        }
 
         public long getNextOffset() {
             return nextOffset;
@@ -1050,6 +1079,42 @@ public boolean waitForFlush(long timeout) {
                 return false;
             }
         }
+
+        public void setFlushOK(boolean flushOK) {
+            this.flushOK = flushOK;
+        }
+
+        public boolean isFlushOK() {
+            return flushOK;
+        }
+
+        public CommitLogPutMessageCallback getCommitLogPutMessageCallback() {
+            return commitLogPutMessageCallback;
+        }
+
+        public PutMessageResult getPutMessageResult() {
+            return putMessageResult;
+        }
+
+        public MessageExt getMessageExt() {
+            return messageExt;
+        }
+
+        public GroupCommitCallback getGroupCommitCallback() {
+            return groupCommitCallback;
+        }
+    }
+
+    public static class GroupCommitCallback {
+        public void doSlaveAction(GroupCommitRequest request) {
+            boolean flushOK = request.isFlushOK() ;
+            if (!flushOK) {
+                log.error("do sync transfer other node, wait return, but 
failed, topic: " + request.getMessageExt().getTopic() + " tags: "
+                        + request.getMessageExt().getTags() + " client 
address: " + request.getMessageExt().getBornHostNameString());
+                
request.getPutMessageResult().setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
+            }
+            
request.getCommitLogPutMessageCallback().callback(request.getPutMessageResult());
+        }
     }
 
     /**
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/CommitLogPutMessageCallback.java
 
b/store/src/main/java/org/apache/rocketmq/store/CommitLogPutMessageCallback.java
new file mode 100644
index 000000000..f2cffcd5d
--- /dev/null
+++ 
b/store/src/main/java/org/apache/rocketmq/store/CommitLogPutMessageCallback.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store;
+
+/**
+ * CommitLogPutMessageCallback is an asynchronous callback for the message store.
+ */
+public interface CommitLogPutMessageCallback {
+    void callback(PutMessageResult result) ;
+}
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 7a5647c3e..2c0854fef 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -302,10 +302,11 @@ public void destroyLogics() {
         }
     }
 
-    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
+    public void putMessage(final MessageExtBrokerInner msg , final 
PutMessageCallback putMessageCallback) {
         if (this.shutdown) {
             log.warn("message store has shutdown, so putMessage is forbidden");
-            return new 
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
+            putMessageCallback.callback(new 
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
+            return ;
         }
 
         if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
@@ -314,7 +315,8 @@ public PutMessageResult putMessage(MessageExtBrokerInner 
msg) {
                 log.warn("message store is slave mode, so putMessage is 
forbidden ");
             }
 
-            return new 
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
+            putMessageCallback.callback(new 
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
+            return ;
         }
 
         if (!this.runningFlags.isWriteable()) {
@@ -323,45 +325,54 @@ public PutMessageResult putMessage(MessageExtBrokerInner 
msg) {
                 log.warn("message store is not writeable, so putMessage is 
forbidden " + this.runningFlags.getFlagBits());
             }
 
-            return new 
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
+            putMessageCallback.callback(new 
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
+            return ;
         } else {
             this.printTimes.set(0);
         }
 
         if (msg.getTopic().length() > Byte.MAX_VALUE) {
             log.warn("putMessage message topic length too long " + 
msg.getTopic().length());
-            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, 
null);
+            putMessageCallback.callback(new 
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+            return ;
         }
 
         if (msg.getPropertiesString() != null && 
msg.getPropertiesString().length() > Short.MAX_VALUE) {
             log.warn("putMessage message properties length too long " + 
msg.getPropertiesString().length());
-            return new 
PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
+            putMessageCallback.callback(new 
PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null));
+            return ;
         }
 
         if (this.isOSPageCacheBusy()) {
-            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, 
null);
+            putMessageCallback.callback(new 
PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null));
+            return ;
         }
 
-        long beginTime = this.getSystemClock().now();
-        PutMessageResult result = this.commitLog.putMessage(msg);
-
-        long eclipseTime = this.getSystemClock().now() - beginTime;
-        if (eclipseTime > 500) {
-            log.warn("putMessage not in lock eclipse time(ms)={}, 
bodyLength={}", eclipseTime, msg.getBody().length);
-        }
-        this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
+        final long beginTime = this.getSystemClock().now();
+        CommitLogPutMessageCallback commitLogPutMessageCallback = new 
CommitLogPutMessageCallback() {
+            @Override
+            public void callback(PutMessageResult result) {
+                long eclipseTime = getSystemClock().now() - beginTime;
+                if (eclipseTime > 500) {
+                    log.warn("putMessage not in lock eclipse time(ms)={}, 
bodyLength={}", eclipseTime, msg.getBody().length);
+                }
+                storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
 
-        if (null == result || !result.isOk()) {
-            
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
-        }
+                if (null == result || !result.isOk()) {
+                    
storeStatsService.getPutMessageFailedTimes().incrementAndGet();
+                }
 
-        return result;
+                putMessageCallback.callback(result);
+            }
+        };
+        this.commitLog.putMessage(msg , commitLogPutMessageCallback);
     }
 
-    public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
+    public void putMessages(final MessageExtBatch messageExtBatch , final 
PutMessageCallback putMessageCallback) {
         if (this.shutdown) {
             log.warn("DefaultMessageStore has shutdown, so putMessages is 
forbidden");
-            return new 
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
+            putMessageCallback.callback(new 
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
+            return ;
         }
 
         if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
@@ -370,7 +381,8 @@ public PutMessageResult putMessages(MessageExtBatch 
messageExtBatch) {
                 log.warn("DefaultMessageStore is in slave mode, so putMessages 
is forbidden ");
             }
 
-            return new 
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
+            putMessageCallback.callback(new 
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
+            return ;
         }
 
         if (!this.runningFlags.isWriteable()) {
@@ -379,39 +391,47 @@ public PutMessageResult putMessages(MessageExtBatch 
messageExtBatch) {
                 log.warn("DefaultMessageStore is not writable, so putMessages 
is forbidden " + this.runningFlags.getFlagBits());
             }
 
-            return new 
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
+            putMessageCallback.callback(new 
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
+            return ;
         } else {
             this.printTimes.set(0);
         }
 
         if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) {
             log.warn("PutMessages topic length too long " + 
messageExtBatch.getTopic().length());
-            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, 
null);
+            putMessageCallback.callback(new 
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+            return ;
         }
 
         if (messageExtBatch.getBody().length > 
messageStoreConfig.getMaxMessageSize()) {
             log.warn("PutMessages body length too long " + 
messageExtBatch.getBody().length);
-            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, 
null);
+            putMessageCallback.callback(new 
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+            return ;
         }
 
         if (this.isOSPageCacheBusy()) {
-            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, 
null);
+            putMessageCallback.callback(new 
PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null));
+            return ;
         }
 
-        long beginTime = this.getSystemClock().now();
-        PutMessageResult result = this.commitLog.putMessages(messageExtBatch);
-
-        long eclipseTime = this.getSystemClock().now() - beginTime;
-        if (eclipseTime > 500) {
-            log.warn("not in lock eclipse time(ms)={}, bodyLength={}", 
eclipseTime, messageExtBatch.getBody().length);
-        }
-        this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
+        final long beginTime = this.getSystemClock().now();
+        CommitLogPutMessageCallback commitLogPutMessageCallback = new 
CommitLogPutMessageCallback() {
+            @Override
+            public void callback(PutMessageResult result) {
+                long eclipseTime = getSystemClock().now() - beginTime;
+                if (eclipseTime > 500) {
+                    log.warn("not in lock eclipse time(ms)={}, bodyLength={}", 
eclipseTime, messageExtBatch.getBody().length);
+                }
+                storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
 
-        if (null == result || !result.isOk()) {
-            
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
-        }
+                if (null == result || !result.isOk()) {
+                    
storeStatsService.getPutMessageFailedTimes().incrementAndGet();
+                }
 
-        return result;
+                putMessageCallback.callback(result);
+            }
+        } ;
+        this.commitLog.putMessages(messageExtBatch , 
commitLogPutMessageCallback);
     }
 
     @Override
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 907dfe209..4d49bf8f4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -51,21 +51,34 @@
      */
     void destroy();
 
-    /**
+/*
+    */
+/**
      * Store a message into store.
      *
      * @param msg Message instance to store
      * @return result of store operation.
-     */
+     *//*
+
     PutMessageResult putMessage(final MessageExtBrokerInner msg);
+*/
+
+    /**
+     * Store a message into store.
+     *
+     * @param msg Message instance to store
+     * @param putMessageCallback callback of put message
+     *
+     */
+    void putMessage(final MessageExtBrokerInner msg , PutMessageCallback 
putMessageCallback);
 
     /**
      * Store a batch of messages.
      *
      * @param messageExtBatch Message batch.
-     * @return result of storing batch messages.
+     * @param putMessageCallback callback of put message
      */
-    PutMessageResult putMessages(final MessageExtBatch messageExtBatch);
+    void putMessages(final MessageExtBatch messageExtBatch , 
PutMessageCallback putMessageCallback);
 
     /**
      * Query at most <code>maxMsgNums</code> messages belonging to 
<code>topic</code> at <code>queueId</code> starting
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/PutMessageCallback.java 
b/store/src/main/java/org/apache/rocketmq/store/PutMessageCallback.java
new file mode 100644
index 000000000..d83f3b48b
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageCallback.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store;
+
+/**
+ * Completion handle for an asynchronous put-message operation.
+ *
+ * <p>{@link #callback(PutMessageResult)} records the result and releases any
+ * thread blocked in {@link #waitComplete()}; subclasses can react to the result
+ * by overriding {@link #doAction(PutMessageResult)}.
+ *
+ * <p>Completion is latched: a thread that starts waiting after the callback has
+ * already run returns immediately instead of blocking.
+ */
+public class PutMessageCallback {
+    private final Object waitObject = new Object();
+    private volatile boolean completed = false;
+    private PutMessageResult putMessageResult;
+
+    /**
+     * Hook invoked by {@link #callback(PutMessageResult)} before waiters are
+     * released. The default implementation does nothing.
+     *
+     * @param putMessageResult result handed to the callback
+     */
+    protected void doAction(PutMessageResult putMessageResult) {
+        //default empty
+    }
+
+    /**
+     * Runs {@link #doAction(PutMessageResult)}, stores the result, marks this
+     * callback completed and wakes every thread blocked in {@code waitComplete}.
+     *
+     * @param putMessageResult result of the put operation
+     */
+    public void callback(PutMessageResult putMessageResult) {
+        doAction(putMessageResult);
+        // Publish the state under the monitor the waiters hold, so a waiter cannot
+        // check the flag, miss this notification and then sleep forever.
+        synchronized (waitObject) {
+            this.putMessageResult = putMessageResult;
+            completed = true;
+            waitObject.notifyAll();
+        }
+    }
+
+    /**
+     * Blocks until {@link #callback(PutMessageResult)} has been invoked.
+     *
+     * @throws InterruptedException if the waiting thread is interrupted
+     */
+    public void waitComplete() throws InterruptedException {
+        waitComplete(-1);
+    }
+
+    /**
+     * Blocks until the callback has been invoked or the timeout elapses.
+     *
+     * @param timeout maximum wait in milliseconds; zero or a negative value
+     *                waits without a time limit
+     * @throws InterruptedException if the waiting thread is interrupted
+     */
+    public void waitComplete(long timeout) throws InterruptedException {
+        synchronized (waitObject) {
+            if (timeout <= 0) {
+                // Loop on the flag: covers spurious wakeups and a callback that
+                // already fired before this thread started waiting.
+                while (!completed) {
+                    waitObject.wait();
+                }
+                return;
+            }
+
+            // Wait only for the time still remaining after each wakeup, so a
+            // spurious wakeup does not restart the full timeout.
+            final long startNanos = System.nanoTime();
+            long remainingMillis = timeout;
+            while (!completed && remainingMillis > 0) {
+                waitObject.wait(remainingMillis);
+                long elapsedNanos = System.nanoTime() - startNanos;
+                remainingMillis = timeout - elapsedNanos / 1000000L;
+            }
+        }
+    }
+
+    /**
+     * @return {@code true} once {@link #callback(PutMessageResult)} has run
+     */
+    public boolean isCompleted() {
+        return completed;
+    }
+
+    /**
+     * @return the result passed to the callback; {@code null} while the callback
+     *         has not run
+     */
+    public PutMessageResult getPutMessageResult() {
+        return putMessageResult;
+    }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java 
b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
index 51a8a2703..cec967fec 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
@@ -25,10 +25,11 @@
 import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -253,15 +254,15 @@ public String getServiceName() {
     class GroupTransferService extends ServiceThread {
 
         private final WaitNotifyObject notifyTransferObject = new 
WaitNotifyObject();
-        private volatile List<CommitLog.GroupCommitRequest> requestsWrite = 
new ArrayList<>();
-        private volatile List<CommitLog.GroupCommitRequest> requestsRead = new 
ArrayList<>();
+        private ConcurrentSkipListMap<Long , CommitLog.GroupCommitRequest> 
groupCommitRequestConcurrentSkipListMap = new ConcurrentSkipListMap<>() ;
 
-        public synchronized void putRequest(final CommitLog.GroupCommitRequest 
request) {
-            synchronized (this.requestsWrite) {
-                this.requestsWrite.add(request);
+        public void putRequest(final CommitLog.GroupCommitRequest request) {
+            if (request.getNextOffset() >= 
HAService.this.push2SlaveMaxOffset.get()) {
+                request.setFlushOK(true);
+                request.getGroupCommitCallback().doSlaveAction(request) ;
             }
-            if (hasNotified.compareAndSet(false, true)) {
-                waitPoint.countDown(); // notify
+            else {
+                
groupCommitRequestConcurrentSkipListMap.put(request.getNextOffset() , request) ;
             }
         }
 
@@ -269,32 +270,33 @@ public void notifyTransferSome() {
             this.notifyTransferObject.wakeup();
         }
 
-        private void swapRequests() {
-            List<CommitLog.GroupCommitRequest> tmp = this.requestsWrite;
-            this.requestsWrite = this.requestsRead;
-            this.requestsRead = tmp;
-        }
-
+        /**
+         * Wait until the slave has fetched the message or the wait times out, then respond to the produce 
request.
+         */
         private void doWaitTransfer() {
-            synchronized (this.requestsRead) {
-                if (!this.requestsRead.isEmpty()) {
-                    for (CommitLog.GroupCommitRequest req : this.requestsRead) 
{
-                        boolean transferOK = 
HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
-                        for (int i = 0; !transferOK && i < 5; i++) {
-                            this.notifyTransferObject.waitForRunning(1000);
-                            transferOK = 
HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
-                        }
-
-                        if (!transferOK) {
-                            log.warn("transfer messsage to slave timeout, " + 
req.getNextOffset());
-                        }
-
-                        req.wakeupCustomer(transferOK);
+            long waitStart = System.currentTimeMillis();
+            boolean waitTimeout = false ;
+            while (!groupCommitRequestConcurrentSkipListMap.isEmpty() &&
+                    
(groupCommitRequestConcurrentSkipListMap.firstEntry().getKey() <= 
HAService.this.push2SlaveMaxOffset.get()
+                            || (waitTimeout = System.currentTimeMillis() - 
waitStart > 5000))) {
+                if (waitTimeout) {
+                    Long offset = 
groupCommitRequestConcurrentSkipListMap.firstEntry().getKey();
+                    CommitLog.GroupCommitRequest request = 
groupCommitRequestConcurrentSkipListMap.remove(offset) ;
+                    request.setFlushOK(false);
+                    request.getGroupCommitCallback().doSlaveAction(request);
+                }
+                else {
+                    ConcurrentNavigableMap<Long, CommitLog.GroupCommitRequest> 
subMap = 
groupCommitRequestConcurrentSkipListMap.headMap(HAService.this.push2SlaveMaxOffset.get());
+                    for (Long offset : subMap.keySet()) {
+                        CommitLog.GroupCommitRequest request = 
subMap.remove(offset);
+                        request.setFlushOK(true);
+                        
request.getGroupCommitCallback().doSlaveAction(request);
                     }
-
-                    this.requestsRead.clear();
                 }
+                waitTimeout = false ;
+                waitStart = System.currentTimeMillis();
             }
+            this.notifyTransferObject.waitForRunning(1000);
         }
 
         public void run() {
@@ -302,7 +304,6 @@ public void run() {
 
             while (!this.isStopped()) {
                 try {
-                    this.waitForRunning(10);
                     this.doWaitTransfer();
                 } catch (Exception e) {
                     log.warn(this.getServiceName() + " service has exception. 
", e);
@@ -312,11 +313,6 @@ public void run() {
             log.info(this.getServiceName() + " service end");
         }
 
-        @Override
-        protected void onWaitEnd() {
-            this.swapRequests();
-        }
-
         @Override
         public String getServiceName() {
             return GroupTransferService.class.getSimpleName();
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
 
b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
index 35b8e8565..2b6315bbe 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
@@ -36,6 +36,7 @@
 import org.apache.rocketmq.store.ConsumeQueueExt;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.PutMessageCallback;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
@@ -44,9 +45,8 @@
 import org.slf4j.LoggerFactory;
 
 public class ScheduleMessageService extends ConfigManager {
-    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
-
     public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
+    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
     private static final long FIRST_DELAY_TIME = 1000L;
     private static final long DELAY_FOR_A_WHILE = 100L;
     private static final long DELAY_FOR_A_PERIOD = 10000L;
@@ -283,11 +283,13 @@ public void executeOnTimeup() {
 
                                 if (msgExt != null) {
                                     try {
+                                        PutMessageCallback putMessageCallback 
= new PutMessageCallback();
                                         MessageExtBrokerInner msgInner = 
this.messageTimeup(msgExt);
-                                        PutMessageResult putMessageResult =
-                                            
ScheduleMessageService.this.defaultMessageStore
-                                                .putMessage(msgInner);
+                                        
ScheduleMessageService.this.defaultMessageStore
+                                            .putMessage(msgInner, 
putMessageCallback);
 
+                                        putMessageCallback.waitComplete();
+                                        PutMessageResult putMessageResult = 
putMessageCallback.getPutMessageResult();
                                         if (putMessageResult != null
                                             && 
putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                             continue;
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java 
b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
index b7d38f8c7..94e78704e 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
@@ -126,7 +126,9 @@ protected void putMsg(DefaultMessageStore master) throws 
Exception {
         long totalMsgs = 200;
 
         for (long i = 0; i < totalMsgs; i++) {
-            master.putMessage(buildMessage());
+            PutMessageCallback putMessageCallback = new PutMessageCallback() ;
+            master.putMessage(buildMessage() , putMessageCallback);
+            putMessageCallback.waitComplete();
         }
     }
 
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java 
b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 9269cdfa7..e31eee854 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -24,13 +24,12 @@
 import java.nio.channels.OverlappingFileLockException;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.store.config.FlushDiskType;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.junit.After;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -108,7 +107,7 @@ public void testWriteAndRead() throws Exception {
         QUEUE_TOTAL = 1;
         MessageBody = StoreMessage.getBytes();
         for (long i = 0; i < totalMsgs; i++) {
-            messageStore.putMessage(buildMessage());
+            messageStore.putMessage(buildMessage() , new PutMessageCallback());
         }
 
         for (long i = 0; i < totalMsgs; i++) {
@@ -140,7 +139,7 @@ public void testGroupCommit() throws Exception {
         QUEUE_TOTAL = 1;
         MessageBody = StoreMessage.getBytes();
         for (long i = 0; i < totalMsgs; i++) {
-            messageStore.putMessage(buildMessage());
+            messageStore.putMessage(buildMessage() , new PutMessageCallback());
         }
 
         for (long i = 0; i < totalMsgs; i++) {
@@ -153,7 +152,7 @@ public void testGroupCommit() throws Exception {
 
     private void verifyThatMasterIsFunctional(long totalMsgs, MessageStore 
master) {
         for (long i = 0; i < totalMsgs; i++) {
-            master.putMessage(buildMessage());
+            master.putMessage(buildMessage() , new PutMessageCallback());
         }
 
         for (long i = 0; i < totalMsgs; i++) {
@@ -172,7 +171,7 @@ public void testPullSize() throws Exception {
             MessageExtBrokerInner messageExtBrokerInner = buildMessage();
             messageExtBrokerInner.setTopic(topic);
             messageExtBrokerInner.setQueueId(0);
-            messageStore.putMessage(messageExtBrokerInner);
+            messageStore.putMessage(messageExtBrokerInner , new 
PutMessageCallback());
         }
         //wait for consume queue build
         Thread.sleep(10);
@@ -190,7 +189,7 @@ public void testPullSize() throws Exception {
     private class MyMessageArrivingListener implements MessageArrivingListener 
{
         @Override
         public void arriving(String topic, int queueId, long logicOffset, long 
tagsCode, long msgStoreTime,
-                             byte[] filterBitMap, Map<String, String> 
properties) {
+            byte[] filterBitMap, Map<String, String> properties) {
         }
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to