This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0325772  [ISSUE #2883] [Part E] Improve produce performance in M/S 
mode.  (#2889)
0325772 is described below

commit 032577255ecd543908446aa272668c9f882bcce5
Author: huangli <[email protected]>
AuthorDate: Wed Jul 28 14:49:45 2021 +0800

    [ISSUE #2883] [Part E] Improve produce performance in M/S mode.  (#2889)
    
    * Remove putMessage/putMessages methods in CommitLog, which have too much 
duplicated code.
    
    * Optimise performance of asyncPutMessage (extract some code out of 
putMessage lock)
    
    * extract generation of msgId out of lock in CommitLog (now only for single 
message processor)
    
    * extract generation of topicQueueTable key out of sync code
    
    * extract generation of msgId out of lock in CommitLog (for batch)
    
    * fix ipv6 problem introduced in commit "Optimise performance of 
asyncPutMessage (extract some code out of putMessage lock)"
---
 .../rocketmq/store/AppendMessageCallback.java      |   5 +-
 .../apache/rocketmq/store/AppendMessageResult.java |  17 +
 .../java/org/apache/rocketmq/store/CommitLog.java  | 760 ++++++++-------------
 .../apache/rocketmq/store/DefaultMessageStore.java |  55 +-
 .../java/org/apache/rocketmq/store/MappedFile.java |  20 +-
 .../rocketmq/store/MessageExtBrokerInner.java      |  12 +
 .../rocketmq/store/dledger/DLedgerCommitLog.java   | 232 -------
 .../apache/rocketmq/store/AppendCallbackTest.java  |  28 +-
 8 files changed, 361 insertions(+), 768 deletions(-)

diff --git 
a/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java 
b/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java
index d337638..5499c90 100644
--- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java
+++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.store;
 
 import java.nio.ByteBuffer;
 import org.apache.rocketmq.common.message.MessageExtBatch;
+import org.apache.rocketmq.store.CommitLog.PutMessageContext;
 
 /**
  * Write messages callback interface
@@ -30,7 +31,7 @@ public interface AppendMessageCallback {
      * @return How many bytes to write
      */
     AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer 
byteBuffer,
-        final int maxBlank, final MessageExtBrokerInner msg);
+        final int maxBlank, final MessageExtBrokerInner msg, PutMessageContext 
putMessageContext);
 
     /**
      * After batched message serialization, write MapedByteBuffer
@@ -39,5 +40,5 @@ public interface AppendMessageCallback {
      * @return How many bytes to write
      */
     AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer 
byteBuffer,
-        final int maxBlank, final MessageExtBatch messageExtBatch);
+        final int maxBlank, final MessageExtBatch messageExtBatch, 
PutMessageContext putMessageContext);
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java 
b/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java
index d6d1aa6..de3c03b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java
+++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java
@@ -16,6 +16,8 @@
  */
 package org.apache.rocketmq.store;
 
+import java.util.function.Supplier;
+
 /**
  * When write a message to the commit log, returns results
  */
@@ -28,6 +30,7 @@ public class AppendMessageResult {
     private int wroteBytes;
     // Message ID
     private String msgId;
+    private Supplier<String> msgIdSupplier;
     // Message storage timestamp
     private long storeTimestamp;
     // Consume queue's offset(step by one)
@@ -51,6 +54,17 @@ public class AppendMessageResult {
         this.pagecacheRT = pagecacheRT;
     }
 
+    public AppendMessageResult(AppendMessageStatus status, long wroteOffset, 
int wroteBytes, Supplier<String> msgIdSupplier,
+            long storeTimestamp, long logicsOffset, long pagecacheRT) {
+        this.status = status;
+        this.wroteOffset = wroteOffset;
+        this.wroteBytes = wroteBytes;
+        this.msgIdSupplier = msgIdSupplier;
+        this.storeTimestamp = storeTimestamp;
+        this.logicsOffset = logicsOffset;
+        this.pagecacheRT = pagecacheRT;
+    }
+
     public long getPagecacheRT() {
         return pagecacheRT;
     }
@@ -88,6 +102,9 @@ public class AppendMessageResult {
     }
 
     public String getMsgId() {
+        if (msgId == null && msgIdSupplier != null) {
+            msgId = msgIdSupplier.get();
+        }
         return msgId;
     }
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 57fa363..5e92654 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -16,17 +16,18 @@
  */
 package org.apache.rocketmq.store;
 
+import java.net.Inet4Address;
 import java.net.Inet6Address;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.UtilAll;
@@ -62,7 +63,7 @@ public class CommitLog {
     private final FlushCommitLogService commitLogService;
 
     private final AppendMessageCallback appendMessageCallback;
-    private final ThreadLocal<MessageExtBatchEncoder> batchEncoderThreadLocal;
+    private final ThreadLocal<PutMessageThreadLocal> putMessageThreadLocal;
     protected HashMap<String/* topic-queueid */, Long/* offset */> 
topicQueueTable = new HashMap<String, Long>(1024);
     protected volatile long confirmOffset = -1L;
 
@@ -84,10 +85,10 @@ public class CommitLog {
         this.commitLogService = new CommitRealTimeService();
 
         this.appendMessageCallback = new 
DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
-        batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() {
+        putMessageThreadLocal = new ThreadLocal<PutMessageThreadLocal>() {
             @Override
-            protected MessageExtBatchEncoder initialValue() {
-                return new 
MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
+            protected PutMessageThreadLocal initialValue() {
+                return new 
PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
             }
         };
         this.putMessageLock = 
defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() 
? new PutMessageReentrantLock() : new PutMessageSpinLock();
@@ -555,6 +556,14 @@ public class CommitLog {
         return beginTimeInLock;
     }
 
+    private String generateKey(StringBuilder keyBuilder, MessageExt 
messageExt) {
+        keyBuilder.setLength(0);
+        keyBuilder.append(messageExt.getTopic());
+        keyBuilder.append('-');
+        keyBuilder.append(messageExt.getQueueId());
+        return keyBuilder.toString();
+    }
+
     public CompletableFuture<PutMessageResult> asyncPutMessage(final 
MessageExtBrokerInner msg) {
         // Set the storage time
         msg.setStoreTimestamp(System.currentTimeMillis());
@@ -591,12 +600,30 @@ public class CommitLog {
             }
         }
 
+        InetSocketAddress bornSocketAddress = (InetSocketAddress) 
msg.getBornHost();
+        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
+            msg.setBornHostV6Flag();
+        }
+
+        InetSocketAddress storeSocketAddress = (InetSocketAddress) 
msg.getStoreHost();
+        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
+            msg.setStoreHostAddressV6Flag();
+        }
+
+        PutMessageThreadLocal putMessageThreadLocal = 
this.putMessageThreadLocal.get();
+        PutMessageResult encodeResult = 
putMessageThreadLocal.getEncoder().encode(msg);
+        if (encodeResult != null) {
+            return CompletableFuture.completedFuture(encodeResult);
+        }
+        msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);
+        PutMessageContext putMessageContext = new 
PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));
+
         long elapsedTimeInLock = 0;
         MappedFile unlockMappedFile = null;
-        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
 
         putMessageLock.lock(); //spin or ReentrantLock ,depending on store 
