Repository: incubator-rocketmq
Updated Branches:
  refs/heads/develop 087d989fc -> 11653ce24


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/11653ce2/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index b4bf298..d81672f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -32,6 +32,7 @@ import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageExtBatch;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.config.FlushDiskType;
@@ -57,6 +58,7 @@ public class CommitLog {
     private final FlushCommitLogService commitLogService;
 
     private final AppendMessageCallback appendMessageCallback;
+    private final ThreadLocal<MessageExtBatchEncoder> batchEncoderThreadLocal;
     private HashMap<String/* topic-queueid */, Long/* offset */> 
topicQueueTable = new HashMap<String, Long>(1024);
     private volatile long confirmOffset = -1L;
 
@@ -81,6 +83,11 @@ public class CommitLog {
         this.commitLogService = new CommitRealTimeService();
 
         this.appendMessageCallback = new 
DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
+        batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() {
+            @Override protected MessageExtBatchEncoder initialValue() {
+                return new 
MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
+            }
+        };
     }
 
     public boolean load() {
@@ -222,7 +229,8 @@ public class CommitLog {
      *
      * @return 0 Come the end of the file // >0 Normal messages // -1 Message 
checksum failure
      */
-    public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer 
byteBuffer, final boolean checkCRC, final boolean readBody) {
+    public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer 
byteBuffer, final boolean checkCRC,
+        final boolean readBody) {
         try {
             // 1 TOTAL SIZE
             int totalSize = byteBuffer.getInt();
@@ -370,7 +378,7 @@ public class CommitLog {
         return new DispatchRequest(-1, false /* success */);
     }
 
-    private int calMsgLength(int bodyLength, int topicLength, int 
propertiesLength) {
+    private static int calMsgLength(int bodyLength, int topicLength, int 
propertiesLength) {
         final int msgLen = 4 // 1 TOTALSIZE
             + 4 // 2 MAGICCODE
             + 4 // 3 BODYCRC
@@ -633,18 +641,23 @@ public class CommitLog {
         
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
         
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
 
-        GroupCommitRequest request = null;
+        handleDiskFlush(result, putMessageResult, msg);
+        handleHA(result, putMessageResult, msg);
 
+        return putMessageResult;
+    }
+
+    public void handleDiskFlush(AppendMessageResult result, PutMessageResult 
putMessageResult, MessageExt messageExt) {
         // Synchronization flush
         if (FlushDiskType.SYNC_FLUSH == 
this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
             final GroupCommitService service = (GroupCommitService) 
this.flushCommitLogService;
-            if (msg.isWaitStoreMsgOK()) {
-                request = new GroupCommitRequest(result.getWroteOffset() + 
result.getWroteBytes());
+            if (messageExt.isWaitStoreMsgOK()) {
+                GroupCommitRequest request = new 
GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                 service.putRequest(request);
                 boolean flushOK = 
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                 if (!flushOK) {
-                    log.error("do groupcommit, wait for flush failed, topic: " 
+ msg.getTopic() + " tags: " + msg.getTags()
-                        + " client address: " + msg.getBornHostString());
+                    log.error("do groupcommit, wait for flush failed, topic: " 
+ messageExt.getTopic() + " tags: " + messageExt.getTags()
+                        + " client address: " + 
messageExt.getBornHostString());
                     
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                 }
             } else {
@@ -659,26 +672,22 @@ public class CommitLog {
                 commitLogService.wakeup();
             }
         }
+    }
 
-        // Synchronous write double
+    public void handleHA(AppendMessageResult result, PutMessageResult 
putMessageResult, MessageExt messageExt) {
         if (BrokerRole.SYNC_MASTER == 
this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
             HAService service = this.defaultMessageStore.getHaService();
-            if (msg.isWaitStoreMsgOK()) {
+            if (messageExt.isWaitStoreMsgOK()) {
                 // Determine whether to wait
                 if (service.isSlaveOK(result.getWroteOffset() + 
result.getWroteBytes())) {
-                    if (null == request) {
-                        request = new 
GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
-                    }
+                    GroupCommitRequest  request = new 
GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                     service.putRequest(request);
-
                     service.getWaitNotifyObject().wakeupAll();
-
                     boolean flushOK =
-                        // TODO
                         
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                     if (!flushOK) {
-                        log.error("do sync transfer other node, wait return, 
but failed, topic: " + msg.getTopic() + " tags: "
-                            + msg.getTags() + " client address: " + 
msg.getBornHostString());
+                        log.error("do sync transfer other node, wait return, 
but failed, topic: " + messageExt.getTopic() + " tags: "
+                            + messageExt.getTags() + " client address: " + 
messageExt.getBornHostNameString());
                         
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                     }
                 }
@@ -690,12 +699,109 @@ public class CommitLog {
             }
         }
 
+    }
+
+    public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) 
{
+        messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
+        AppendMessageResult result;
+
+        StoreStatsService storeStatsService = 
this.defaultMessageStore.getStoreStatsService();
+
+        final int tranType = 
MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
+
+        if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, 
null);
+        }
+        if (messageExtBatch.getDelayTimeLevel() > 0) {
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, 
null);
+        }
+
+        long eclipseTimeInLock = 0;
+        MappedFile unlockMappedFile = null;
+        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
+
+        //fine-grained lock instead of the coarse-grained
+        MessageExtBatchEncoder batchEncoder = batchEncoderThreadLocal.get();
+
+        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
+
+        lockForPutMessage(); //spin...
+        try {
+            long beginLockTimestamp = 
this.defaultMessageStore.getSystemClock().now();
+            this.beginTimeInLock = beginLockTimestamp;
+
+            // Here settings are stored timestamp, in order to ensure an 
orderly
+            // global
+            messageExtBatch.setStoreTimestamp(beginLockTimestamp);
+
+            if (null == mappedFile || mappedFile.isFull()) {
+                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // 
Mark: NewFile may be cause noise
+            }
+            if (null == mappedFile) {
+                log.error("Create maped file1 error, topic: {} clientAddr: 
{}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
+                beginTimeInLock = 0;
+                return new 
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
+            }
+
+            result = mappedFile.appendMessages(messageExtBatch, 
this.appendMessageCallback);
+            switch (result.getStatus()) {
+                case PUT_OK:
+                    break;
+                case END_OF_FILE:
+                    unlockMappedFile = mappedFile;
+                    // Create a new file, re-write the message
+                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
+                    if (null == mappedFile) {
+                        // XXX: warn and notify me
+                        log.error("Create maped file2 error, topic: {} 
clientAddr: {}", messageExtBatch.getTopic(), 
messageExtBatch.getBornHostString());
+                        beginTimeInLock = 0;
+                        return new 
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
+                    }
+                    result = mappedFile.appendMessages(messageExtBatch, 
this.appendMessageCallback);
+                    break;
+                case MESSAGE_SIZE_EXCEEDED:
+                case PROPERTIES_SIZE_EXCEEDED:
+                    beginTimeInLock = 0;
+                    return new 
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
+                case UNKNOWN_ERROR:
+                    beginTimeInLock = 0;
+                    return new 
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
+                default:
+                    beginTimeInLock = 0;
+                    return new 
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
+            }
+
+            eclipseTimeInLock = 
this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
+            beginTimeInLock = 0;
+        } finally {
+            releasePutMessageLock();
+        }
+
+
+        if (eclipseTimeInLock > 500) {
+            log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, 
bodyLength={} AppendMessageResult={}", eclipseTimeInLock, 
messageExtBatch.getBody().length, result);
+        }
+
+        if (null != unlockMappedFile && 
this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
+            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
+        }
+
+        PutMessageResult putMessageResult = new 
PutMessageResult(PutMessageStatus.PUT_OK, result);
+
+        // Statistics
+        
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum());
+        
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes());
+
+
+        handleDiskFlush(result, putMessageResult, messageExtBatch);
+
+        handleHA(result, putMessageResult, messageExtBatch);
+
         return putMessageResult;
     }
 
     /**
-     * According to receive certain message or offset storage time if an error
-     * occurs, it returns -1
+     * According to receive certain message or offset storage time if an error 
occurs, it returns -1
      */
     public long pickupStoreTimestamp(final long offset, final int size) {
         if (offset >= this.getMinOffset()) {
@@ -1096,6 +1202,8 @@ public class CommitLog {
         // Build Message Key
         private final StringBuilder keyBuilder = new StringBuilder();
 
+        private final StringBuilder msgIdBuilder = new StringBuilder();
+
         private final ByteBuffer hostHolder = ByteBuffer.allocate(8);
 
         DefaultAppendMessageCallback(final int size) {
@@ -1108,7 +1216,8 @@ public class CommitLog {
             return msgStoreItemMemory;
         }
 
-        public AppendMessageResult doAppend(final long fileFromOffset, final 
ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner 
msgInner) {
+        public AppendMessageResult doAppend(final long fileFromOffset, final 
ByteBuffer byteBuffer, final int maxBlank,
+            final MessageExtBrokerInner msgInner) {
             // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
 
             // PHY OFFSET
@@ -1216,7 +1325,7 @@ public class CommitLog {
             // 12 STOREHOSTADDRESS
             this.resetByteBuffer(hostHolder, 8);
             
this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
-            //this.msgStoreItemMemory.put(msgInner.getStoreHostBytes());
+            //this.msgBatchMemory.put(msgInner.getStoreHostBytes());
             // 13 RECONSUMETIMES
             this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
             // 14 Prepared Transaction Offset
@@ -1255,9 +1364,202 @@ public class CommitLog {
             return result;
         }
 
+        public AppendMessageResult doAppend(final long fileFromOffset, final 
ByteBuffer byteBuffer, final int maxBlank,
+            final MessageExtBatch messageExtBatch) {
+            byteBuffer.mark();
+            //physical offset
+            long wroteOffset = fileFromOffset + byteBuffer.position();
+            // Record ConsumeQueue information
+            keyBuilder.setLength(0);
+            keyBuilder.append(messageExtBatch.getTopic());
+            keyBuilder.append('-');
+            keyBuilder.append(messageExtBatch.getQueueId());
+            String key = keyBuilder.toString();
+            Long queueOffset = CommitLog.this.topicQueueTable.get(key);
+            if (null == queueOffset) {
+                queueOffset = 0L;
+                CommitLog.this.topicQueueTable.put(key, queueOffset);
+            }
+            long beginQueueOffset = queueOffset;
+            int totalMsgLen = 0;
+            int msgNum = 0;
+            msgIdBuilder.setLength(0);
+            final long beginTimeMills = 
CommitLog.this.defaultMessageStore.now();
+            ByteBuffer messagesByteBuff = messageExtBatch.getEncodedBuff();
+            this.resetByteBuffer(hostHolder, 8);
+            ByteBuffer storeHostBytes = 
messageExtBatch.getStoreHostBytes(hostHolder);
+            messagesByteBuff.mark();
+            while (messagesByteBuff.hasRemaining()) {
+                // 1 TOTALSIZE
+                final int msgPos = messagesByteBuff.position();
+                final int msgLen = messagesByteBuff.getInt();
+                final int bodyLen = msgLen - 40; //only for log, just estimate 
it
+                // Exceeds the maximum message
+                if (msgLen > this.maxMessageSize) {
+                    CommitLog.log.warn("message size exceeded, msg total size: 
" + msgLen + ", msg body size: " + bodyLen
+                        + ", maxMessageSize: " + this.maxMessageSize);
+                    return new 
AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
+                }
+                totalMsgLen += msgLen;
+                // Determines whether there is sufficient free space
+                if ((totalMsgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
+                    this.resetByteBuffer(this.msgStoreItemMemory, 8);
+                    // 1 TOTALSIZE
+                    this.msgStoreItemMemory.putInt(maxBlank);
+                    // 2 MAGICCODE
+                    this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
+                    // 3 The remaining space may be any value
+                    //
+                    //ignore previous read
+                    messagesByteBuff.reset();
+                    // Here the length of the specially set maxBlank
+                    byteBuffer.reset(); //ignore the previous appended messages
+                    byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
+                    return new 
AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, 
msgIdBuilder.toString(), messageExtBatch.getStoreTimestamp(),
+                        beginQueueOffset, 
CommitLog.this.defaultMessageStore.now() - beginTimeMills);
+                }
+                //move to add queue offset and commitlog offset
+                messagesByteBuff.position(msgPos + 20);
+                messagesByteBuff.putLong(queueOffset);
+                messagesByteBuff.putLong(wroteOffset + totalMsgLen - msgLen);
+
+                storeHostBytes.rewind();
+                String msgId = 
MessageDecoder.createMessageId(this.msgIdMemory, storeHostBytes, wroteOffset + 
totalMsgLen - msgLen);
+                if (msgIdBuilder.length() > 0) {
+                    msgIdBuilder.append(',').append(msgId);
+                } else {
+                    msgIdBuilder.append(msgId);
+                }
+                queueOffset++;
+                msgNum++;
+                messagesByteBuff.position(msgPos + msgLen);
+            }
+
+            messagesByteBuff.position(0);
+            messagesByteBuff.limit(totalMsgLen);
+            byteBuffer.put(messagesByteBuff);
+            messageExtBatch.setEncodedBuff(null);
+            AppendMessageResult result = new 
AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, 
msgIdBuilder.toString(),
+                messageExtBatch.getStoreTimestamp(), beginQueueOffset, 
CommitLog.this.defaultMessageStore.now() - beginTimeMills);
+            result.setMsgNum(msgNum);
+            CommitLog.this.topicQueueTable.put(key, queueOffset);
+
+            return result;
+        }
+
+        private void resetByteBuffer(final ByteBuffer byteBuffer, final int 
limit) {
+            byteBuffer.flip();
+            byteBuffer.limit(limit);
+        }
+
+    }
+
+    public static class MessageExtBatchEncoder {
+        // Store the message content
+        private final ByteBuffer msgBatchMemory;
+        // The maximum length of the message
+        private final int maxMessageSize;
+
+        private final ByteBuffer hostHolder = ByteBuffer.allocate(8);
+
+        MessageExtBatchEncoder(final int size) {
+            this.msgBatchMemory = ByteBuffer.allocateDirect(size);
+            this.maxMessageSize = size;
+        }
+
+
+        public ByteBuffer encode(final MessageExtBatch messageExtBatch) {
+            msgBatchMemory.clear(); //not thread-safe
+            int totalMsgLen = 0;
+            ByteBuffer messagesByteBuff = messageExtBatch.wrap();
+            while (messagesByteBuff.hasRemaining()) {
+                // 1 TOTALSIZE
+                messagesByteBuff.getInt();
+                // 2 MAGICCODE
+                messagesByteBuff.getInt();
+                // 3 BODYCRC
+                messagesByteBuff.getInt();
+                // 4 FLAG
+                int flag = messagesByteBuff.getInt();
+                // 5 BODY
+                int bodyLen = messagesByteBuff.getInt();
+                int bodyPos = messagesByteBuff.position();
+                int bodyCrc = UtilAll.crc32(messagesByteBuff.array(), bodyPos, 
bodyLen);
+                messagesByteBuff.position(bodyPos + bodyLen);
+                // 6 properties
+                short propertiesLen = messagesByteBuff.getShort();
+                int propertiesPos = messagesByteBuff.position();
+                messagesByteBuff.position(propertiesPos + propertiesLen);
+
+                final byte[] topicData = 
messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
+
+                final int topicLength = topicData.length;
+
+                final int msgLen = calMsgLength(bodyLen, topicLength, 
propertiesLen);
+
+                // Exceeds the maximum message
+                if (msgLen > this.maxMessageSize) {
+                    CommitLog.log.warn("message size exceeded, msg total size: 
" + msgLen + ", msg body size: " + bodyLen
+                        + ", maxMessageSize: " + this.maxMessageSize);
+                    throw new RuntimeException("message size exceeded");
+                }
+
+                totalMsgLen += msgLen;
+                // Determines whether there is sufficient free space
+                if (totalMsgLen > maxMessageSize) {
+                    throw new RuntimeException("message size exceeded");
+                }
+
+                // 1 TOTALSIZE
+                this.msgBatchMemory.putInt(msgLen);
+                // 2 MAGICCODE
+                this.msgBatchMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
+                // 3 BODYCRC
+                this.msgBatchMemory.putInt(bodyCrc);
+                // 4 QUEUEID
+                this.msgBatchMemory.putInt(messageExtBatch.getQueueId());
+                // 5 FLAG
+                this.msgBatchMemory.putInt(flag);
+                // 6 QUEUEOFFSET
+                this.msgBatchMemory.putLong(0);
+                // 7 PHYSICALOFFSET
+                this.msgBatchMemory.putLong(0);
+                // 8 SYSFLAG
+                this.msgBatchMemory.putInt(messageExtBatch.getSysFlag());
+                // 9 BORNTIMESTAMP
+                
this.msgBatchMemory.putLong(messageExtBatch.getBornTimestamp());
+                // 10 BORNHOST
+                this.resetByteBuffer(hostHolder, 8);
+                
this.msgBatchMemory.put(messageExtBatch.getBornHostBytes(hostHolder));
+                // 11 STORETIMESTAMP
+                
this.msgBatchMemory.putLong(messageExtBatch.getStoreTimestamp());
+                // 12 STOREHOSTADDRESS
+                this.resetByteBuffer(hostHolder, 8);
+                
this.msgBatchMemory.put(messageExtBatch.getStoreHostBytes(hostHolder));
+                // 13 RECONSUMETIMES
+                
this.msgBatchMemory.putInt(messageExtBatch.getReconsumeTimes());
+                // 14 Prepared Transaction Offset, batch does not support 
transaction
+                this.msgBatchMemory.putLong(0);
+                // 15 BODY
+                this.msgBatchMemory.putInt(bodyLen);
+                if (bodyLen > 0)
+                    this.msgBatchMemory.put(messagesByteBuff.array(), bodyPos, 
bodyLen);
+                // 16 TOPIC
+                this.msgBatchMemory.put((byte) topicLength);
+                this.msgBatchMemory.put(topicData);
+                // 17 PROPERTIES
+                this.msgBatchMemory.putShort(propertiesLen);
+                if (propertiesLen > 0)
+                    this.msgBatchMemory.put(messagesByteBuff.array(), 
propertiesPos, propertiesLen);
+            }
+            msgBatchMemory.flip();
+            return msgBatchMemory;
+        }
+
         private void resetByteBuffer(final ByteBuffer byteBuffer, final int 
limit) {
             byteBuffer.flip();
             byteBuffer.limit(limit);
         }
+
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/11653ce2/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index dc1c96c..eeb3598 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -331,7 +331,7 @@ public class ConsumeQueue {
     public void putMessagePositionInfoWrapper(long offset, int size, long 
tagsCode, long storeTimestamp,
         long logicOffset) {
         final int maxRetries = 30;
-        boolean canWrite = 
this.defaultMessageStore.getRunningFlags().isWriteable();
+        boolean canWrite = 
this.defaultMessageStore.getRunningFlags().isCQWriteable();
         for (int i = 0; i < maxRetries && canWrite; i++) {
             boolean result = this.putMessagePositionInfo(offset, size, 
tagsCode, logicOffset);
             if (result) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/11653ce2/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
----------------------------------------------------------------------
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 2594ef3..5c2d27f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -40,6 +40,7 @@ import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageExtBatch;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.running.RunningStats;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
@@ -325,6 +326,62 @@ public class DefaultMessageStore implements MessageStore {
         return result;
     }
 
+    public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
+        if (this.shutdown) {
+            log.warn("DefaultMessageStore has shutdown, so putMessages is 
forbidden");
+            return new 
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
+        }
+
+        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
+            long value = this.printTimes.getAndIncrement();
+            if ((value % 50000) == 0) {
+                log.warn("DefaultMessageStore is in slave mode, so putMessages 
is forbidden ");
+            }
+
+            return new 
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
+        }
+
+        if (!this.runningFlags.isWriteable()) {
+            long value = this.printTimes.getAndIncrement();
+            if ((value % 50000) == 0) {
+                log.warn("DefaultMessageStore is not writable, so putMessages 
is forbidden " + this.runningFlags.getFlagBits());
+            }
+
+            return new 
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
+        } else {
+            this.printTimes.set(0);
+        }
+
+        if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) {
+            log.warn("PutMessages topic length too long " + 
messageExtBatch.getTopic().length());
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, 
null);
+        }
+
+        if (messageExtBatch.getBody().length > 
messageStoreConfig.getMaxMessageSize()) {
+            log.warn("PutMessages body length too long " + 
messageExtBatch.getBody().length);
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, 
null);
+        }
+
+        if (this.isOSPageCacheBusy()) {
+            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, 
null);
+        }
+
+        long beginTime = this.getSystemClock().now();
+        PutMessageResult result = this.commitLog.putMessages(messageExtBatch);
+
+        long eclipseTime = this.getSystemClock().now() - beginTime;
+        if (eclipseTime > 500) {
+            log.warn("not in lock eclipse time(ms)={}, bodyLength={}", 
eclipseTime, messageExtBatch.getBody().length);
+        }
+        this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
+
+        if (null == result || !result.isOk()) {
+            
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
+        }
+
+        return result;
+    }
+
     @Override
     public boolean isOSPageCacheBusy() {
         long begin = this.getCommitLog().getBeginTimeInLock();

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/11653ce2/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java 
b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
index feb505d..5cb72ce 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
@@ -33,6 +33,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageExtBatch;
 import org.apache.rocketmq.store.config.FlushDiskType;
 import org.apache.rocketmq.store.util.LibC;
 import org.slf4j.Logger;
@@ -187,7 +189,15 @@ public class MappedFile extends ReferenceResource {
     }
 
     public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, 
final AppendMessageCallback cb) {
-        assert msg != null;
+        return appendMessagesInner(msg, cb);
+    }
+
+    public AppendMessageResult appendMessages(final MessageExtBatch 
messageExtBatch, final AppendMessageCallback cb) {
+        return appendMessagesInner(messageExtBatch, cb);
+    }
+
+    public AppendMessageResult appendMessagesInner(final MessageExt 
messageExt, final AppendMessageCallback cb) {
+        assert messageExt != null;
         assert cb != null;
 
         int currentPos = this.wrotePosition.get();
@@ -195,30 +205,28 @@ public class MappedFile extends ReferenceResource {
         if (currentPos < this.fileSize) {
             ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() 
: this.mappedByteBuffer.slice();
             byteBuffer.position(currentPos);
-            AppendMessageResult result =
-                cb.doAppend(this.getFileFromOffset(), byteBuffer, 
this.fileSize - currentPos, msg);
+            AppendMessageResult result = null;
+            if (messageExt instanceof MessageExtBrokerInner) {
+                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, 
this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
+            } else if (messageExt instanceof MessageExtBatch) {
+                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, 
this.fileSize - currentPos, (MessageExtBatch)messageExt);
+            } else {
+                return new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
+            }
             this.wrotePosition.addAndGet(result.getWroteBytes());
             this.storeTimestamp = result.getStoreTimestamp();
             return result;
         }
-
-        log.error("MappedFile.appendMessage return null, wrotePosition: " + 
currentPos + " fileSize: "
-            + this.fileSize);
+        log.error("MappedFile.appendMessage return null, wrotePosition: {} 
fileSize: {}", currentPos,  this.fileSize);
         return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
     }
 
-    /**
 
-     */
     public long getFileFromOffset() {
         return this.fileFromOffset;
     }
 
-    /**
-
-     *
 
-     */
     public boolean appendMessage(final byte[] data) {
         int currentPos = this.wrotePosition.get();
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/11653ce2/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 79e3a8f..65c546b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.store;
 import java.util.HashMap;
 import java.util.Set;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageExtBatch;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 
 public interface MessageStore {
@@ -33,6 +34,8 @@ public interface MessageStore {
 
     PutMessageResult putMessage(final MessageExtBrokerInner msg);
 
+    PutMessageResult putMessages(final MessageExtBatch messageExtBatch);
+
     GetMessageResult getMessage(final String group, final String topic, final 
int queueId,
         final long offset, final int maxMsgNums, final SubscriptionData 
subscriptionData);
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/11653ce2/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java 
b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
index 4f610b8..3dcd861 100644
--- a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
+++ b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
@@ -27,6 +27,8 @@ public class RunningFlags {
     private static final int WRITE_INDEX_FILE_ERROR_BIT = 1 << 3;
 
     private static final int DISK_FULL_BIT = 1 << 4;
+
+
     private volatile int flagBits = 0;
 
     public RunningFlags() {
@@ -76,6 +78,15 @@ public class RunningFlags {
         return false;
     }
 
+    //for consume queue, just ignore the DISK_FULL_BIT
+    public boolean isCQWriteable() {
+        if ((this.flagBits & (NOT_WRITEABLE_BIT | WRITE_LOGICS_QUEUE_ERROR_BIT 
| WRITE_INDEX_FILE_ERROR_BIT)) == 0) {
+            return true;
+        }
+
+        return false;
+    }
+
     public boolean getAndMakeNotWriteable() {
         boolean result = this.isWriteable();
         if (result) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/11653ce2/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
----------------------------------------------------------------------
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 9cfd1c3..7ae2ab5 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -132,6 +132,7 @@ public class MessageStoreConfig {
     private int transientStorePoolSize = 5;
     private boolean fastFailIfNoBufferInStorePool = false;
 
+
     public boolean isDebugLockEnable() {
         return debugLockEnable;
     }
@@ -629,4 +630,5 @@ public class MessageStoreConfig {
     public void setCommitCommitLogThoroughInterval(final int 
commitCommitLogThoroughInterval) {
         this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/11653ce2/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
----------------------------------------------------------------------
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java 
b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
index 29ec71e..1515eb4 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
@@ -123,7 +123,9 @@ public class BrokerStatsManager {
     public void incTopicPutNums(final String topic) {
         this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, 1, 1);
     }
-
+    public void incTopicPutNums(final String topic, int num, int times) {
+        this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, num, times);
+    }
     public void incTopicPutSize(final String topic, final int size) {
         this.statsTable.get(TOPIC_PUT_SIZE).addValue(topic, size, 1);
     }
@@ -154,7 +156,9 @@ public class BrokerStatsManager {
     public void incBrokerPutNums() {
         
this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().incrementAndGet();
     }
-
+    public void incBrokerPutNums(final int incValue) {
+        
this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue);
+    }
     public void incBrokerGetNums(final int incValue) {
         
this.statsTable.get(BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue);
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/11653ce2/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
----------------------------------------------------------------------
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java 
b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
new file mode 100644
index 0000000..fc667b6
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageExtBatch;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class AppendCallbackTest {
+
+    AppendMessageCallback callback;
+
+    CommitLog.MessageExtBatchEncoder batchEncoder = new 
CommitLog.MessageExtBatchEncoder(10 * 1024 * 1024);
+
+    @Before
+    public void init() throws Exception{
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setMapedFileSizeCommitLog(1024 * 8);
+        messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 4);
+        messageStoreConfig.setMaxHashSlotNum(100);
+        messageStoreConfig.setMaxIndexNum(100 * 10);
+        messageStoreConfig.setStorePathRootDir(System.getProperty("user.home") 
+ File.separator + "unitteststore");
+        
messageStoreConfig.setStorePathCommitLog(System.getProperty("user.home") + 
File.separator + "unitteststore" + File.separator + "commitlog");
+        //too much reference
+        DefaultMessageStore messageStore = new 
DefaultMessageStore(messageStoreConfig, null, null, null);
+        CommitLog commitLog = new CommitLog(messageStore);
+        callback = commitLog.new DefaultAppendMessageCallback(1024);
+    }
+
+
+    @Test
+    public void testAppendMessageBatchEndOfFile() throws Exception{
+        List<Message>  messages = new ArrayList<>();
+        String topic = "test-topic";
+        int queue= 0;
+        for (int i = 0; i < 10; i++) {
+            Message msg = new Message();
+            msg.setBody("body".getBytes());
+            msg.setTopic(topic);
+            msg.setTags("abc");
+            messages.add(msg);
+        }
+        MessageExtBatch messageExtBatch = new MessageExtBatch();
+        messageExtBatch.setTopic(topic);
+        messageExtBatch.setQueueId(queue);
+        messageExtBatch.setBornTimestamp(System.currentTimeMillis());
+        messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1",123));
+        messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1",124));
+        messageExtBatch.setBody(MessageDecoder.encodeMessages(messages));
+
+        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
+        ByteBuffer buff = ByteBuffer.allocate(1024 * 10);
+        //encounter end of file when append half of the data
+        AppendMessageResult result = callback.doAppend(0, buff, 1000, 
messageExtBatch);
+        assertEquals(AppendMessageStatus.END_OF_FILE, result.getStatus());
+        assertEquals(0, result.getWroteOffset());
+        assertEquals(0, result.getLogicsOffset());
+        assertEquals(1000, result.getWroteBytes());
+        assertEquals(8, buff.position()); //write blank size and magic value
+
+        assertTrue(result.getMsgId().length() > 0); //should have already 
constructed some message ids
+    }
+    @Test
+    public void testAppendMessageBatchSucc() throws Exception {
+        List<Message>  messages = new ArrayList<>();
+        String topic = "test-topic";
+        int queue= 0;
+        for (int i = 0; i < 10; i++) {
+            Message msg = new Message();
+            msg.setBody("body".getBytes());
+            msg.setTopic(topic);
+            msg.setTags("abc");
+            messages.add(msg);
+        }
+        MessageExtBatch messageExtBatch = new MessageExtBatch();
+        messageExtBatch.setTopic(topic);
+        messageExtBatch.setQueueId(queue);
+        messageExtBatch.setBornTimestamp(System.currentTimeMillis());
+        messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1",123));
+        messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1",124));
+        messageExtBatch.setBody(MessageDecoder.encodeMessages(messages));
+
+        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
+        ByteBuffer buff = ByteBuffer.allocate(1024 * 10);
+        AppendMessageResult allresult = callback.doAppend(0, buff, 1024 * 10, 
messageExtBatch);
+
+        assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus());
+        assertEquals(0, allresult.getWroteOffset());
+        assertEquals(0, allresult.getLogicsOffset());
+        assertEquals(buff.position(), allresult.getWroteBytes());
+
+        assertEquals(messages.size(), allresult.getMsgNum());
+
+        Set<String> msgIds = new HashSet<>();
+        for (String msgId: allresult.getMsgId().split(",")) {
+            assertEquals(32, msgId.length());
+            msgIds.add(msgId);
+        }
+        assertEquals(messages.size(), msgIds.size());
+
+        List<MessageExt> decodeMsgs = MessageDecoder.decodes((ByteBuffer) 
buff.flip());
+        assertEquals(decodeMsgs.size(), decodeMsgs.size());
+        long queueOffset = decodeMsgs.get(0).getQueueOffset();
+        long storeTimeStamp = decodeMsgs.get(0).getStoreTimestamp();
+        for (int i = 0; i < messages.size(); i++) {
+            assertEquals(messages.get(i).getTopic(), 
decodeMsgs.get(i).getTopic());
+            assertEquals(new String(messages.get(i).getBody()), new 
String(decodeMsgs.get(i).getBody()));
+            assertEquals(messages.get(i).getTags(), 
decodeMsgs.get(i).getTags());
+
+            assertEquals(messageExtBatch.getBornHostNameString(), 
decodeMsgs.get(i).getBornHostNameString());
+
+            assertEquals(messageExtBatch.getBornTimestamp(), 
decodeMsgs.get(i).getBornTimestamp());
+            assertEquals(storeTimeStamp, 
decodeMsgs.get(i).getStoreTimestamp());
+            assertEquals(queueOffset++, decodeMsgs.get(i).getQueueOffset());
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/11653ce2/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java
new file mode 100644
index 0000000..e372a1b
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.producer.batch;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT;
+import org.apache.rocketmq.test.factory.ProducerFactory;
+import org.apache.rocketmq.test.util.RandomUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class BatchSendIT extends BaseConf {
+    private static Logger logger = 
Logger.getLogger(TagMessageWith1ConsumerIT.class);
+    private String topic = null;
+    private Random random = new Random();
+
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("user topic[%s]!", topic));
+    }
+
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    @Test
+    public void testBatchSend_ViewMessage() throws Exception {
+        List<Message> messageList = new ArrayList<>();
+        int batchNum = 100;
+        for (int i = 0; i < batchNum; i++) {
+            messageList.add(new Message(topic, 
RandomUtils.getStringByUUID().getBytes()));
+        }
+
+        DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);
+        SendResult sendResult = producer.send(messageList);
+        Assert.assertEquals(SendStatus.SEND_OK, sendResult.getSendStatus());
+
+        String[] offsetIds = sendResult.getOffsetMsgId().split(",");
+        String[] msgIds = sendResult.getMsgId().split(",");
+        Assert.assertEquals(messageList.size(), offsetIds.length);
+        Assert.assertEquals(messageList.size(), msgIds.length);
+
+        Thread.sleep(2000);
+
+        for (int i = 0; i < 3; i++) {
+            producer.viewMessage(offsetIds[random.nextInt(batchNum)]);
+        }
+        for (int i = 0; i < 3; i++) {
+            producer.viewMessage(topic, msgIds[random.nextInt(batchNum)]);
+        }
+    }
+
+
+    @Test
+    public void testBatchSend_CheckProperties() throws Exception {
+        List<Message> messageList = new ArrayList<>();
+        Message message = new Message();
+        message.setTopic(topic);
+        message.setKeys("keys123");
+        message.setTags("tags123");
+        message.setWaitStoreMsgOK(false);
+        message.setBuyerId("buyerid123");
+        message.setFlag(123);
+        message.setBody("body".getBytes());
+        messageList.add(message);
+
+
+        DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);
+        SendResult sendResult = producer.send(messageList);
+        Assert.assertEquals(SendStatus.SEND_OK, sendResult.getSendStatus());
+
+        String[] offsetIds = sendResult.getOffsetMsgId().split(",");
+        String[] msgIds = sendResult.getMsgId().split(",");
+        Assert.assertEquals(messageList.size(), offsetIds.length);
+        Assert.assertEquals(messageList.size(), msgIds.length);
+
+        Thread.sleep(2000);
+
+        Message messageByOffset = producer.viewMessage(offsetIds[0]);
+        Message messageByMsgId = producer.viewMessage(topic, msgIds[0]);
+
+        System.out.println(messageByOffset);
+        System.out.println(messageByMsgId);
+
+        Assert.assertEquals(message.getTopic(), messageByMsgId.getTopic());
+        Assert.assertEquals(message.getTopic(), messageByOffset.getTopic());
+
+        Assert.assertEquals(message.getKeys(), messageByOffset.getKeys());
+        Assert.assertEquals(message.getKeys(), messageByMsgId.getKeys());
+
+        Assert.assertEquals(message.getTags(), messageByOffset.getTags());
+        Assert.assertEquals(message.getTags(), messageByMsgId.getTags());
+
+        Assert.assertEquals(message.isWaitStoreMsgOK(), 
messageByOffset.isWaitStoreMsgOK());
+        Assert.assertEquals(message.isWaitStoreMsgOK(), 
messageByMsgId.isWaitStoreMsgOK());
+
+        Assert.assertEquals(message.getBuyerId(), 
messageByOffset.getBuyerId());
+        Assert.assertEquals(message.getBuyerId(), messageByMsgId.getBuyerId());
+
+        Assert.assertEquals(message.getFlag(), messageByOffset.getFlag());
+        Assert.assertEquals(message.getFlag(), messageByMsgId.getFlag());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/11653ce2/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageExceptionIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageExceptionIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageExceptionIT.java
index 6fa8bd9..716ac51 100644
--- 
a/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageExceptionIT.java
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageExceptionIT.java
@@ -61,7 +61,7 @@ public class MessageExceptionIT extends BaseConf {
 
     @Test(expected = 
org.apache.rocketmq.client.exception.MQClientException.class)
     public void testSynSendNullMessage() throws Exception {
-        producer.send(null);
+        producer.send((Message) null);
     }
 
     @Test(expected = 
org.apache.rocketmq.client.exception.MQClientException.class)

Reply via email to