ROCKETMQ-80 Add batch feature closes apache/incubator-rocketmq#53
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/47fad3c1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/47fad3c1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/47fad3c1 Branch: refs/heads/master Commit: 47fad3c17ab2d161743d4e52efc6258b7bcafde9 Parents: 0d6c56b Author: dongeforever <[email protected]> Authored: Fri Mar 17 18:59:43 2017 +0800 Committer: dongeforever <[email protected]> Committed: Tue Jun 6 11:37:29 2017 +0800 ---------------------------------------------------------------------- .../rocketmq/broker/BrokerController.java | 2 + .../processor/AbstractSendMessageProcessor.java | 12 +- .../broker/processor/SendMessageProcessor.java | 350 ++++++++++++------- .../org/apache/rocketmq/client/Validators.java | 1 + .../rocketmq/client/impl/MQClientAPIImpl.java | 56 +-- .../impl/producer/DefaultMQProducerImpl.java | 30 +- .../client/producer/DefaultMQProducer.java | 38 ++ .../rocketmq/client/producer/MQProducer.java | 14 + .../apache/rocketmq/common/TopicFilterType.java | 1 + .../rocketmq/common/message/MessageBatch.java | 73 ++++ .../rocketmq/common/message/MessageDecoder.java | 103 ++++++ .../rocketmq/common/message/MessageExt.java | 2 +- .../common/message/MessageExtBatch.java | 42 +++ .../rocketmq/common/protocol/RequestCode.java | 3 + .../header/SendMessageRequestHeader.java | 10 + .../header/SendMessageRequestHeaderV2.java | 14 + .../rocketmq/common/MessageBatchTest.java | 70 ++++ .../common/MessageEncodeDecodeTest.java | 81 +++++ .../rocketmq/store/AppendMessageCallback.java | 15 +- .../rocketmq/store/AppendMessageResult.java | 11 + .../org/apache/rocketmq/store/CommitLog.java | 344 ++++++++++++++++-- .../org/apache/rocketmq/store/ConsumeQueue.java | 2 +- .../rocketmq/store/DefaultMessageStore.java | 57 +++ .../org/apache/rocketmq/store/MappedFile.java | 32 +- .../org/apache/rocketmq/store/MessageStore.java | 3 + .../org/apache/rocketmq/store/RunningFlags.java | 11 + .../store/config/MessageStoreConfig.java | 2 + .../store/stats/BrokerStatsManager.java | 8 +- .../rocketmq/store/AppendCallbackTest.java | 150 ++++++++ .../test/client/producer/batch/BatchSendIT.java | 131 +++++++ .../exception/msg/MessageExceptionIT.java | 2 +- 31 files changed, 1464 insertions(+), 206 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index b656870..7e9e7ac 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -374,9 +374,11 @@ public class BrokerController { this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); + this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor); /** * PullMessageProcessor http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index 9f23bad..3faa7ae 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -17,11 +17,6 @@ package org.apache.rocketmq.broker.processor; import io.netty.channel.ChannelHandlerContext; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.List; -import java.util.Map; -import java.util.Random; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.mqtrace.SendMessageContext; import org.apache.rocketmq.broker.mqtrace.SendMessageHook; @@ -51,6 +46,12 @@ import org.apache.rocketmq.store.MessageExtBrokerInner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.List; +import java.util.Map; +import java.util.Random; + public abstract class AbstractSendMessageProcessor implements NettyRequestProcessor { protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); @@ -279,6 +280,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces SendMessageRequestHeaderV2 requestHeaderV2 = null; SendMessageRequestHeader requestHeader = null; switch (request.getCode()) { + case RequestCode.SEND_BATCH_MESSAGE: case RequestCode.SEND_MESSAGE_V2: requestHeaderV2 = (SendMessageRequestHeaderV2) request http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index a440462..56a0b99 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -34,6 +34,7 @@ import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader; @@ -72,7 +73,13 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement mqtraceContext = buildMsgContext(ctx, requestHeader); this.executeSendMessageHookBefore(ctx, request, mqtraceContext); - final RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader); + + RemotingCommand response; + if (requestHeader.isBatch()) { + response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader); + } else { + response = this.sendMessage(ctx, request, mqtraceContext, requestHeader); + } this.executeSendMessageHookAfter(response, mqtraceContext); return response; @@ -238,6 +245,50 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement return response; } + + private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response, RemotingCommand request, + MessageExt msg, TopicConfig topicConfig) { + String newTopic = requestHeader.getTopic(); + if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); + SubscriptionGroupConfig subscriptionGroupConfig = + this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName); + if (null == subscriptionGroupConfig) { + response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); + response.setRemark( + "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); + return false; + } + + int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes(); + if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) { + maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); + } + int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes(); + if (reconsumeTimes >= maxReconsumeTimes) { + newTopic = MixAll.getDLQTopic(groupName); + int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; + topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, // + DLQ_NUMS_PER_GROUP, // + PermName.PERM_WRITE, 0 + ); + msg.setTopic(newTopic); + msg.setQueueId(queueIdInt); + if (null == topicConfig) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("topic[" + newTopic + "] not exist"); + return false; + } + } + } + int sysFlag = requestHeader.getSysFlag(); + if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) { + sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG; + } + msg.setSysFlag(sysFlag); + return true; + } + private RemotingCommand sendMessage(final ChannelHandlerContext ctx, // final RemotingCommand request, // final SendMessageContext sendMessageContext, // @@ -251,9 +302,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId()); response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn())); - if (log.isDebugEnabled()) { - log.debug("receive SendMessage request command, {}", request); - } + log.debug("receive SendMessage request command, {}", request); final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp(); if (this.brokerController.getMessageStore().now() < startTimstamp) { @@ -270,6 +319,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement final byte[] body = request.getBody(); + + int queueIdInt = requestHeader.getQueueId(); TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); @@ -277,53 +328,18 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); } - int sysFlag = requestHeader.getSysFlag(); + MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); + msgInner.setTopic(requestHeader.getTopic()); + msgInner.setQueueId(queueIdInt); - if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) { - sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG; + if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) { + return response; } - String newTopic = requestHeader.getTopic(); - if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { - String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); - SubscriptionGroupConfig subscriptionGroupConfig = - this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName); - if (null == subscriptionGroupConfig) { - response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); - response.setRemark( - "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); - return response; - } - - int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes(); - if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) { - maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); - } - int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes(); - if (reconsumeTimes >= maxReconsumeTimes) { - newTopic = MixAll.getDLQTopic(groupName); - queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; - topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, // - DLQ_NUMS_PER_GROUP, // - PermName.PERM_WRITE, 0 - ); - if (null == topicConfig) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("topic[" + newTopic + "] not exist"); - return response; - } - } - } - MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); - msgInner.setTopic(newTopic); msgInner.setBody(body); msgInner.setFlag(requestHeader.getFlag()); MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties())); msgInner.setPropertiesString(requestHeader.getProperties()); - msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags())); - - msgInner.setQueueId(queueIdInt); - msgInner.setSysFlag(sysFlag); msgInner.setBornTimestamp(requestHeader.getBornTimestamp()); msgInner.setBornHost(ctx.channel().remoteAddress()); msgInner.setStoreHost(this.getStoreHost()); @@ -340,105 +356,183 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement } PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); - if (putMessageResult != null) { - boolean sendOK = false; - switch (putMessageResult.getPutMessageStatus()) { - // Success - case PUT_OK: - sendOK = true; - response.setCode(ResponseCode.SUCCESS); - break; - case FLUSH_DISK_TIMEOUT: - response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT); - sendOK = true; - break; - case FLUSH_SLAVE_TIMEOUT: - response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT); - sendOK = true; - break; - case SLAVE_NOT_AVAILABLE: - response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE); - sendOK = true; - break; + return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt); - // Failed - case CREATE_MAPEDFILE_FAILED: - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("create mapped file failed, server is busy or broken."); - break; - case MESSAGE_ILLEGAL: - case PROPERTIES_SIZE_EXCEEDED: - response.setCode(ResponseCode.MESSAGE_ILLEGAL); - response.setRemark( - "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k."); - break; - case SERVICE_NOT_AVAILABLE: - response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE); - response.setRemark( - "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small."); - break; - case OS_PAGECACHE_BUSY: - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while"); - break; - case UNKNOWN_ERROR: - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("UNKNOWN_ERROR"); - break; - default: - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("UNKNOWN_ERROR DEFAULT"); - break; - } + } - String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER); - if (sendOK) { - this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic()); - this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), - putMessageResult.getAppendMessageResult().getWroteBytes()); - this.brokerController.getBrokerStatsManager().incBrokerPutNums(); + private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response, RemotingCommand request, MessageExt msg, + SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx, int queueIdInt) { + if (putMessageResult == null) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("store putMessage return null"); + return response; + } + boolean sendOK = false; + + switch (putMessageResult.getPutMessageStatus()) { + // Success + case PUT_OK: + sendOK = true; + response.setCode(ResponseCode.SUCCESS); + break; + case FLUSH_DISK_TIMEOUT: + response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT); + sendOK = true; + break; + case FLUSH_SLAVE_TIMEOUT: + response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT); + sendOK = true; + break; + case SLAVE_NOT_AVAILABLE: + response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE); + sendOK = true; + break; + + // Failed + case CREATE_MAPEDFILE_FAILED: + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("create mapped file failed, server is busy or broken."); + break; + case MESSAGE_ILLEGAL: + case PROPERTIES_SIZE_EXCEEDED: + response.setCode(ResponseCode.MESSAGE_ILLEGAL); + response.setRemark( + "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k."); + break; + case SERVICE_NOT_AVAILABLE: + response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE); + response.setRemark( + "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small."); + break; + case OS_PAGECACHE_BUSY: + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while"); + break; + case UNKNOWN_ERROR: + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("UNKNOWN_ERROR"); + break; + default: + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("UNKNOWN_ERROR DEFAULT"); + break; + } + + String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER); + if (sendOK) { - response.setRemark(null); + this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1); + this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(), + putMessageResult.getAppendMessageResult().getWroteBytes()); + this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum()); - responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId()); - responseHeader.setQueueId(queueIdInt); - responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset()); + response.setRemark(null); - doResponse(ctx, request, response); + responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId()); + responseHeader.setQueueId(queueIdInt); + responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset()); - if (hasSendMessageHook()) { - sendMessageContext.setMsgId(responseHeader.getMsgId()); - sendMessageContext.setQueueId(responseHeader.getQueueId()); - sendMessageContext.setQueueOffset(responseHeader.getQueueOffset()); + doResponse(ctx, request, response); - int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount(); - int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes(); - int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount; + if (hasSendMessageHook()) { + sendMessageContext.setMsgId(responseHeader.getMsgId()); + sendMessageContext.setQueueId(responseHeader.getQueueId()); + sendMessageContext.setQueueOffset(responseHeader.getQueueOffset()); - sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS); - sendMessageContext.setCommercialSendTimes(incValue); - sendMessageContext.setCommercialSendSize(wroteSize); - sendMessageContext.setCommercialOwner(owner); - } - return null; - } else { - if (hasSendMessageHook()) { - int wroteSize = request.getBody().length; - int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT); - - sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE); - sendMessageContext.setCommercialSendTimes(incValue); - sendMessageContext.setCommercialSendSize(wroteSize); - sendMessageContext.setCommercialOwner(owner); - } + int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount(); + int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes(); + int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount; + + sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS); + sendMessageContext.setCommercialSendTimes(incValue); + sendMessageContext.setCommercialSendSize(wroteSize); + sendMessageContext.setCommercialOwner(owner); } + return null; } else { + if (hasSendMessageHook()) { + int wroteSize = request.getBody().length; + int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT); + + sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE); + sendMessageContext.setCommercialSendTimes(incValue); + sendMessageContext.setCommercialSendSize(wroteSize); + sendMessageContext.setCommercialOwner(owner); + } + } + return response; + } + private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, // + final RemotingCommand request, // + final SendMessageContext sendMessageContext, // + final SendMessageRequestHeader requestHeader) throws RemotingCommandException { + + final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); + final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader(); + + + response.setOpaque(request.getOpaque()); + + response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId()); + response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn())); + + log.debug("Receive SendMessage request command {}", request); + + final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp(); + if (this.brokerController.getMessageStore().now() < startTimstamp) { response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("store putMessage return null"); + response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp))); + return response; + } + + response.setCode(-1); + super.msgCheck(ctx, requestHeader, response); + if (response.getCode() != -1) { + return response; + } + + + int queueIdInt = requestHeader.getQueueId(); + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); + + if (queueIdInt < 0) { + queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); + } + + if (requestHeader.getTopic().length() > Byte.MAX_VALUE) { + response.setCode(ResponseCode.MESSAGE_ILLEGAL); + response.setRemark("message topic length too long " + requestHeader.getTopic().length()); + return response; } + if (requestHeader.getTopic() != null && requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + response.setCode(ResponseCode.MESSAGE_ILLEGAL); + response.setRemark("batch request does not support retry group " + requestHeader.getTopic()); + return response; + } + MessageExtBatch messageExtBatch = new MessageExtBatch(); + messageExtBatch.setTopic(requestHeader.getTopic()); + messageExtBatch.setQueueId(queueIdInt); + + int sysFlag = requestHeader.getSysFlag(); + if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) { + sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG; + } + messageExtBatch.setSysFlag(sysFlag); + + messageExtBatch.setFlag(requestHeader.getFlag()); + MessageAccessor.setProperties(messageExtBatch, MessageDecoder.string2messageProperties(requestHeader.getProperties())); + messageExtBatch.setBody(request.getBody()); + messageExtBatch.setBornTimestamp(requestHeader.getBornTimestamp()); + messageExtBatch.setBornHost(ctx.channel().remoteAddress()); + messageExtBatch.setStoreHost(this.getStoreHost()); + messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); + + PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch); + + handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader, sendMessageContext, ctx, queueIdInt); return response; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/client/src/main/java/org/apache/rocketmq/client/Validators.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java b/client/src/main/java/org/apache/rocketmq/client/Validators.java index 899efa6..b49537f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/Validators.java +++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java @@ -95,6 +95,7 @@ public class Validators { } // topic Validators.checkTopic(msg.getTopic()); + // body if (null == msg.getBody()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null"); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 12580c1..bdce883 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -18,14 +18,14 @@ package org.apache.rocketmq.client.impl; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.Iterator; +import java.util.Collections; +import java.util.ArrayList; +import java.util.HashMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.consumer.PullCallback; @@ -50,10 +50,11 @@ import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageClientIDSetter; -import org.apache.rocketmq.common.message.MessageConst; -import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageBatch; import org.apache.rocketmq.common.namesrv.TopAddressing; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; @@ -147,6 +148,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.slf4j.Logger; + public class MQClientAPIImpl { private final static Logger log = ClientLogger.getLog(); @@ -278,14 +280,14 @@ public class MQClientAPIImpl { } public SendResult sendMessage(// - final String addr, // 1 - final String brokerName, // 2 - final Message msg, // 3 - final SendMessageRequestHeader requestHeader, // 4 - final long timeoutMillis, // 5 - final CommunicationMode communicationMode, // 6 - final SendMessageContext context, // 7 - final DefaultMQProducerImpl producer // 8 + final String addr, // 1 + final String brokerName, // 2 + final Message msg, // 3 + final SendMessageRequestHeader requestHeader, // 4 + final long timeoutMillis, // 5 + final CommunicationMode communicationMode, // 6 + final SendMessageContext context, // 7 + final DefaultMQProducerImpl producer // 8 ) throws RemotingException, MQBrokerException, InterruptedException { return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer); } @@ -305,9 +307,9 @@ public class MQClientAPIImpl { final DefaultMQProducerImpl producer // 12 ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = null; - if (sendSmartMsg) { + if (sendSmartMsg || msg instanceof MessageBatch) { SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); - request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2); + request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2); } else { request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader); } @@ -334,11 +336,11 @@ public class MQClientAPIImpl { } private SendResult sendMessageSync(// - final String addr, // - final String brokerName, // - final Message msg, // - final long timeoutMillis, // - final RemotingCommand request// + final String addr, // + final String brokerName, // + final Message msg, // + final long timeoutMillis, // + final RemotingCommand request// ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); assert response != null; @@ -507,8 +509,16 @@ public class MQClientAPIImpl { MessageQueue messageQueue = new MessageQueue(msg.getTopic(), brokerName, responseHeader.getQueueId()); + String uniqMsgId = MessageClientIDSetter.getUniqID(msg); + if (msg instanceof MessageBatch) { + StringBuilder sb = new StringBuilder(); + for (Message message : (MessageBatch) msg) { + sb.append(sb.length() == 0 ? "" : ",").append(MessageClientIDSetter.getUniqID(message)); + } + uniqMsgId = sb.toString(); + } SendResult sendResult = new SendResult(sendStatus, - MessageClientIDSetter.getUniqID(msg), + uniqMsgId, responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset()); sendResult.setTransactionId(responseHeader.getTransactionId()); String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION); @@ -1452,7 +1462,7 @@ public class MQClientAPIImpl { } public Map<String, Map<MessageQueue, Long>> invokeBrokerToGetConsumerStatus(final String addr, final String topic, final String group, - final String clientAddr, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { + final String clientAddr, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader(); requestHeader.setTopic(topic); requestHeader.setGroup(group); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 8e81979..d828875 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -30,6 +30,16 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageClientIDSetter; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageBatch; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageType; +import org.apache.rocketmq.common.message.MessageId; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.common.ClientErrorCode; @@ -58,15 +68,6 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.help.FAQUrl; -import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.common.message.MessageAccessor; -import org.apache.rocketmq.common.message.MessageClientIDSetter; -import org.apache.rocketmq.common.message.MessageConst; -import org.apache.rocketmq.common.message.MessageDecoder; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.message.MessageId; -import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.common.message.MessageType; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader; import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; @@ -595,8 +596,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { byte[] prevBody = msg.getBody(); try { - - MessageClientIDSetter.setUniqID(msg); + //for MessageBatch,ID has been set in the generating process + if (!(msg instanceof MessageBatch)) { + MessageClientIDSetter.setUniqID(msg); + } int sysFlag = 0; if (this.tryToCompressMessage(msg)) { @@ -652,6 +655,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); requestHeader.setReconsumeTimes(0); requestHeader.setUnitMode(this.isUnitMode()); + requestHeader.setBatch(msg instanceof MessageBatch); if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); if (reconsumeTimes != null) { @@ -737,6 +741,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { } private boolean tryToCompressMessage(final Message msg) { + if (msg instanceof MessageBatch) { + //batch dose not support compressing right now + return false; + } byte[] body = msg.getBody(); if (body != null) { if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 3480c92..135a447 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -16,14 +16,18 @@ */ package org.apache.rocketmq.client.producer; +import java.util.Collection; import java.util.List; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.QueryResult; +import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageBatch; +import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageId; @@ -577,6 +581,40 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { return this.defaultMQProducerImpl.queryMessageByUniqKey(topic, msgId); } + @Override + public SendResult send(Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.defaultMQProducerImpl.send(batch(msgs)); + } + + @Override + public SendResult send(Collection<Message> msgs, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.defaultMQProducerImpl.send(batch(msgs), timeout); + } + + @Override + public SendResult send(Collection<Message> msgs, MessageQueue messageQueue) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.defaultMQProducerImpl.send(batch(msgs), messageQueue); + } + + @Override + public SendResult send(Collection<Message> msgs, MessageQueue messageQueue, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.defaultMQProducerImpl.send(batch(msgs), messageQueue, timeout); + } + + private MessageBatch batch(Collection<Message> msgs) throws MQClientException { + MessageBatch msgBatch; + try { + msgBatch = MessageBatch.generateFromList(msgs); + for (Message message : msgBatch) { + Validators.checkMessage(message, this); + MessageClientIDSetter.setUniqID(message); + } + msgBatch.setBody(msgBatch.encode()); + } catch (Exception e) { + throw new MQClientException("Failed to initiate the MessageBatch", e); + } + return msgBatch; + } public String getProducerGroup() { return producerGroup; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java index 9fc7586..14caf6f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.client.producer; +import java.util.Collection; import java.util.List; import org.apache.rocketmq.client.MQAdmin; import org.apache.rocketmq.client.exception.MQBrokerException; @@ -81,4 +82,17 @@ public interface MQProducer extends MQAdmin { TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException; + + //for batch + SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, + InterruptedException; + + SendResult send(final Collection<Message> msgs, final long timeout) throws MQClientException, + RemotingException, MQBrokerException, InterruptedException; + + SendResult send(final Collection<Message> msgs, final MessageQueue mq) throws MQClientException, + RemotingException, MQBrokerException, InterruptedException; + + SendResult send(final Collection<Message> msgs, final MessageQueue mq, final long timeout) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java b/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java index 58459e0..8dde5d8 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java @@ -19,4 +19,5 @@ package org.apache.rocketmq.common; public enum TopicFilterType { SINGLE_TAG, MULTI_TAG + } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java new file mode 100644 index 0000000..ca2ce88 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.message; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import org.apache.rocketmq.common.MixAll; + +public class MessageBatch extends Message implements Iterable<Message> { + + private static final long serialVersionUID = 621335151046335557L; + private final List<Message> messages; + + private MessageBatch(List<Message> messages) { + this.messages = messages; + } + + public byte[] encode() { + return MessageDecoder.encodeMessages(messages); + } + + public Iterator<Message> iterator() { + return messages.iterator(); + } + + public static MessageBatch generateFromList(Collection<Message> messages) { + assert messages != null; + assert messages.size() > 0; + List<Message> messageList = new ArrayList<Message>(messages.size()); + Message first = null; + for (Message message : messages) { + if (message.getDelayTimeLevel() > 0) { + throw new UnsupportedOperationException("TimeDelayLevel in not supported for batching"); + } + if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + throw new UnsupportedOperationException("Retry Group is not supported for batching"); + } + if (first == null) { + first = message; + } else { + if (!first.getTopic().equals(message.getTopic())) { + throw new UnsupportedOperationException("The topic of the messages in one batch should be the same"); + } + if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) { + throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same"); + } + } + messageList.add(message); + } + MessageBatch messageBatch = new MessageBatch(messageList); + + messageBatch.setTopic(first.getTopic()); + messageBatch.setWaitStoreMsgOK(first.isWaitStoreMsgOK()); + return messageBatch; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java index 4f4e158..90b837a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java @@ -200,6 +200,8 @@ public class MessageDecoder { return byteBuffer.array(); } + + public static MessageExt decode( java.nio.ByteBuffer byteBuffer, final boolean readBody, final boolean deCompressBody) { return decode(byteBuffer, readBody, deCompressBody, false); @@ -372,4 +374,105 @@ public class MessageDecoder { return map; } + + + public static byte[] encodeMessage(Message message) { + //only need flag, body, properties + byte[] body = message.getBody(); + int bodyLen = body.length; + String properties = messageProperties2String(message.getProperties()); + byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8); + //note properties length must not more than Short.MAX + short propertiesLength = (short) propertiesBytes.length; + int sysFlag = message.getFlag(); + int storeSize = 4 // 1 TOTALSIZE + + 4 // 2 MAGICCOD + + 4 // 3 BODYCRC + + 4 // 4 FLAG + + 4 + bodyLen // 4 BODY + + 2 + propertiesLength; + ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize); + // 1 TOTALSIZE + byteBuffer.putInt(storeSize); + + // 2 MAGICCODE + byteBuffer.putInt(0); + + // 3 BODYCRC + byteBuffer.putInt(0); + + // 4 FLAG + int flag = message.getFlag(); + byteBuffer.putInt(flag); + + // 5 BODY + byteBuffer.putInt(bodyLen); + byteBuffer.put(body); + + // 6 properties + byteBuffer.putShort(propertiesLength); + byteBuffer.put(propertiesBytes); + + return byteBuffer.array(); + } + + public static Message decodeMessage(ByteBuffer byteBuffer) throws Exception { + Message message = new Message(); + + // 1 TOTALSIZE + byteBuffer.getInt(); + + // 2 MAGICCODE + byteBuffer.getInt(); + + // 3 BODYCRC + byteBuffer.getInt(); + + // 4 FLAG + int flag = byteBuffer.getInt(); + message.setFlag(flag); + + // 5 BODY + int bodyLen = byteBuffer.getInt(); + byte[] body = new byte[bodyLen]; + byteBuffer.get(body); + message.setBody(body); + + // 6 properties + short propertiesLen = byteBuffer.getShort(); + byte[] propertiesBytes = new byte[propertiesLen]; + byteBuffer.get(propertiesBytes); + message.setProperties(string2messageProperties(new String(propertiesBytes, CHARSET_UTF8))); + + return message; + } + + public static byte[] encodeMessages(List<Message> messages) { + //TO DO refactor, accumulate in one buffer, avoid copies + List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size()); + int allSize = 0; + for (Message message: messages) { + byte[] tmp = encodeMessage(message); + encodedMessages.add(tmp); + allSize += tmp.length; + } + byte[] allBytes = new byte[allSize]; + int pos = 0; + for (byte[] bytes : encodedMessages) { + System.arraycopy(bytes, 0, allBytes, pos, bytes.length); + pos += bytes.length; + } + return allBytes; + } + + + public static List<Message> decodeMessages(ByteBuffer byteBuffer) throws Exception { + //TO DO add a callback for processing, avoid creating lists + List<Message> msgs = new ArrayList<Message>(); + while (byteBuffer.hasRemaining()) { + Message msg = decodeMessage(byteBuffer); + msgs.add(msg); + } + return msgs; + } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java index d11069f..3f77767 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java @@ -64,7 +64,7 @@ public class MessageExt extends Message { return TopicFilterType.SINGLE_TAG; } - private static ByteBuffer socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) { + public static ByteBuffer socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) { InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress; byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 4); byteBuffer.putInt(inetSocketAddress.getPort()); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java new file mode 100644 index 0000000..352ab37 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.message; + +import java.nio.ByteBuffer; + +public class MessageExtBatch extends MessageExt { + + private static final long serialVersionUID = -2353110995348498537L; + + + public ByteBuffer wrap() { + assert getBody() != null; + return ByteBuffer.wrap(getBody(), 0, getBody().length); + } + + + private ByteBuffer encodedBuff; + + public ByteBuffer getEncodedBuff() { + return encodedBuff; + } + + public void setEncodedBuff(ByteBuffer encodedBuff) { + this.encodedBuff = encodedBuff; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java index 217e8df..c6b0925 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java @@ -159,4 +159,7 @@ public class RequestCode { * get config from name server */ public static final int GET_NAMESRV_CONFIG = 319; + + + public static final int SEND_BATCH_MESSAGE = 320; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java index 38b6589..2df31e6 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java @@ -48,6 +48,8 @@ public class SendMessageRequestHeader implements CommandCustomHeader { private Integer reconsumeTimes; @CFNullable private boolean unitMode = false; + @CFNullable + private boolean batch = false; private Integer maxReconsumeTimes; @Override @@ -149,4 +151,12 @@ public class SendMessageRequestHeader implements CommandCustomHeader { public void setMaxReconsumeTimes(final Integer maxReconsumeTimes) { this.maxReconsumeTimes = maxReconsumeTimes; } + + public boolean isBatch() { + return batch; + } + + public void setBatch(boolean batch) { + this.batch = batch; + } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java index 34c83cb..757ef0c 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java @@ -51,6 +51,10 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader { private Integer l; // consumeRetryTimes + @CFNullable + private boolean m; //batch + + public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) { SendMessageRequestHeader v1 = new SendMessageRequestHeader(); v1.setProducerGroup(v2.a); @@ -65,6 +69,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader { v1.setReconsumeTimes(v2.j); v1.setUnitMode(v2.k); v1.setMaxReconsumeTimes(v2.l); + v1.setBatch(v2.m); return v1; } @@ -82,6 +87,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader { v2.j = v1.getReconsumeTimes(); v2.k = v1.isUnitMode(); v2.l = v1.getMaxReconsumeTimes(); + v2.m = v1.isBatch(); return v2; } @@ -184,4 +190,12 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader { public void setL(final Integer l) { this.l = l; } + + public boolean isM() { + return m; + } + + public void setM(boolean m) { + this.m = m; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java b/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java new file mode 100644 index 0000000..1e406d2 --- /dev/null +++ b/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common; + +import java.util.ArrayList; +import java.util.List; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageBatch; +import org.junit.Test; + +public class MessageBatchTest { + + + public List<Message> generateMessages() { + List<Message> messages = new ArrayList<Message>(); + Message message1 = new Message("topic1", "body".getBytes()); + Message message2 = new Message("topic1", "body".getBytes()); + + messages.add(message1); + messages.add(message2); + return messages; + } + + @Test + public void testGenerate_OK() throws Exception{ + List<Message> messages = generateMessages(); + MessageBatch.generateFromList(messages); + } + + @Test(expected = UnsupportedOperationException.class) + public void testGenerate_DiffTopic() throws Exception{ + List<Message> messages = generateMessages(); + messages.get(1).setTopic("topic2"); + MessageBatch.generateFromList(messages); + } + + @Test(expected = UnsupportedOperationException.class) + public void testGenerate_DiffWaitOK() throws Exception{ + List<Message> messages = generateMessages(); + messages.get(1).setWaitStoreMsgOK(false); + MessageBatch.generateFromList(messages); + } + @Test(expected = UnsupportedOperationException.class) + public void testGenerate_Delay() throws Exception{ + List<Message> messages = generateMessages(); + messages.get(1).setDelayTimeLevel(1); + MessageBatch.generateFromList(messages); + } + @Test(expected = UnsupportedOperationException.class) + public void testGenerate_Retry() throws Exception{ + List<Message> messages = generateMessages(); + messages.get(1).setTopic(MixAll.RETRY_GROUP_TOPIC_PREFIX + "topic"); + MessageBatch.generateFromList(messages); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java b/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java new file mode 100644 index 0000000..a219eda --- /dev/null +++ b/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common; + +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertTrue; + +/** + * Created by liuzhendong on 16/12/21. + */ +public class MessageEncodeDecodeTest { + + + @Test + public void testEncodeDecodeSingle() throws Exception{ + Message message = new Message("topic", "body".getBytes()); + message.setFlag(12); + message.putUserProperty("key","value"); + byte[] bytes = MessageDecoder.encodeMessage(message); + ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + buffer.put(bytes); + buffer.flip(); + Message newMessage = MessageDecoder.decodeMessage(buffer); + + assertTrue(message.getFlag() == newMessage.getFlag()); + assertTrue(newMessage.getProperty("key").equals(newMessage.getProperty("key"))); + assertTrue(Arrays.equals(newMessage.getBody(), message.getBody())); + } + + @Test + public void testEncodeDecodeList() throws Exception { + List<Message> messages = new ArrayList<Message>(128); + for (int i = 0; i < 100; i++) { + Message message = new Message("topic", ("body" + i).getBytes()); + message.setFlag(i); + message.putUserProperty("key", "value" + i); + messages.add(message); + } + byte[] bytes = MessageDecoder.encodeMessages(messages); + + ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + buffer.put(bytes); + buffer.flip(); + + List<Message> newMsgs = MessageDecoder.decodeMessages(buffer); + + assertTrue(newMsgs.size() == messages.size()); + + for (int i = 0; i < newMsgs.size(); i++) { + Message message = messages.get(i); + Message newMessage = newMsgs.get(i); + assertTrue(message.getFlag() == newMessage.getFlag()); + assertTrue(newMessage.getProperty("key").equals(newMessage.getProperty("key"))); + assertTrue(Arrays.equals(newMessage.getBody(), message.getBody())); + + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java b/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java index 70b702e..16a62fa 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java +++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.store; import java.nio.ByteBuffer; +import org.apache.rocketmq.common.message.MessageExtBatch; /** * Write messages callback interface @@ -32,5 +33,17 @@ public interface AppendMessageCallback { * @return How many bytes to write */ AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, - final int maxBlank, final MessageExtBrokerInner msg); + final int maxBlank, final MessageExtBrokerInner msg); + + /** + * After batched message serialization, write MapedByteBuffer + * + * @param byteBuffer + * @param maxBlank + * @param messageExtBatch, backed up by a byte array + * + * @return How many bytes to write + */ + AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, + final int maxBlank, final MessageExtBatch messageExtBatch); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java b/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java index 5182dc4..d6d1aa6 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java +++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java @@ -34,6 +34,8 @@ public class AppendMessageResult { private long logicsOffset; private long pagecacheRT = 0; + private int msgNum = 1; + public AppendMessageResult(AppendMessageStatus status) { this(status, 0, 0, "", 0, 0, 0); } @@ -109,6 +111,14 @@ public class AppendMessageResult { this.logicsOffset = logicsOffset; } + public int getMsgNum() { + return msgNum; + } + + public void setMsgNum(int msgNum) { + this.msgNum = msgNum; + } + @Override public String toString() { return "AppendMessageResult{" + @@ -119,6 +129,7 @@ public class AppendMessageResult { ", storeTimestamp=" + storeTimestamp + ", logicsOffset=" + logicsOffset + ", pagecacheRT=" + pagecacheRT + + ", msgNum=" + msgNum + '}'; } }