config
         try {
+            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
             long beginLockTimestamp = 
this.defaultMessageStore.getSystemClock().now();
             this.beginTimeInLock = beginLockTimestamp;
 
@@ -613,7 +640,7 @@ public class CommitLog {
                 return CompletableFuture.completedFuture(new 
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
             }
 
-            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
+            result = mappedFile.appendMessage(msg, this.appendMessageCallback, 
putMessageContext);
             switch (result.getStatus()) {
                 case PUT_OK:
                     break;
@@ -627,7 +654,7 @@ public class CommitLog {
                         beginTimeInLock = 0;
                         return CompletableFuture.completedFuture(new 
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
                     }
-                    result = mappedFile.appendMessage(msg, 
this.appendMessageCallback);
+                    result = mappedFile.appendMessage(msg, 
this.appendMessageCallback, putMessageContext);
                     break;
                 case MESSAGE_SIZE_EXCEEDED:
                 case PROPERTIES_SIZE_EXCEEDED:
@@ -693,14 +720,26 @@ public class CommitLog {
             return CompletableFuture.completedFuture(new 
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
         }
 
+        InetSocketAddress bornSocketAddress = (InetSocketAddress) 
messageExtBatch.getBornHost();
+        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setBornHostV6Flag();
+        }
+
+        InetSocketAddress storeSocketAddress = (InetSocketAddress) 
messageExtBatch.getStoreHost();
+        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setStoreHostAddressV6Flag();
+        }
+
         long elapsedTimeInLock = 0;
         MappedFile unlockMappedFile = null;
         MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
 
         //fine-grained lock instead of the coarse-grained
-        MessageExtBatchEncoder batchEncoder = batchEncoderThreadLocal.get();
+        PutMessageThreadLocal pmThreadLocal = this.putMessageThreadLocal.get();
+        MessageExtEncoder batchEncoder = pmThreadLocal.getEncoder();
 
-        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
+        PutMessageContext putMessageContext = new 
PutMessageContext(generateKey(pmThreadLocal.getKeyBuilder(), messageExtBatch));
+        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, 
putMessageContext));
 
         putMessageLock.lock();
         try {
@@ -720,7 +759,7 @@ public class CommitLog {
                 return CompletableFuture.completedFuture(new 
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
             }
 
-            result = mappedFile.appendMessages(messageExtBatch, 
this.appendMessageCallback);
+            result = mappedFile.appendMessages(messageExtBatch, 
this.appendMessageCallback, putMessageContext);
             switch (result.getStatus()) {
                 case PUT_OK:
                     break;
@@ -734,7 +773,7 @@ public class CommitLog {
                         beginTimeInLock = 0;
                         return CompletableFuture.completedFuture(new 
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
                     }
-                    result = mappedFile.appendMessages(messageExtBatch, 
this.appendMessageCallback);
+                    result = mappedFile.appendMessages(messageExtBatch, 
this.appendMessageCallback, putMessageContext);
                     break;
                 case MESSAGE_SIZE_EXCEEDED:
                 case PROPERTIES_SIZE_EXCEEDED:
@@ -784,129 +823,6 @@ public class CommitLog {
 
     }
 
-    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
-        // Set the storage time
-        msg.setStoreTimestamp(System.currentTimeMillis());
-        // Set the message body BODY CRC (consider the most appropriate setting
-        // on the client)
-        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
-        // Back to Results
-        AppendMessageResult result = null;
-
-        StoreStatsService storeStatsService = 
this.defaultMessageStore.getStoreStatsService();
-
-        String topic = msg.getTopic();
-        int queueId = msg.getQueueId();
-
-        final int tranType = 
MessageSysFlag.getTransactionValue(msg.getSysFlag());
-        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
-            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
-            // Delay Delivery
-            if (msg.getDelayTimeLevel() > 0) {
-                if (msg.getDelayTimeLevel() > 
this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
-                    
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
-                }
-
-                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
-                queueId = 
ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
-
-                // Backup real topic, queueId
-                MessageAccessor.putProperty(msg, 
MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
-                MessageAccessor.putProperty(msg, 
MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
-                
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
-
-                msg.setTopic(topic);
-                msg.setQueueId(queueId);
-            }
-        }
-
-        InetSocketAddress bornSocketAddress = (InetSocketAddress) 
msg.getBornHost();
-        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
-            msg.setBornHostV6Flag();
-        }
-
-        InetSocketAddress storeSocketAddress = (InetSocketAddress) 
msg.getStoreHost();
-        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
-            msg.setStoreHostAddressV6Flag();
-        }
-
-        long elapsedTimeInLock = 0;
-
-        MappedFile unlockMappedFile = null;
-        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
-
-        putMessageLock.lock(); //spin or ReentrantLock ,depending on store 
config
-        try {
-            long beginLockTimestamp = 
this.defaultMessageStore.getSystemClock().now();
-            this.beginTimeInLock = beginLockTimestamp;
-
-            // Here settings are stored timestamp, in order to ensure an 
orderly
-            // global
-            msg.setStoreTimestamp(beginLockTimestamp);
-
-            if (null == mappedFile || mappedFile.isFull()) {
-                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // 
Mark: NewFile may be cause noise
-            }
-            if (null == mappedFile) {
-                log.error("create mapped file1 error, topic: " + 
msg.getTopic() + " clientAddr: " + msg.getBornHostString());
-                beginTimeInLock = 0;
-                return new 
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
-            }
-
-            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
-            switch (result.getStatus()) {
-                case PUT_OK:
-                    break;
-                case END_OF_FILE:
-                    unlockMappedFile = mappedFile;
-                    // Create a new file, re-write the message
-                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
-                    if (null == mappedFile) {
-                        // XXX: warn and notify me
-                        log.error("create mapped file2 error, topic: " + 
msg.getTopic() + " clientAddr: " + msg.getBornHostString());
-                        beginTimeInLock = 0;
-                        return new 
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
-                    }
-                    result = mappedFile.appendMessage(msg, 
this.appendMessageCallback);
-                    break;
-                case MESSAGE_SIZE_EXCEEDED:
-                case PROPERTIES_SIZE_EXCEEDED:
-                    beginTimeInLock = 0;
-                    return new 
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
-                case UNKNOWN_ERROR:
-                    beginTimeInLock = 0;
-                    return new 
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
-                default:
-                    beginTimeInLock = 0;
-                    return new 
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
-            }
-
-            elapsedTimeInLock = 
this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
-            beginTimeInLock = 0;
-        } finally {
-            putMessageLock.unlock();
-        }
-
-        if (elapsedTimeInLock > 500) {
-            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, 
bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, 
result);
-        }
-
-        if (null != unlockMappedFile && 
this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
-            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
-        }
-
-        PutMessageResult putMessageResult = new 
PutMessageResult(PutMessageStatus.PUT_OK, result);
-
-        // Statistics
-        
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
-        
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
-
-        handleDiskFlush(result, putMessageResult, msg);
-        handleHA(result, putMessageResult, msg);
-
-        return putMessageResult;
-    }
-
     public CompletableFuture<PutMessageStatus> 
submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
         // Synchronization flush
         if (FlushDiskType.SYNC_FLUSH == 
this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
@@ -951,179 +867,6 @@ public class CommitLog {
         return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
     }
 
-
-    public void handleDiskFlush(AppendMessageResult result, PutMessageResult 
putMessageResult, MessageExt messageExt) {
-        // Synchronization flush
-        if (FlushDiskType.SYNC_FLUSH == 
this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
-            final GroupCommitService service = (GroupCommitService) 
this.flushCommitLogService;
-            if (messageExt.isWaitStoreMsgOK()) {
-                GroupCommitRequest request = new 
GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
-                service.putRequest(request);
-                CompletableFuture<PutMessageStatus> flushOkFuture = 
request.future();
-                PutMessageStatus flushStatus = null;
-                try {
-                    flushStatus = 
flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
-                            TimeUnit.MILLISECONDS);
-                } catch (InterruptedException | ExecutionException | 
TimeoutException e) {
-                    //flushOK=false;
-                }
-                if (flushStatus != PutMessageStatus.PUT_OK) {
-                    log.error("do groupcommit, wait for flush failed, topic: " 
+ messageExt.getTopic() + " tags: " + messageExt.getTags()
-                        + " client address: " + 
messageExt.getBornHostString());
-                    
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
-                }
-            } else {
-                service.wakeup();
-            }
-        }
-        // Asynchronous flush
-        else {
-            if 
(!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable())
 {
-                flushCommitLogService.wakeup();
-            } else {
-                commitLogService.wakeup();
-            }
-        }
-    }
-
-    public void handleHA(AppendMessageResult result, PutMessageResult 
putMessageResult, MessageExt messageExt) {
-        if (BrokerRole.SYNC_MASTER == 
this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
-            HAService service = this.defaultMessageStore.getHaService();
-            if (messageExt.isWaitStoreMsgOK()) {
-                // Determine whether to wait
-                if (service.isSlaveOK(result.getWroteOffset() + 
result.getWroteBytes())) {
-                    GroupCommitRequest request = new 
GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
-                    service.putRequest(request);
-                    service.getWaitNotifyObject().wakeupAll();
-                    PutMessageStatus replicaStatus = null;
-                    try {
-                        replicaStatus = 
request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
-                                TimeUnit.MILLISECONDS);
-                    } catch (InterruptedException | ExecutionException | 
TimeoutException e) {
-                    }
-                    if (replicaStatus != PutMessageStatus.PUT_OK) {
-                        log.error("do sync transfer other node, wait return, 
but failed, topic: " + messageExt.getTopic() + " tags: "
-                            + messageExt.getTags() + " client address: " + 
messageExt.getBornHostNameString());
-                        
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
-                    }
-                }
-                // Slave problem
-                else {
-                    // Tell the producer, slave not available
-                    
putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
-                }
-            }
-        }
-
-    }
-
-    public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) 
{
-        messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
-        AppendMessageResult result;
-
-        StoreStatsService storeStatsService = 
this.defaultMessageStore.getStoreStatsService();
-
-        final int tranType = 
MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
-
-        if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
-            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, 
null);
-        }
-        if (messageExtBatch.getDelayTimeLevel() > 0) {
-            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, 
null);
-        }
-
-        InetSocketAddress bornSocketAddress = (InetSocketAddress) 
messageExtBatch.getBornHost();
-        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
-            messageExtBatch.setBornHostV6Flag();
-        }
-
-        InetSocketAddress storeSocketAddress = (InetSocketAddress) 
messageExtBatch.getStoreHost();
-        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
-            messageExtBatch.setStoreHostAddressV6Flag();
-        }
-
-        long elapsedTimeInLock = 0;
-        MappedFile unlockMappedFile = null;
-        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
-
-        //fine-grained lock instead of the coarse-grained
-        MessageExtBatchEncoder batchEncoder = batchEncoderThreadLocal.get();
-
-        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
-
-        putMessageLock.lock();
-        try {
-            long beginLockTimestamp = 
this.defaultMessageStore.getSystemClock().now();
-            this.beginTimeInLock = beginLockTimestamp;
-
-            // Here settings are stored timestamp, in order to ensure an 
orderly
-            // global
-            messageExtBatch.setStoreTimestamp(beginLockTimestamp);
-
-            if (null == mappedFile || mappedFile.isFull()) {
-                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // 
Mark: NewFile may be cause noise
-            }
-            if (null == mappedFile) {
-                log.error("Create mapped file1 error, topic: {} clientAddr: 
{}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
-                beginTimeInLock = 0;
-                return new 
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
-            }
-
-            result = mappedFile.appendMessages(messageExtBatch, 
this.appendMessageCallback);
-            switch (result.getStatus()) {
-                case PUT_OK:
-                    break;
-                case END_OF_FILE:
-                    unlockMappedFile = mappedFile;
-                    // Create a new file, re-write the message
-                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
-                    if (null == mappedFile) {
-                        // XXX: warn and notify me
-                        log.error("Create mapped file2 error, topic: {} 
clientAddr: {}", messageExtBatch.getTopic(), 
messageExtBatch.getBornHostString());
-                        beginTimeInLock = 0;
-                        return new 
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
-                    }
-                    result = mappedFile.appendMessages(messageExtBatch, 
this.appendMessageCallback);
-                    break;
-                case MESSAGE_SIZE_EXCEEDED:
-                case PROPERTIES_SIZE_EXCEEDED:
-                    beginTimeInLock = 0;
-                    return new 
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
-                case UNKNOWN_ERROR:
-                    beginTimeInLock = 0;
-                    return new 
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
-                default:
-                    beginTimeInLock = 0;
-                    return new 
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
-            }
-
-            elapsedTimeInLock = 
this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
-            beginTimeInLock = 0;
-        } finally {
-            putMessageLock.unlock();
-        }
-
-        if (elapsedTimeInLock > 500) {
-            log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, 
bodyLength={} AppendMessageResult={}", elapsedTimeInLock, 
messageExtBatch.getBody().length, result);
-        }
-
-        if (null != unlockMappedFile && 
this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
-            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
-        }
-
-        PutMessageResult putMessageResult = new 
PutMessageResult(PutMessageStatus.PUT_OK, result);
-
-        // Statistics
-        
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum());
-        
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes());
-
-        handleDiskFlush(result, putMessageResult, messageExtBatch);
-
-        handleHA(result, putMessageResult, messageExtBatch);
-
-        return putMessageResult;
-    }
-
     /**
      * According to receive certain message or offset storage time if an error 
occurs, it returns -1
      */
