Repository: incubator-rocketmq Updated Branches: refs/heads/master fa85abcdf -> 0c5e53db6
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/store/src/main/java/org/apache/rocketmq/store/CommitLog.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 3bbe675..5be8258 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -32,6 +32,7 @@ import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.FlushDiskType; @@ -57,6 +58,7 @@ public class CommitLog { private final FlushCommitLogService commitLogService; private final AppendMessageCallback appendMessageCallback; + private final ThreadLocal<MessageExtBatchEncoder> batchEncoderThreadLocal; private HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024); private volatile long confirmOffset = -1L; @@ -81,6 +83,11 @@ public class CommitLog { this.commitLogService = new CommitRealTimeService(); this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); + batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() { + @Override protected MessageExtBatchEncoder initialValue() { + return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); + } + }; } public boolean load() { @@ -222,7 +229,8 @@ public class CommitLog { * * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure */ - public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) { + public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC, + final boolean readBody) { try { // 1 TOTAL SIZE int totalSize = byteBuffer.getInt(); @@ -370,7 +378,7 @@ public class CommitLog { return new DispatchRequest(-1, false /* success */); } - private int calMsgLength(int bodyLength, int topicLength, int propertiesLength) { + private static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) { final int msgLen = 4 // 1 TOTALSIZE + 4 // 2 MAGICCODE + 4 // 3 BODYCRC @@ -633,18 +641,23 @@ public class CommitLog { storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet(); storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes()); - GroupCommitRequest request = null; + handleDiskFlush(result, putMessageResult, msg); + handleHA(result, putMessageResult, msg); + return putMessageResult; + } + + public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { // Synchronization flush if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; - if (msg.isWaitStoreMsgOK()) { - request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); + if (messageExt.isWaitStoreMsgOK()) { + GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { - log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags() - + " client address: " + msg.getBornHostString()); + log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + + " client address: " + messageExt.getBornHostString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } } else { @@ -659,26 +672,22 @@ public class CommitLog { commitLogService.wakeup(); } } + } - // Synchronous write double + public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) { HAService service = this.defaultMessageStore.getHaService(); - if (msg.isWaitStoreMsgOK()) { + if (messageExt.isWaitStoreMsgOK()) { // Determine whether to wait if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) { - if (null == request) { - request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); - } + GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); - service.getWaitNotifyObject().wakeupAll(); - boolean flushOK = - // TODO request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { - log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: " - + msg.getTags() + " client address: " + msg.getBornHostString()); + log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: " + + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); } } @@ -690,12 +699,109 @@ public class CommitLog { } } + } + + public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) { + messageExtBatch.setStoreTimestamp(System.currentTimeMillis()); + AppendMessageResult result; + + StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); + + final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag()); + + if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) { + return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); + } + if (messageExtBatch.getDelayTimeLevel() > 0) { + return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); + } + + long eclipseTimeInLock = 0; + MappedFile unlockMappedFile = null; + MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); + + //fine-grained lock instead of the coarse-grained + MessageExtBatchEncoder batchEncoder = batchEncoderThreadLocal.get(); + + messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch)); + + lockForPutMessage(); //spin... + try { + long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); + this.beginTimeInLock = beginLockTimestamp; + + // Here settings are stored timestamp, in order to ensure an orderly + // global + messageExtBatch.setStoreTimestamp(beginLockTimestamp); + + if (null == mappedFile || mappedFile.isFull()) { + mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise + } + if (null == mappedFile) { + log.error("Create maped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); + beginTimeInLock = 0; + return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null); + } + + result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback); + switch (result.getStatus()) { + case PUT_OK: + break; + case END_OF_FILE: + unlockMappedFile = mappedFile; + // Create a new file, re-write the message + mappedFile = this.mappedFileQueue.getLastMappedFile(0); + if (null == mappedFile) { + // XXX: warn and notify me + log.error("Create maped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); + beginTimeInLock = 0; + return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result); + } + result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback); + break; + case MESSAGE_SIZE_EXCEEDED: + case PROPERTIES_SIZE_EXCEEDED: + beginTimeInLock = 0; + return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result); + case UNKNOWN_ERROR: + beginTimeInLock = 0; + return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); + default: + beginTimeInLock = 0; + return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); + } + + eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; + beginTimeInLock = 0; + } finally { + releasePutMessageLock(); + } + + + if (eclipseTimeInLock > 500) { + log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, messageExtBatch.getBody().length, result); + } + + if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { + this.defaultMessageStore.unlockMappedFile(unlockMappedFile); + } + + PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result); + + // Statistics + storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum()); + storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes()); + + + handleDiskFlush(result, putMessageResult, messageExtBatch); + + handleHA(result, putMessageResult, messageExtBatch); + return putMessageResult; } /** - * According to receive certain message or offset storage time if an error - * occurs, it returns -1 + * According to receive certain message or offset storage time if an error occurs, it returns -1 */ public long pickupStoreTimestamp(final long offset, final int size) { if (offset >= this.getMinOffset()) { @@ -1098,6 +1204,8 @@ public class CommitLog { // Build Message Key private final StringBuilder keyBuilder = new StringBuilder(); + private final StringBuilder msgIdBuilder = new StringBuilder(); + private final ByteBuffer hostHolder = ByteBuffer.allocate(8); DefaultAppendMessageCallback(final int size) { @@ -1110,7 +1218,8 @@ public class CommitLog { return msgStoreItemMemory; } - public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner) { + public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, + final MessageExtBrokerInner msgInner) { // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br> // PHY OFFSET @@ -1218,7 +1327,7 @@ public class CommitLog { // 12 STOREHOSTADDRESS this.resetByteBuffer(hostHolder, 8); this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder)); - //this.msgStoreItemMemory.put(msgInner.getStoreHostBytes()); + //this.msgBatchMemory.put(msgInner.getStoreHostBytes()); // 13 RECONSUMETIMES this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes()); // 14 Prepared Transaction Offset @@ -1257,9 +1366,202 @@ public class CommitLog { return result; } + public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, + final MessageExtBatch messageExtBatch) { + byteBuffer.mark(); + //physical offset + long wroteOffset = fileFromOffset + byteBuffer.position(); + // Record ConsumeQueue information + keyBuilder.setLength(0); + keyBuilder.append(messageExtBatch.getTopic()); + keyBuilder.append('-'); + keyBuilder.append(messageExtBatch.getQueueId()); + String key = keyBuilder.toString(); + Long queueOffset = CommitLog.this.topicQueueTable.get(key); + if (null == queueOffset) { + queueOffset = 0L; + CommitLog.this.topicQueueTable.put(key, queueOffset); + } + long beginQueueOffset = queueOffset; + int totalMsgLen = 0; + int msgNum = 0; + msgIdBuilder.setLength(0); + final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); + ByteBuffer messagesByteBuff = messageExtBatch.getEncodedBuff(); + this.resetByteBuffer(hostHolder, 8); + ByteBuffer storeHostBytes = messageExtBatch.getStoreHostBytes(hostHolder); + messagesByteBuff.mark(); + while (messagesByteBuff.hasRemaining()) { + // 1 TOTALSIZE + final int msgPos = messagesByteBuff.position(); + final int msgLen = messagesByteBuff.getInt(); + final int bodyLen = msgLen - 40; //only for log, just estimate it + // Exceeds the maximum message + if (msgLen > this.maxMessageSize) { + CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLen + + ", maxMessageSize: " + this.maxMessageSize); + return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED); + } + totalMsgLen += msgLen; + // Determines whether there is sufficient free space + if ((totalMsgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { + this.resetByteBuffer(this.msgStoreItemMemory, 8); + // 1 TOTALSIZE + this.msgStoreItemMemory.putInt(maxBlank); + // 2 MAGICCODE + this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE); + // 3 The remaining space may be any value + // + //ignore previous read + messagesByteBuff.reset(); + // Here the length of the specially set maxBlank + byteBuffer.reset(); //ignore the previous appended messages + byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8); + return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdBuilder.toString(), messageExtBatch.getStoreTimestamp(), + beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); + } + //move to add queue offset and commitlog offset + messagesByteBuff.position(msgPos + 20); + messagesByteBuff.putLong(queueOffset); + messagesByteBuff.putLong(wroteOffset + totalMsgLen - msgLen); + + storeHostBytes.rewind(); + String msgId = MessageDecoder.createMessageId(this.msgIdMemory, storeHostBytes, wroteOffset + totalMsgLen - msgLen); + if (msgIdBuilder.length() > 0) { + msgIdBuilder.append(',').append(msgId); + } else { + msgIdBuilder.append(msgId); + } + queueOffset++; + msgNum++; + messagesByteBuff.position(msgPos + msgLen); + } + + messagesByteBuff.position(0); + messagesByteBuff.limit(totalMsgLen); + byteBuffer.put(messagesByteBuff); + messageExtBatch.setEncodedBuff(null); + AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, msgIdBuilder.toString(), + messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); + result.setMsgNum(msgNum); + CommitLog.this.topicQueueTable.put(key, queueOffset); + + return result; + } + + private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) { + byteBuffer.flip(); + byteBuffer.limit(limit); + } + + } + + public static class MessageExtBatchEncoder { + // Store the message content + private final ByteBuffer msgBatchMemory; + // The maximum length of the message + private final int maxMessageSize; + + private final ByteBuffer hostHolder = ByteBuffer.allocate(8); + + MessageExtBatchEncoder(final int size) { + this.msgBatchMemory = ByteBuffer.allocateDirect(size); + this.maxMessageSize = size; + } + + + public ByteBuffer encode(final MessageExtBatch messageExtBatch) { + msgBatchMemory.clear(); //not thread-safe + int totalMsgLen = 0; + ByteBuffer messagesByteBuff = messageExtBatch.wrap(); + while (messagesByteBuff.hasRemaining()) { + // 1 TOTALSIZE + messagesByteBuff.getInt(); + // 2 MAGICCODE + messagesByteBuff.getInt(); + // 3 BODYCRC + messagesByteBuff.getInt(); + // 4 FLAG + int flag = messagesByteBuff.getInt(); + // 5 BODY + int bodyLen = messagesByteBuff.getInt(); + int bodyPos = messagesByteBuff.position(); + int bodyCrc = UtilAll.crc32(messagesByteBuff.array(), bodyPos, bodyLen); + messagesByteBuff.position(bodyPos + bodyLen); + // 6 properties + short propertiesLen = messagesByteBuff.getShort(); + int propertiesPos = messagesByteBuff.position(); + messagesByteBuff.position(propertiesPos + propertiesLen); + + final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); + + final int topicLength = topicData.length; + + final int msgLen = calMsgLength(bodyLen, topicLength, propertiesLen); + + // Exceeds the maximum message + if (msgLen > this.maxMessageSize) { + CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLen + + ", maxMessageSize: " + this.maxMessageSize); + throw new RuntimeException("message size exceeded"); + } + + totalMsgLen += msgLen; + // Determines whether there is sufficient free space + if (totalMsgLen > maxMessageSize) { + throw new RuntimeException("message size exceeded"); + } + + // 1 TOTALSIZE + this.msgBatchMemory.putInt(msgLen); + // 2 MAGICCODE + this.msgBatchMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE); + // 3 BODYCRC + this.msgBatchMemory.putInt(bodyCrc); + // 4 QUEUEID + this.msgBatchMemory.putInt(messageExtBatch.getQueueId()); + // 5 FLAG + this.msgBatchMemory.putInt(flag); + // 6 QUEUEOFFSET + this.msgBatchMemory.putLong(0); + // 7 PHYSICALOFFSET + this.msgBatchMemory.putLong(0); + // 8 SYSFLAG + this.msgBatchMemory.putInt(messageExtBatch.getSysFlag()); + // 9 BORNTIMESTAMP + this.msgBatchMemory.putLong(messageExtBatch.getBornTimestamp()); + // 10 BORNHOST + this.resetByteBuffer(hostHolder, 8); + this.msgBatchMemory.put(messageExtBatch.getBornHostBytes(hostHolder)); + // 11 STORETIMESTAMP + this.msgBatchMemory.putLong(messageExtBatch.getStoreTimestamp()); + // 12 STOREHOSTADDRESS + this.resetByteBuffer(hostHolder, 8); + this.msgBatchMemory.put(messageExtBatch.getStoreHostBytes(hostHolder)); + // 13 RECONSUMETIMES + this.msgBatchMemory.putInt(messageExtBatch.getReconsumeTimes()); + // 14 Prepared Transaction Offset, batch does not support transaction + this.msgBatchMemory.putLong(0); + // 15 BODY + this.msgBatchMemory.putInt(bodyLen); + if (bodyLen > 0) + this.msgBatchMemory.put(messagesByteBuff.array(), bodyPos, bodyLen); + // 16 TOPIC + this.msgBatchMemory.put((byte) topicLength); + this.msgBatchMemory.put(topicData); + // 17 PROPERTIES + this.msgBatchMemory.putShort(propertiesLen); + if (propertiesLen > 0) + this.msgBatchMemory.put(messagesByteBuff.array(), propertiesPos, propertiesLen); + } + msgBatchMemory.flip(); + return msgBatchMemory; + } + private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) { byteBuffer.flip(); byteBuffer.limit(limit); } + } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index dc1c96c..eeb3598 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -331,7 +331,7 @@ public class ConsumeQueue { public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp, long logicOffset) { final int maxRetries = 30; - boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable(); + boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable(); for (int i = 0; i < maxRetries && canWrite; i++) { boolean result = this.putMessagePositionInfo(offset, size, tagsCode, logicOffset); if (result) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 2594ef3..5c2d27f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -40,6 +40,7 @@ import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.running.RunningStats; import org.apache.rocketmq.common.sysflag.MessageSysFlag; @@ -325,6 +326,62 @@ public class DefaultMessageStore implements MessageStore { return result; } + public PutMessageResult putMessages(MessageExtBatch messageExtBatch) { + if (this.shutdown) { + log.warn("DefaultMessageStore has shutdown, so putMessages is forbidden"); + return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); + } + + if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { + long value = this.printTimes.getAndIncrement(); + if ((value % 50000) == 0) { + log.warn("DefaultMessageStore is in slave mode, so putMessages is forbidden "); + } + + return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); + } + + if (!this.runningFlags.isWriteable()) { + long value = this.printTimes.getAndIncrement(); + if ((value % 50000) == 0) { + log.warn("DefaultMessageStore is not writable, so putMessages is forbidden " + this.runningFlags.getFlagBits()); + } + + return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); + } else { + this.printTimes.set(0); + } + + if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) { + log.warn("PutMessages topic length too long " + messageExtBatch.getTopic().length()); + return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); + } + + if (messageExtBatch.getBody().length > messageStoreConfig.getMaxMessageSize()) { + log.warn("PutMessages body length too long " + messageExtBatch.getBody().length); + return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); + } + + if (this.isOSPageCacheBusy()) { + return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null); + } + + long beginTime = this.getSystemClock().now(); + PutMessageResult result = this.commitLog.putMessages(messageExtBatch); + + long eclipseTime = this.getSystemClock().now() - beginTime; + if (eclipseTime > 500) { + log.warn("not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, messageExtBatch.getBody().length); + } + this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime); + + if (null == result || !result.isOk()) { + this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); + } + + return result; + } + @Override public boolean isOSPageCacheBusy() { long begin = this.getCommitLog().getBeginTimeInLock(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/store/src/main/java/org/apache/rocketmq/store/MappedFile.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java index feb505d..5cb72ce 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java @@ -33,6 +33,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.util.LibC; import org.slf4j.Logger; @@ -187,7 +189,15 @@ public class MappedFile extends ReferenceResource { } public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) { - assert msg != null; + return appendMessagesInner(msg, cb); + } + + public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb) { + return appendMessagesInner(messageExtBatch, cb); + } + + public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) { + assert messageExt != null; assert cb != null; int currentPos = this.wrotePosition.get(); @@ -195,30 +205,28 @@ public class MappedFile extends ReferenceResource { if (currentPos < this.fileSize) { ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice(); byteBuffer.position(currentPos); - AppendMessageResult result = - cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg); + AppendMessageResult result = null; + if (messageExt instanceof MessageExtBrokerInner) { + result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt); + } else if (messageExt instanceof MessageExtBatch) { + result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch)messageExt); + } else { + return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); + } this.wrotePosition.addAndGet(result.getWroteBytes()); this.storeTimestamp = result.getStoreTimestamp(); return result; } - - log.error("MappedFile.appendMessage return null, wrotePosition: " + currentPos + " fileSize: " - + this.fileSize); + log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize); return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); } - /** - */ public long getFileFromOffset() { return this.fileFromOffset; } - /** - - * - */ public boolean appendMessage(final byte[] data) { int currentPos = this.wrotePosition.get(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/store/src/main/java/org/apache/rocketmq/store/MessageStore.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index 79e3a8f..65c546b 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.store; import java.util.HashMap; import java.util.Set; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; public interface MessageStore { @@ -33,6 +34,8 @@ public interface MessageStore { PutMessageResult putMessage(final MessageExtBrokerInner msg); + PutMessageResult putMessages(final MessageExtBatch messageExtBatch); + GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums, final SubscriptionData subscriptionData); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java index 4f610b8..3dcd861 100644 --- a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java +++ b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java @@ -27,6 +27,8 @@ public class RunningFlags { private static final int WRITE_INDEX_FILE_ERROR_BIT = 1 << 3; private static final int DISK_FULL_BIT = 1 << 4; + + private volatile int flagBits = 0; public RunningFlags() { @@ -76,6 +78,15 @@ public class RunningFlags { return false; } + //for consume queue, just ignore the DISK_FULL_BIT + public boolean isCQWriteable() { + if ((this.flagBits & (NOT_WRITEABLE_BIT | WRITE_LOGICS_QUEUE_ERROR_BIT | WRITE_INDEX_FILE_ERROR_BIT)) == 0) { + return true; + } + + return false; + } + public boolean getAndMakeNotWriteable() { boolean result = this.isWriteable(); if (result) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 9cfd1c3..7ae2ab5 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -132,6 +132,7 @@ public class MessageStoreConfig { private int transientStorePoolSize = 5; private boolean fastFailIfNoBufferInStorePool = false; + public boolean isDebugLockEnable() { return debugLockEnable; } @@ -629,4 +630,5 @@ public class MessageStoreConfig { public void setCommitCommitLogThoroughInterval(final int commitCommitLogThoroughInterval) { this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval; } + } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java index 29ec71e..1515eb4 100644 --- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java +++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java @@ -123,7 +123,9 @@ public class BrokerStatsManager { public void incTopicPutNums(final String topic) { this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, 1, 1); } - + public void incTopicPutNums(final String topic, int num, int times) { + this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, num, times); + } public void incTopicPutSize(final String topic, final int size) { this.statsTable.get(TOPIC_PUT_SIZE).addValue(topic, size, 1); } @@ -154,7 +156,9 @@ public class BrokerStatsManager { public void incBrokerPutNums() { this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().incrementAndGet(); } - + public void incBrokerPutNums(final int incValue) { + this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue); + } public void incBrokerGetNums(final int incValue) { this.statsTable.get(BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java ---------------------------------------------------------------------- diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java new file mode 100644 index 0000000..fc667b6 --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.store; + + +import java.io.File; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageExtBatch; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class AppendCallbackTest { + + AppendMessageCallback callback; + + CommitLog.MessageExtBatchEncoder batchEncoder = new CommitLog.MessageExtBatchEncoder(10 * 1024 * 1024); + + @Before + public void init() throws Exception{ + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setMapedFileSizeCommitLog(1024 * 8); + messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 4); + messageStoreConfig.setMaxHashSlotNum(100); + messageStoreConfig.setMaxIndexNum(100 * 10); + messageStoreConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "unitteststore"); + messageStoreConfig.setStorePathCommitLog(System.getProperty("user.home") + File.separator + "unitteststore" + File.separator + "commitlog"); + //too much reference + DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, null); + CommitLog commitLog = new CommitLog(messageStore); + callback = commitLog.new DefaultAppendMessageCallback(1024); + } + + + @Test + public void testAppendMessageBatchEndOfFile() throws Exception{ + List<Message> messages = new ArrayList<>(); + String topic = "test-topic"; + int queue= 0; + for (int i = 0; i < 10; i++) { + Message msg = new Message(); + msg.setBody("body".getBytes()); + msg.setTopic(topic); + msg.setTags("abc"); + messages.add(msg); + } + MessageExtBatch messageExtBatch = new MessageExtBatch(); + messageExtBatch.setTopic(topic); + messageExtBatch.setQueueId(queue); + messageExtBatch.setBornTimestamp(System.currentTimeMillis()); + messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1",123)); + messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1",124)); + messageExtBatch.setBody(MessageDecoder.encodeMessages(messages)); + + messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch)); + ByteBuffer buff = ByteBuffer.allocate(1024 * 10); + //encounter end of file when append half of the data + AppendMessageResult result = callback.doAppend(0, buff, 1000, messageExtBatch); + assertEquals(AppendMessageStatus.END_OF_FILE, result.getStatus()); + assertEquals(0, result.getWroteOffset()); + assertEquals(0, result.getLogicsOffset()); + assertEquals(1000, result.getWroteBytes()); + assertEquals(8, buff.position()); //write blank size and magic value + + assertTrue(result.getMsgId().length() > 0); //should have already constructed some message ids + } + @Test + public void testAppendMessageBatchSucc() throws Exception { + List<Message> messages = new ArrayList<>(); + String topic = "test-topic"; + int queue= 0; + for (int i = 0; i < 10; i++) { + Message msg = new Message(); + msg.setBody("body".getBytes()); + msg.setTopic(topic); + msg.setTags("abc"); + messages.add(msg); + } + MessageExtBatch messageExtBatch = new MessageExtBatch(); + messageExtBatch.setTopic(topic); + messageExtBatch.setQueueId(queue); + messageExtBatch.setBornTimestamp(System.currentTimeMillis()); + messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1",123)); + messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1",124)); + messageExtBatch.setBody(MessageDecoder.encodeMessages(messages)); + + messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch)); + ByteBuffer buff = ByteBuffer.allocate(1024 * 10); + AppendMessageResult allresult = callback.doAppend(0, buff, 1024 * 10, messageExtBatch); + + assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus()); + assertEquals(0, allresult.getWroteOffset()); + assertEquals(0, allresult.getLogicsOffset()); + assertEquals(buff.position(), allresult.getWroteBytes()); + + assertEquals(messages.size(), allresult.getMsgNum()); + + Set<String> msgIds = new HashSet<>(); + for (String msgId: allresult.getMsgId().split(",")) { + assertEquals(32, msgId.length()); + msgIds.add(msgId); + } + assertEquals(messages.size(), msgIds.size()); + + List<MessageExt> decodeMsgs = MessageDecoder.decodes((ByteBuffer) buff.flip()); + assertEquals(decodeMsgs.size(), decodeMsgs.size()); + long queueOffset = decodeMsgs.get(0).getQueueOffset(); + long storeTimeStamp = decodeMsgs.get(0).getStoreTimestamp(); + for (int i = 0; i < messages.size(); i++) { + assertEquals(messages.get(i).getTopic(), decodeMsgs.get(i).getTopic()); + assertEquals(new String(messages.get(i).getBody()), new String(decodeMsgs.get(i).getBody())); + assertEquals(messages.get(i).getTags(), decodeMsgs.get(i).getTags()); + + assertEquals(messageExtBatch.getBornHostNameString(), decodeMsgs.get(i).getBornHostNameString()); + + assertEquals(messageExtBatch.getBornTimestamp(), decodeMsgs.get(i).getBornTimestamp()); + assertEquals(storeTimeStamp, decodeMsgs.get(i).getStoreTimestamp()); + assertEquals(queueOffset++, decodeMsgs.get(i).getQueueOffset()); + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java new file mode 100644 index 0000000..e372a1b --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.producer.batch; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import org.apache.log4j.Logger; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT; +import org.apache.rocketmq.test.factory.ProducerFactory; +import org.apache.rocketmq.test.util.RandomUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class BatchSendIT extends BaseConf { + private static Logger logger = Logger.getLogger(TagMessageWith1ConsumerIT.class); + private String topic = null; + private Random random = new Random(); + + @Before + public void setUp() { + topic = initTopic(); + logger.info(String.format("user topic[%s]!", topic)); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testBatchSend_ViewMessage() throws Exception { + List<Message> messageList = new ArrayList<>(); + int batchNum = 100; + for (int i = 0; i < batchNum; i++) { + messageList.add(new Message(topic, RandomUtils.getStringByUUID().getBytes())); + } + + DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr); + SendResult sendResult = producer.send(messageList); + Assert.assertEquals(SendStatus.SEND_OK, sendResult.getSendStatus()); + + String[] offsetIds = sendResult.getOffsetMsgId().split(","); + String[] msgIds = sendResult.getMsgId().split(","); + Assert.assertEquals(messageList.size(), offsetIds.length); + Assert.assertEquals(messageList.size(), msgIds.length); + + Thread.sleep(2000); + + for (int i = 0; i < 3; i++) { + producer.viewMessage(offsetIds[random.nextInt(batchNum)]); + } + for (int i = 0; i < 3; i++) { + producer.viewMessage(topic, msgIds[random.nextInt(batchNum)]); + } + } + + + @Test + public void testBatchSend_CheckProperties() throws Exception { + List<Message> messageList = new ArrayList<>(); + Message message = new Message(); + message.setTopic(topic); + message.setKeys("keys123"); + message.setTags("tags123"); + message.setWaitStoreMsgOK(false); + message.setBuyerId("buyerid123"); + message.setFlag(123); + message.setBody("body".getBytes()); + messageList.add(message); + + + DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr); + SendResult sendResult = producer.send(messageList); + Assert.assertEquals(SendStatus.SEND_OK, sendResult.getSendStatus()); + + String[] offsetIds = sendResult.getOffsetMsgId().split(","); + String[] msgIds = sendResult.getMsgId().split(","); + Assert.assertEquals(messageList.size(), offsetIds.length); + Assert.assertEquals(messageList.size(), msgIds.length); + + Thread.sleep(2000); + + Message messageByOffset = producer.viewMessage(offsetIds[0]); + Message messageByMsgId = producer.viewMessage(topic, msgIds[0]); + + System.out.println(messageByOffset); + System.out.println(messageByMsgId); + + Assert.assertEquals(message.getTopic(), messageByMsgId.getTopic()); + Assert.assertEquals(message.getTopic(), messageByOffset.getTopic()); + + Assert.assertEquals(message.getKeys(), messageByOffset.getKeys()); + Assert.assertEquals(message.getKeys(), messageByMsgId.getKeys()); + + Assert.assertEquals(message.getTags(), messageByOffset.getTags()); + Assert.assertEquals(message.getTags(), messageByMsgId.getTags()); + + Assert.assertEquals(message.isWaitStoreMsgOK(), messageByOffset.isWaitStoreMsgOK()); + Assert.assertEquals(message.isWaitStoreMsgOK(), messageByMsgId.isWaitStoreMsgOK()); + + Assert.assertEquals(message.getBuyerId(), messageByOffset.getBuyerId()); + Assert.assertEquals(message.getBuyerId(), messageByMsgId.getBuyerId()); + + Assert.assertEquals(message.getFlag(), messageByOffset.getFlag()); + Assert.assertEquals(message.getFlag(), messageByMsgId.getFlag()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageExceptionIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageExceptionIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageExceptionIT.java index 6fa8bd9..716ac51 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageExceptionIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageExceptionIT.java @@ -61,7 +61,7 @@ public class MessageExceptionIT extends BaseConf { @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) public void testSynSendNullMessage() throws Exception { - producer.send(null); + producer.send((Message) null); } @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class)