@@ -1509,50 +1252,33 @@ public class CommitLog {
         private final ByteBuffer msgStoreItemMemory;
         // The maximum length of the message
         private final int maxMessageSize;
-        // Build Message Key
-        private final StringBuilder keyBuilder = new StringBuilder();
-
-        private final StringBuilder msgIdBuilder = new StringBuilder();
 
         DefaultAppendMessageCallback(final int size) {
             this.msgIdMemory = ByteBuffer.allocate(4 + 4 + 8);
             this.msgIdV6Memory = ByteBuffer.allocate(16 + 4 + 8);
-            this.msgStoreItemMemory = ByteBuffer.allocate(size + 
END_FILE_MIN_BLANK_LENGTH);
+            this.msgStoreItemMemory = 
ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH);
             this.maxMessageSize = size;
         }
 
-        public ByteBuffer getMsgStoreItemMemory() {
-            return msgStoreItemMemory;
-        }
-
         public AppendMessageResult doAppend(final long fileFromOffset, final 
ByteBuffer byteBuffer, final int maxBlank,
-            final MessageExtBrokerInner msgInner) {
+            final MessageExtBrokerInner msgInner, PutMessageContext 
putMessageContext) {
             // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
 
             // PHY OFFSET
             long wroteOffset = fileFromOffset + byteBuffer.position();
 
-            int sysflag = msgInner.getSysFlag();
-
-            int bornHostLength = (sysflag & MessageSysFlag.BORNHOST_V6_FLAG) 
== 0 ? 4 + 4 : 16 + 4;
-            int storeHostLength = (sysflag & 
MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
-            ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
-            ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
-
-            this.resetByteBuffer(storeHostHolder, storeHostLength);
-            String msgId;
-            if ((sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
-                msgId = MessageDecoder.createMessageId(this.msgIdMemory, 
msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
-            } else {
-                msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, 
msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
-            }
+            Supplier<String> msgIdSupplier = () -> {
+                int sysflag = msgInner.getSysFlag();
+                int msgIdLen = (sysflag & 
MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
+                ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
+                MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), 
msgIdBuffer);
+                msgIdBuffer.clear();//because socketAddress2ByteBuffer flip 
the buffer
+                msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
+                return UtilAll.bytes2string(msgIdBuffer.array());
+            };
 
             // Record ConsumeQueue information
-            keyBuilder.setLength(0);
-            keyBuilder.append(msgInner.getTopic());
-            keyBuilder.append('-');
-            keyBuilder.append(msgInner.getQueueId());
-            String key = keyBuilder.toString();
+            String key = putMessageContext.getTopicQueueTableKey();
             Long queueOffset = CommitLog.this.topicQueueTable.get(key);
             if (null == queueOffset) {
                 queueOffset = 0L;
@@ -1574,36 +1300,12 @@ public class CommitLog {
                     break;
             }
 
-            /**
-             * Serialize message
-             */
-            final byte[] propertiesData =
-                msgInner.getPropertiesString() == null ? null : 
msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
-
-            final int propertiesLength = propertiesData == null ? 0 : 
propertiesData.length;
-
-            if (propertiesLength > Short.MAX_VALUE) {
-                log.warn("putMessage message properties length too long. 
length={}", propertiesData.length);
-                return new 
AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
-            }
-
-            final byte[] topicData = 
msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
-            final int topicLength = topicData.length;
-
-            final int bodyLength = msgInner.getBody() == null ? 0 : 
msgInner.getBody().length;
-
-            final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, 
topicLength, propertiesLength);
-
-            // Exceeds the maximum message
-            if (msgLen > this.maxMessageSize) {
-                CommitLog.log.warn("message size exceeded, msg total size: " + 
msgLen + ", msg body size: " + bodyLength
-                    + ", maxMessageSize: " + this.maxMessageSize);
-                return new 
AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
-            }
+            ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
+            final int msgLen = preEncodeBuffer.getInt(0);
 
             // Determines whether there is sufficient free space
             if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
-                this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
+                this.msgStoreItemMemory.clear();
                 // 1 TOTALSIZE
                 this.msgStoreItemMemory.putInt(maxBlank);
                 // 2 MAGICCODE
@@ -1611,60 +1313,31 @@ public class CommitLog {
                 // 3 The remaining space may be any value
                 // Here the length of the specially set maxBlank
                 final long beginTimeMills = 
CommitLog.this.defaultMessageStore.now();
-                byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
-                return new 
AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, 
msgId, msgInner.getStoreTimestamp(),
-                    queueOffset, CommitLog.this.defaultMessageStore.now() - 
beginTimeMills);
+                byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
+                return new 
AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,
+                        maxBlank, /* only wrote 8 bytes, but declare wrote 
maxBlank for compute write position */
+                        msgIdSupplier, msgInner.getStoreTimestamp(),
+                        queueOffset, CommitLog.this.defaultMessageStore.now() 
- beginTimeMills);
             }
 
-            // Initialization of storage space
-            this.resetByteBuffer(msgStoreItemMemory, msgLen);
-            // 1 TOTALSIZE
-            this.msgStoreItemMemory.putInt(msgLen);
-            // 2 MAGICCODE
-            this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
-            // 3 BODYCRC
-            this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
-            // 4 QUEUEID
-            this.msgStoreItemMemory.putInt(msgInner.getQueueId());
-            // 5 FLAG
-            this.msgStoreItemMemory.putInt(msgInner.getFlag());
+            int pos = 4 + 4 + 4 + 4 + 4;
             // 6 QUEUEOFFSET
-            this.msgStoreItemMemory.putLong(queueOffset);
+            preEncodeBuffer.putLong(pos, queueOffset);
+            pos += 8;
             // 7 PHYSICALOFFSET
-            this.msgStoreItemMemory.putLong(fileFromOffset + 
byteBuffer.position());
-            // 8 SYSFLAG
-            this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
-            // 9 BORNTIMESTAMP
-            this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
-            // 10 BORNHOST
-            this.resetByteBuffer(bornHostHolder, bornHostLength);
-            
this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
-            // 11 STORETIMESTAMP
-            this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
-            // 12 STOREHOSTADDRESS
-            this.resetByteBuffer(storeHostHolder, storeHostLength);
-            
this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
-            // 13 RECONSUMETIMES
-            this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
-            // 14 Prepared Transaction Offset
-            
this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
-            // 15 BODY
-            this.msgStoreItemMemory.putInt(bodyLength);
-            if (bodyLength > 0)
-                this.msgStoreItemMemory.put(msgInner.getBody());
-            // 16 TOPIC
-            this.msgStoreItemMemory.put((byte) topicLength);
-            this.msgStoreItemMemory.put(topicData);
-            // 17 PROPERTIES
-            this.msgStoreItemMemory.putShort((short) propertiesLength);
-            if (propertiesLength > 0)
-                this.msgStoreItemMemory.put(propertiesData);
+            preEncodeBuffer.putLong(pos, fileFromOffset + 
byteBuffer.position());
+            int ipLen = (msgInner.getSysFlag() & 
MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
+            // 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
+            pos += 8 + 4 + 8 + ipLen;
+            // refresh store time stamp in lock
+            preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
+
 
             final long beginTimeMills = 
CommitLog.this.defaultMessageStore.now();
             // Write messages to the queue buffer
-            byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
-
-            AppendMessageResult result = new 
AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
+            byteBuffer.put(preEncodeBuffer);
+            msgInner.setEncodedBuff(null);
+            AppendMessageResult result = new 
AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, 
msgIdSupplier,
                 msgInner.getStoreTimestamp(), queueOffset, 
CommitLog.this.defaultMessageStore.now() - beginTimeMills);
 
             switch (tranType) {
@@ -1683,16 +1356,12 @@ public class CommitLog {
         }
 
         public AppendMessageResult doAppend(final long fileFromOffset, final 
ByteBuffer byteBuffer, final int maxBlank,
-            final MessageExtBatch messageExtBatch) {
+            final MessageExtBatch messageExtBatch, PutMessageContext 
putMessageContext) {
             byteBuffer.mark();
             //physical offset
             long wroteOffset = fileFromOffset + byteBuffer.position();
             // Record ConsumeQueue information
-            keyBuilder.setLength(0);
-            keyBuilder.append(messageExtBatch.getTopic());
-            keyBuilder.append('-');
-            keyBuilder.append(messageExtBatch.getQueueId());
-            String key = keyBuilder.toString();
+            String key = putMessageContext.getTopicQueueTableKey();
             Long queueOffset = CommitLog.this.topicQueueTable.get(key);
             if (null == queueOffset) {
                 queueOffset = 0L;
@@ -1701,17 +1370,35 @@ public class CommitLog {
             long beginQueueOffset = queueOffset;
             int totalMsgLen = 0;
             int msgNum = 0;
-            msgIdBuilder.setLength(0);
+
             final long beginTimeMills = 
CommitLog.this.defaultMessageStore.now();
             ByteBuffer messagesByteBuff = messageExtBatch.getEncodedBuff();
 
             int sysFlag = messageExtBatch.getSysFlag();
+            int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) 
== 0 ? 4 + 4 : 16 + 4;
             int storeHostLength = (sysFlag & 
MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
-            ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
+            Supplier<String> msgIdSupplier = () -> {
+                int msgIdLen = storeHostLength + 8;
+                int batchCount = putMessageContext.getBatchSize();
+                long[] phyPosArray = putMessageContext.getPhyPos();
+                ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
+                
MessageExt.socketAddress2ByteBuffer(messageExtBatch.getStoreHost(), 
msgIdBuffer);
+                msgIdBuffer.clear();//because socketAddress2ByteBuffer flip 
the buffer
+
+                StringBuilder buffer = new StringBuilder(batchCount * msgIdLen 
* 2 + batchCount - 1);
+                for (int i = 0; i < phyPosArray.length; i++) {
+                    msgIdBuffer.putLong(msgIdLen - 8, phyPosArray[i]);
+                    String msgId = UtilAll.bytes2string(msgIdBuffer.array());
+                    if (i != 0) {
+                        buffer.append(',');
+                    }
+                    buffer.append(msgId);
+                }
+                return buffer.toString();
+            };
 
-            this.resetByteBuffer(storeHostHolder, storeHostLength);
-            ByteBuffer storeHostBytes = 
messageExtBatch.getStoreHostBytes(storeHostHolder);
             messagesByteBuff.mark();
+            int index = 0;
             while (messagesByteBuff.hasRemaining()) {
                 // 1 TOTALSIZE
                 final int msgPos = messagesByteBuff.position();
@@ -1726,7 +1413,7 @@ public class CommitLog {
                 totalMsgLen += msgLen;
                 // Determines whether there is sufficient free space
                 if ((totalMsgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
-                    this.resetByteBuffer(this.msgStoreItemMemory, 8);
+                    this.msgStoreItemMemory.clear();
                     // 1 TOTALSIZE
                     this.msgStoreItemMemory.putInt(maxBlank);
                     // 2 MAGICCODE
@@ -1737,27 +1424,20 @@ public class CommitLog {
                     // Here the length of the specially set maxBlank
                     byteBuffer.reset(); //ignore the previous appended messages
                     byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
-                    return new 
AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, 
msgIdBuilder.toString(), messageExtBatch.getStoreTimestamp(),
+                    return new 
AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, 
msgIdSupplier, messageExtBatch.getStoreTimestamp(),
                         beginQueueOffset, 
CommitLog.this.defaultMessageStore.now() - beginTimeMills);
                 }
                 //move to add queue offset and commitlog offset
-                messagesByteBuff.position(msgPos + 20);
-                messagesByteBuff.putLong(queueOffset);
-                messagesByteBuff.putLong(wroteOffset + totalMsgLen - msgLen);
-
-                storeHostBytes.rewind();
-                String msgId;
-                if ((sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
-                    msgId = MessageDecoder.createMessageId(this.msgIdMemory, 
storeHostBytes, wroteOffset + totalMsgLen - msgLen);
-                } else {
-                    msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, 
storeHostBytes, wroteOffset + totalMsgLen - msgLen);
-                }
-
-                if (msgIdBuilder.length() > 0) {
-                    msgIdBuilder.append(',').append(msgId);
-                } else {
-                    msgIdBuilder.append(msgId);
-                }
+                int pos = msgPos + 20;
+                messagesByteBuff.putLong(pos, queueOffset);
+                pos += 8;
+                messagesByteBuff.putLong(pos, wroteOffset + totalMsgLen - 
msgLen);
+                // 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
+                pos += 8 + 4 + 8 + bornHostLength;
+                // refresh store time stamp in lock
+                messagesByteBuff.putLong(pos, 
messageExtBatch.getStoreTimestamp());
+
+                putMessageContext.getPhyPos()[index++] = wroteOffset + 
totalMsgLen - msgLen;
                 queueOffset++;
                 msgNum++;
                 messagesByteBuff.position(msgPos + msgLen);
@@ -1767,7 +1447,7 @@ public class CommitLog {
             messagesByteBuff.limit(totalMsgLen);
             byteBuffer.put(messagesByteBuff);
             messageExtBatch.setEncodedBuff(null);
-            AppendMessageResult result = new 
AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, 
msgIdBuilder.toString(),
+            AppendMessageResult result = new 
AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, 
msgIdSupplier,
                 messageExtBatch.getStoreTimestamp(), beginQueueOffset, 
CommitLog.this.defaultMessageStore.now() - beginTimeMills);
             result.setMsgNum(msgNum);
             CommitLog.this.topicQueueTable.put(key, queueOffset);
@@ -1782,19 +1462,104 @@ public class CommitLog {
 
     }
 
-    public static class MessageExtBatchEncoder {
+    public static class MessageExtEncoder {
         // Store the message content
-        private final ByteBuffer msgBatchMemory;
+        private final ByteBuffer encoderBuffer;
         // The maximum length of the message
         private final int maxMessageSize;
 
-        MessageExtBatchEncoder(final int size) {
-            this.msgBatchMemory = ByteBuffer.allocateDirect(size);
+        MessageExtEncoder(final int size) {
+            this.encoderBuffer = ByteBuffer.allocateDirect(size);
             this.maxMessageSize = size;
         }
 
-        public ByteBuffer encode(final MessageExtBatch messageExtBatch) {
-            msgBatchMemory.clear(); //not thread-safe
+        private void socketAddress2ByteBuffer(final SocketAddress 
socketAddress, final ByteBuffer byteBuffer) {
+            InetSocketAddress inetSocketAddress = (InetSocketAddress) 
socketAddress;
+            InetAddress address = inetSocketAddress.getAddress();
+            if (address instanceof Inet4Address) {
+                byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 
4);
+            } else {
+                byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 
16);
+            }
+            byteBuffer.putInt(inetSocketAddress.getPort());
+        }
+
+        protected PutMessageResult encode(MessageExtBrokerInner msgInner) {
+            /**
+             * Serialize message
+             */
+            final byte[] propertiesData =
+                    msgInner.getPropertiesString() == null ? null : 
msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
+
+            final int propertiesLength = propertiesData == null ? 0 : 
propertiesData.length;
+
+            if (propertiesLength > Short.MAX_VALUE) {
+                log.warn("putMessage message properties length too long. 
length={}", propertiesData.length);
+                return new 
PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
+            }
+
+            final byte[] topicData = 
msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
+            final int topicLength = topicData.length;
+
+            final int bodyLength = msgInner.getBody() == null ? 0 : 
msgInner.getBody().length;
+
+            final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, 
topicLength, propertiesLength);
+
+            // Exceeds the maximum message
+            if (msgLen > this.maxMessageSize) {
+                CommitLog.log.warn("message size exceeded, msg total size: " + 
msgLen + ", msg body size: " + bodyLength
+                        + ", maxMessageSize: " + this.maxMessageSize);
+                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, 
null);
+            }
+
+            // Initialization of storage space
+            this.resetByteBuffer(encoderBuffer, msgLen);
+            // 1 TOTALSIZE
+            this.encoderBuffer.putInt(msgLen);
+            // 2 MAGICCODE
+            this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE);
+            // 3 BODYCRC
+            this.encoderBuffer.putInt(msgInner.getBodyCRC());
+            // 4 QUEUEID
+            this.encoderBuffer.putInt(msgInner.getQueueId());
+            // 5 FLAG
+            this.encoderBuffer.putInt(msgInner.getFlag());
+            // 6 QUEUEOFFSET, need update later
+            this.encoderBuffer.putLong(0);
+            // 7 PHYSICALOFFSET, need update later
+            this.encoderBuffer.putLong(0);
+            // 8 SYSFLAG
+            this.encoderBuffer.putInt(msgInner.getSysFlag());
+            // 9 BORNTIMESTAMP
+            this.encoderBuffer.putLong(msgInner.getBornTimestamp());
+            // 10 BORNHOST
+            socketAddress2ByteBuffer(msgInner.getBornHost() 
,this.encoderBuffer);
+            // 11 STORETIMESTAMP
+            this.encoderBuffer.putLong(msgInner.getStoreTimestamp());
+            // 12 STOREHOSTADDRESS
+            socketAddress2ByteBuffer(msgInner.getStoreHost() 
,this.encoderBuffer);
+            // 13 RECONSUMETIMES
+            this.encoderBuffer.putInt(msgInner.getReconsumeTimes());
+            // 14 Prepared Transaction Offset
+            
this.encoderBuffer.putLong(msgInner.getPreparedTransactionOffset());
+            // 15 BODY
+            this.encoderBuffer.putInt(bodyLength);
+            if (bodyLength > 0)
+                this.encoderBuffer.put(msgInner.getBody());
+            // 16 TOPIC
+            this.encoderBuffer.put((byte) topicLength);
+            this.encoderBuffer.put(topicData);
+            // 17 PROPERTIES
+            this.encoderBuffer.putShort((short) propertiesLength);
+            if (propertiesLength > 0)
+                this.encoderBuffer.put(propertiesData);
+
+            encoderBuffer.flip();
+            return null;
+        }
+
+        protected ByteBuffer encode(final MessageExtBatch messageExtBatch, 
PutMessageContext putMessageContext) {
+            encoderBuffer.clear(); //not thread-safe
             int totalMsgLen = 0;
             ByteBuffer messagesByteBuff = messageExtBatch.wrap();
 
@@ -1809,7 +1574,9 @@ public class CommitLog {
             final byte[] batchPropData = 
batchPropStr.getBytes(MessageDecoder.CHARSET_UTF8);
             final short batchPropLen = (short) batchPropData.length;
 
+            int batchSize = 0;
             while (messagesByteBuff.hasRemaining()) {
+                batchSize++;
                 // 1 TOTALSIZE
                 messagesByteBuff.getInt();
                 // 2 MAGICCODE
@@ -1849,53 +1616,55 @@ public class CommitLog {
                 }
 
                 // 1 TOTALSIZE
-                this.msgBatchMemory.putInt(msgLen);
+                this.encoderBuffer.putInt(msgLen);
                 // 2 MAGICCODE
-                this.msgBatchMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
+                this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE);
                 // 3 BODYCRC
-                this.msgBatchMemory.putInt(bodyCrc);
+                this.encoderBuffer.putInt(bodyCrc);
                 // 4 QUEUEID
-                this.msgBatchMemory.putInt(messageExtBatch.getQueueId());
+                this.encoderBuffer.putInt(messageExtBatch.getQueueId());
                 // 5 FLAG
-                this.msgBatchMemory.putInt(flag);
+                this.encoderBuffer.putInt(flag);
                 // 6 QUEUEOFFSET
-                this.msgBatchMemory.putLong(0);
+                this.encoderBuffer.putLong(0);
                 // 7 PHYSICALOFFSET
-                this.msgBatchMemory.putLong(0);
+                this.encoderBuffer.putLong(0);
                 // 8 SYSFLAG
-                this.msgBatchMemory.putInt(messageExtBatch.getSysFlag());
+                this.encoderBuffer.putInt(messageExtBatch.getSysFlag());
                 // 9 BORNTIMESTAMP
-                
this.msgBatchMemory.putLong(messageExtBatch.getBornTimestamp());
+                this.encoderBuffer.putLong(messageExtBatch.getBornTimestamp());
                 // 10 BORNHOST
                 this.resetByteBuffer(bornHostHolder, bornHostLength);
-                
this.msgBatchMemory.put(messageExtBatch.getBornHostBytes(bornHostHolder));
+                
this.encoderBuffer.put(messageExtBatch.getBornHostBytes(bornHostHolder));
                 // 11 STORETIMESTAMP
-                
this.msgBatchMemory.putLong(messageExtBatch.getStoreTimestamp());
+                
this.encoderBuffer.putLong(messageExtBatch.getStoreTimestamp());
                 // 12 STOREHOSTADDRESS
                 this.resetByteBuffer(storeHostHolder, storeHostLength);
-                
this.msgBatchMemory.put(messageExtBatch.getStoreHostBytes(storeHostHolder));
+                
this.encoderBuffer.put(messageExtBatch.getStoreHostBytes(storeHostHolder));
                 // 13 RECONSUMETIMES
-                
this.msgBatchMemory.putInt(messageExtBatch.getReconsumeTimes());
+                this.encoderBuffer.putInt(messageExtBatch.getReconsumeTimes());
                 // 14 Prepared Transaction Offset, batch does not support 
transaction
-                this.msgBatchMemory.putLong(0);
+                this.encoderBuffer.putLong(0);
                 // 15 BODY
-                this.msgBatchMemory.putInt(bodyLen);
+                this.encoderBuffer.putInt(bodyLen);
                 if (bodyLen > 0)
-                    this.msgBatchMemory.put(messagesByteBuff.array(), bodyPos, 
bodyLen);
+                    this.encoderBuffer.put(messagesByteBuff.array(), bodyPos, 
bodyLen);
                 // 16 TOPIC
-                this.msgBatchMemory.put((byte) topicLength);
-                this.msgBatchMemory.put(topicData);
+                this.encoderBuffer.put((byte) topicLength);
+                this.encoderBuffer.put(topicData);
                 // 17 PROPERTIES
-                this.msgBatchMemory.putShort((short) (propertiesLen + 
batchPropLen));
+                this.encoderBuffer.putShort((short) (propertiesLen + 
batchPropLen));
                 if (propertiesLen > 0) {
-                    this.msgBatchMemory.put(messagesByteBuff.array(), 
propertiesPos, propertiesLen);
+                    this.encoderBuffer.put(messagesByteBuff.array(), 
propertiesPos, propertiesLen);
                 }
                 if (batchPropLen > 0) {
-                    this.msgBatchMemory.put(batchPropData, 0, batchPropLen);
+                    this.encoderBuffer.put(batchPropData, 0, batchPropLen);
                 }
             }
-            msgBatchMemory.flip();
-            return msgBatchMemory;
+            putMessageContext.setBatchSize(batchSize);
+            putMessageContext.setPhyPos(new long[batchSize]);
+            encoderBuffer.flip();
+            return encoderBuffer;
         }
 
         private void resetByteBuffer(final ByteBuffer byteBuffer, final int 
limit) {
@@ -1904,4 +1673,51 @@ public class CommitLog {
         }
 
     }
+
+    static class PutMessageThreadLocal {
+        private MessageExtEncoder encoder;
+        private StringBuilder keyBuilder;
+        PutMessageThreadLocal(int size) {
+            encoder = new MessageExtEncoder(size);
+            keyBuilder = new StringBuilder();
+        }
+
+        public MessageExtEncoder getEncoder() {
+            return encoder;
+        }
+
+        public StringBuilder getKeyBuilder() {
+            return keyBuilder;
+        }
+    }
+
+    static class PutMessageContext {
+        private String topicQueueTableKey;
+        private long[] phyPos;
+        private int batchSize;
+
+        public PutMessageContext(String topicQueueTableKey) {
+            this.topicQueueTableKey = topicQueueTableKey;
+        }
+
+        public String getTopicQueueTableKey() {
+            return topicQueueTableKey;
+        }
+
+        public long[] getPhyPos() {
+            return phyPos;
+        }
+
+        public void setPhyPos(long[] phyPos) {
+            this.phyPos = phyPos;
+        }
+
+        public int getBatchSize() {
+            return batchSize;
+        }
+
+        public void setBatchSize(int batchSize) {
+            this.batchSize = batchSize;
+        }
+    }
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 7dd5a32..69019c1 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -34,6 +34,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -476,58 +477,20 @@ public class DefaultMessageStore implements MessageStore {
 
     @Override
     public PutMessageResult putMessage(MessageExtBrokerInner msg) {
-        PutMessageStatus checkStoreStatus = this.checkStoreStatus();
-        if (checkStoreStatus != PutMessageStatus.PUT_OK) {
-            return new PutMessageResult(checkStoreStatus, null);
-        }
-
-        PutMessageStatus msgCheckStatus = this.checkMessage(msg);
-        if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
-            return new PutMessageResult(msgCheckStatus, null);
-        }
-
-        long beginTime = this.getSystemClock().now();
-        PutMessageResult result = this.commitLog.putMessage(msg);
-        long elapsedTime = this.getSystemClock().now() - beginTime;
-        if (elapsedTime > 500) {
-            log.warn("not in lock elapsed time(ms)={}, bodyLength={}", 
elapsedTime, msg.getBody().length);
-        }
-
-        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
-
-        if (null == result || !result.isOk()) {
-            
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
+        try {
+            return asyncPutMessage(msg).get();
+        } catch (InterruptedException | ExecutionException e) {
+            return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
         }
-
-        return result;
     }
 
     @Override
     public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
-        PutMessageStatus checkStoreStatus = this.checkStoreStatus();
-        if (checkStoreStatus != PutMessageStatus.PUT_OK) {
-            return new PutMessageResult(checkStoreStatus, null);
-        }
-
-        PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch);
-        if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
-            return new PutMessageResult(msgCheckStatus, null);
-        }
-
-        long beginTime = this.getSystemClock().now();
-        PutMessageResult result = this.commitLog.putMessages(messageExtBatch);
-        long elapsedTime = this.getSystemClock().now() - beginTime;
-        if (elapsedTime > 500) {
-            log.warn("not in lock elapsed time(ms)={}, bodyLength={}", 
elapsedTime, messageExtBatch.getBody().length);
-        }
-
-        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
-
-        if (null == result || !result.isOk()) {
-            
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
+        try {
+            return asyncPutMessages(messageExtBatch).get();
+        } catch (InterruptedException | ExecutionException e) {
+            return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
         }
-
-        return result;
     }
 
     @Override
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java 
b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
index 25f0e39..297271d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageExtBatch;
+import org.apache.rocketmq.store.CommitLog.PutMessageContext;
 import org.apache.rocketmq.store.config.FlushDiskType;
 import org.apache.rocketmq.store.util.LibC;
 import sun.nio.ch.DirectBuffer;
@@ -188,15 +189,18 @@ public class MappedFile extends ReferenceResource {
         return fileChannel;
     }
 
-    public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, 
final AppendMessageCallback cb) {
-        return appendMessagesInner(msg, cb);
+    public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, 
final AppendMessageCallback cb,
+            PutMessageContext putMessageContext) {
+        return appendMessagesInner(msg, cb, putMessageContext);
     }
 
-    public AppendMessageResult appendMessages(final MessageExtBatch 
messageExtBatch, final AppendMessageCallback cb) {
-        return appendMessagesInner(messageExtBatch, cb);
+    public AppendMessageResult appendMessages(final MessageExtBatch 
messageExtBatch, final AppendMessageCallback cb,
+            PutMessageContext putMessageContext) {
+        return appendMessagesInner(messageExtBatch, cb, putMessageContext);
     }
 
-    public AppendMessageResult appendMessagesInner(final MessageExt 
messageExt, final AppendMessageCallback cb) {
+    public AppendMessageResult appendMessagesInner(final MessageExt 
messageExt, final AppendMessageCallback cb,
+            PutMessageContext putMessageContext) {
         assert messageExt != null;
         assert cb != null;
 
@@ -207,9 +211,11 @@ public class MappedFile extends ReferenceResource {
             byteBuffer.position(currentPos);
             AppendMessageResult result;
             if (messageExt instanceof MessageExtBrokerInner) {
-                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, 
this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
+                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, 
this.fileSize - currentPos,
+                        (MessageExtBrokerInner) messageExt, putMessageContext);
             } else if (messageExt instanceof MessageExtBatch) {
-                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, 
this.fileSize - currentPos, (MessageExtBatch) messageExt);
+                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, 
this.fileSize - currentPos,
+                        (MessageExtBatch) messageExt, putMessageContext);
             } else {
                 return new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
             }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java 
b/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java
index e5f087b..df7e6e5 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java
@@ -16,6 +16,8 @@
  */
 package org.apache.rocketmq.store;
 
+import java.nio.ByteBuffer;
+
 import org.apache.rocketmq.common.TopicFilterType;
 import org.apache.rocketmq.common.message.MessageExt;
 
@@ -24,6 +26,16 @@ public class MessageExtBrokerInner extends MessageExt {
     private String propertiesString;
     private long tagsCode;
 
+    private ByteBuffer encodedBuff;
+
+    public ByteBuffer getEncodedBuff() {
+        return encodedBuff;
+    }
+
+    public void setEncodedBuff(ByteBuffer encodedBuff) {
+        this.encodedBuff = encodedBuff;
+    }
+
     public static long tagsString2tagsCode(final TopicFilterType filter, final 
String tags) {
         if (null == tags || tags.length() == 0) { return 0; }
 
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
index ea791bd..011cbe1 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
@@ -37,7 +37,6 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
@@ -414,237 +413,6 @@ public class DLedgerCommitLog extends CommitLog {
     }
 
     @Override
-    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
-
-        StoreStatsService storeStatsService = 
this.defaultMessageStore.getStoreStatsService();
-        final int tranType = 
MessageSysFlag.getTransactionValue(msg.getSysFlag());
-        String topic = msg.getTopic();
-        setMessageInfo(msg,tranType);
-
-        // Back to Results
-        AppendMessageResult appendResult;
-        AppendFuture<AppendEntryResponse> dledgerFuture;
-        EncodeResult encodeResult;
-
-        encodeResult = this.messageSerializer.serialize(msg);
-        if (encodeResult.status != AppendMessageStatus.PUT_OK) {
-            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new 
AppendMessageResult(encodeResult.status));
-        }
-
-        putMessageLock.lock(); //spin or ReentrantLock ,depending on store 
config
-        long elapsedTimeInLock;
-        long queueOffset;
-        try {
-            beginTimeInDledgerLock = 
this.defaultMessageStore.getSystemClock().now();
-            queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, 
tranType);
-            encodeResult.setQueueOffsetKey(queueOffset, false);
-            AppendEntryRequest request = new AppendEntryRequest();
-            request.setGroup(dLedgerConfig.getGroup());
-            request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
-            request.setBody(encodeResult.getData());
-            dledgerFuture = (AppendFuture<AppendEntryResponse>) 
dLedgerServer.handleAppend(request);
-            if (dledgerFuture.getPos() == -1) {
-                return new 
PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
-            }
-            long wroteOffset = dledgerFuture.getPos() + 
DLedgerEntry.BODY_OFFSET;
-
-            int msgIdLength = (msg.getSysFlag() & 
MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
-            ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
-
-            String msgId = MessageDecoder.createMessageId(buffer, 
msg.getStoreHostBytes(), wroteOffset);
-            elapsedTimeInLock = 
this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
-            appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, 
wroteOffset, encodeResult.getData().length, msgId, System.currentTimeMillis(), 
queueOffset, elapsedTimeInLock);
-            switch (tranType) {
-                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
-                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
-                    break;
-                case MessageSysFlag.TRANSACTION_NOT_TYPE:
-                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
-                    // The next update ConsumeQueue information
-                    
DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, 
queueOffset + 1);
-                    break;
-                default:
-                    break;
-            }
-        } catch (Exception e) {
-            log.error("Put message error", e);
-            return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
-        } finally {
-            beginTimeInDledgerLock = 0;
-            putMessageLock.unlock();
-        }
-
-        if (elapsedTimeInLock > 500) {
-            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, 
bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, 
appendResult);
-        }
-
-        PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR;
-        try {
-            AppendEntryResponse appendEntryResponse = dledgerFuture.get(3, 
TimeUnit.SECONDS);
-            switch 
(DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) {
-                case SUCCESS:
-                    putMessageStatus = PutMessageStatus.PUT_OK;
-                    break;
-                case INCONSISTENT_LEADER:
-                case NOT_LEADER:
-                case LEADER_NOT_READY:
-                case DISK_FULL:
-                    putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE;
-                    break;
-                case WAIT_QUORUM_ACK_TIMEOUT:
-                    //Do not return flush_slave_timeout to the client, for the 
ons client will ignore it.
-                    putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
-                    break;
-                case LEADER_PENDING_FULL:
-                    putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
-                    break;
-            }
-        } catch (Throwable t) {
-            log.error("Failed to get dledger append result", t);
-        }
-
-        PutMessageResult putMessageResult = new 
PutMessageResult(putMessageStatus, appendResult);
-        if (putMessageStatus == PutMessageStatus.PUT_OK) {
-            // Statistics
-            
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
-            
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(appendResult.getWroteBytes());
-        }
-        return putMessageResult;
-    }
-
-    @Override
-    public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) 
{
-        final int tranType = 
MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
-
-        if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
-            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, 
null);
-        }
-        if (messageExtBatch.getDelayTimeLevel() > 0) {
-            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, 
null);
-        }
-
-        // Set the storage time
-        messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
-
-        StoreStatsService storeStatsService = 
this.defaultMessageStore.getStoreStatsService();
-
-        InetSocketAddress bornSocketAddress = (InetSocketAddress) 
messageExtBatch.getBornHost();
-        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
-            messageExtBatch.setBornHostV6Flag();
-        }
-
-        InetSocketAddress storeSocketAddress = (InetSocketAddress) 
messageExtBatch.getStoreHost();
-        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
-            messageExtBatch.setStoreHostAddressV6Flag();
-        }
-
-        // Back to Results
-        AppendMessageResult appendResult;
-        BatchAppendFuture<AppendEntryResponse> dledgerFuture;
-        EncodeResult encodeResult;
-
-        encodeResult = this.messageSerializer.serialize(messageExtBatch);
-        if (encodeResult.status != AppendMessageStatus.PUT_OK) {
-            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new 
AppendMessageResult(encodeResult
-                    .status));
-        }
-
-        putMessageLock.lock(); //spin or ReentrantLock ,depending on store 
config
-        msgIdBuilder.setLength(0);
-        long elapsedTimeInLock;
-        long queueOffset;
-        int msgNum = 0;
-        try {
-            beginTimeInDledgerLock = 
this.defaultMessageStore.getSystemClock().now();
-            queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, 
tranType);
-            encodeResult.setQueueOffsetKey(queueOffset, true);
-            BatchAppendEntryRequest request = new BatchAppendEntryRequest();
-            request.setGroup(dLedgerConfig.getGroup());
-            request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
-            request.setBatchMsgs(encodeResult.batchData);
-            AppendFuture<AppendEntryResponse> appendFuture = 
(AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
-            if (appendFuture.getPos() == -1) {
-                log.warn("HandleAppend return false due to error code {}", 
appendFuture.get().getCode());
-                return new 
PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
-            }
-            dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) 
appendFuture;
-
-            long wroteOffset = 0;
-
-            int msgIdLength = (messageExtBatch.getSysFlag() & 
MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
-            ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
-
-            boolean isFirstOffset = true;
-            long firstWroteOffset = 0;
-            for (long pos : dledgerFuture.getPositions()) {
-                wroteOffset = pos + DLedgerEntry.BODY_OFFSET;
-                if (isFirstOffset) {
-                    firstWroteOffset = wroteOffset;
-                    isFirstOffset = false;
-                }
-                String msgId = MessageDecoder.createMessageId(buffer, 
messageExtBatch.getStoreHostBytes(), wroteOffset);
-                if (msgIdBuilder.length() > 0) {
-                    msgIdBuilder.append(',').append(msgId);
-                } else {
-                    msgIdBuilder.append(msgId);
-                }
-                msgNum++;
-            }
-
-            elapsedTimeInLock = 
this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
-            appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, 
firstWroteOffset, encodeResult.totalMsgLen,
-                    msgIdBuilder.toString(), System.currentTimeMillis(), 
queueOffset, elapsedTimeInLock);
-            appendResult.setMsgNum(msgNum);
-            
DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, 
queueOffset + msgNum);
-        } catch (Exception e) {
-            log.error("Put message error", e);
-            return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new 
AppendMessageResult(AppendMessageStatus
-                    .UNKNOWN_ERROR));
-        } finally {
-            beginTimeInDledgerLock = 0;
-            putMessageLock.unlock();
-        }
-
-        if (elapsedTimeInLock > 500) {
-            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, 
bodyLength={} AppendMessageResult={}",
-                    elapsedTimeInLock, messageExtBatch.getBody().length, 
appendResult);
-        }
-
-        PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR;
-        try {
-            AppendEntryResponse appendEntryResponse = dledgerFuture.get(3, 
TimeUnit.SECONDS);
-            switch 
(DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) {
-                case SUCCESS:
-                    putMessageStatus = PutMessageStatus.PUT_OK;
-                    break;
-                case INCONSISTENT_LEADER:
-                case NOT_LEADER:
-                case LEADER_NOT_READY:
-                case DISK_FULL:
-                    putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE;
-                    break;
-                case WAIT_QUORUM_ACK_TIMEOUT:
-                    //Do not return flush_slave_timeout to the client, for the 
ons client will ignore it.
-                    putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
-                    break;
-                case LEADER_PENDING_FULL:
-                    putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
-                    break;
-            }
-        } catch (Throwable t) {
-            log.error("Failed to get dledger append result", t);
-        }
-
-        PutMessageResult putMessageResult = new 
PutMessageResult(putMessageStatus, appendResult);
-        if (putMessageStatus == PutMessageStatus.PUT_OK) {
-            // Statistics
-            
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(msgNum);
-            
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(encodeResult.totalMsgLen);
-        }
-        return putMessageResult;
-    }
-
-    @Override
     public CompletableFuture<PutMessageResult> 
asyncPutMessage(MessageExtBrokerInner msg) {
 
         StoreStatsService storeStatsService = 
this.defaultMessageStore.getStoreStatsService();
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java 
b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
index f46b3be..715c9d3 100644
--- a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
@@ -30,6 +30,8 @@ import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageExtBatch;
+import org.apache.rocketmq.store.CommitLog.MessageExtEncoder;
+import org.apache.rocketmq.store.CommitLog.PutMessageContext;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.junit.After;
 import org.junit.Before;
@@ -42,7 +44,7 @@ public class AppendCallbackTest {
 
     AppendMessageCallback callback;
 
-    CommitLog.MessageExtBatchEncoder batchEncoder = new 
CommitLog.MessageExtBatchEncoder(10 * 1024 * 1024);
+    MessageExtEncoder batchEncoder = new MessageExtEncoder(10 * 1024 * 1024);
 
     @Before
     public void init() throws Exception {
@@ -84,10 +86,12 @@ public class AppendCallbackTest {
         messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 124));
         messageExtBatch.setBody(MessageDecoder.encodeMessages(messages));
 
-        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
+        PutMessageContext putMessageContext = new PutMessageContext(topic + 
"-" + queue);
+        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, 
putMessageContext));
         ByteBuffer buff = ByteBuffer.allocate(1024 * 10);
         //encounter end of file when append half of the data
-        AppendMessageResult result = callback.doAppend(0, buff, 1000, 
messageExtBatch);
+        AppendMessageResult result =
+                callback.doAppend(0, buff, 1000, messageExtBatch, 
putMessageContext);
         assertEquals(AppendMessageStatus.END_OF_FILE, result.getStatus());
         assertEquals(0, result.getWroteOffset());
         assertEquals(0, result.getLogicsOffset());
@@ -121,10 +125,12 @@ public class AppendCallbackTest {
         messageExtBatch.setStoreHost(new InetSocketAddress("::1", 124));
         messageExtBatch.setBody(MessageDecoder.encodeMessages(messages));
 
-        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
+        PutMessageContext putMessageContext = new PutMessageContext(topic + 
"-" + queue);
+        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, 
putMessageContext));
         ByteBuffer buff = ByteBuffer.allocate(1024 * 10);
         //encounter end of file when append half of the data
-        AppendMessageResult result = callback.doAppend(0, buff, 1000, 
messageExtBatch);
+        AppendMessageResult result =
+                callback.doAppend(0, buff, 1000, messageExtBatch, 
putMessageContext);
         assertEquals(AppendMessageStatus.END_OF_FILE, result.getStatus());
         assertEquals(0, result.getWroteOffset());
         assertEquals(0, result.getLogicsOffset());
@@ -154,9 +160,11 @@ public class AppendCallbackTest {
         messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 124));
         messageExtBatch.setBody(MessageDecoder.encodeMessages(messages));
 
-        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
+        PutMessageContext putMessageContext = new PutMessageContext(topic + 
"-" + queue);
+        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, 
putMessageContext));
         ByteBuffer buff = ByteBuffer.allocate(1024 * 10);
-        AppendMessageResult allresult = callback.doAppend(0, buff, 1024 * 10, 
messageExtBatch);
+        AppendMessageResult allresult =
+                callback.doAppend(0, buff, 1024 * 10, messageExtBatch, 
putMessageContext);
 
         assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus());
         assertEquals(0, allresult.getWroteOffset());
@@ -214,9 +222,11 @@ public class AppendCallbackTest {
         messageExtBatch.setStoreHost(new InetSocketAddress("::1", 124));
         messageExtBatch.setBody(MessageDecoder.encodeMessages(messages));
 
-        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
+        PutMessageContext putMessageContext = new PutMessageContext(topic + 
"-" + queue);
+        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, 
putMessageContext));
         ByteBuffer buff = ByteBuffer.allocate(1024 * 10);
-        AppendMessageResult allresult = callback.doAppend(0, buff, 1024 * 10, 
messageExtBatch);
+        AppendMessageResult allresult =
+                callback.doAppend(0, buff, 1024 * 10, messageExtBatch, 
putMessageContext);
 
         assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus());
         assertEquals(0, allresult.getWroteOffset());

Reply via email to