iamzhoug37 closed pull request #280: [ISSUE #249] master-slave sync model performance improve URL: https://github.com/apache/rocketmq/pull/280
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java index f6f8a80af..893f7fe62 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java @@ -27,7 +27,7 @@ import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageFilter; import org.apache.rocketmq.store.MessageStore; -import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.PutMessageCallback; import org.apache.rocketmq.store.QueryMessageResult; import org.apache.rocketmq.store.SelectMappedBufferResult; @@ -81,8 +81,8 @@ public void destroy() { } @Override - public PutMessageResult putMessage(MessageExtBrokerInner msg) { - return next.putMessage(msg); + public void putMessage(MessageExtBrokerInner msg , PutMessageCallback putMessageCallback) { + next.putMessage(msg , putMessageCallback); } @Override diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index d69a78700..543a450af 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -106,6 +106,7 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.LanguageCode; +import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.store.ConsumeQueue; @@ -207,6 +208,12 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, return null; } + @Override + public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws Exception { + RemotingCommand remotingCommand = processRequest(ctx , request) ; + remoteCommandResponseCallback.callback(remotingCommand); + } + @Override public boolean rejectRequest() { return false; @@ -1356,4 +1363,4 @@ private RemotingCommand queryConsumeQueue(ChannelHandlerContext ctx, return response; } -} +} \ No newline at end of file diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java index 67807a863..15b6f47a6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java @@ -38,6 +38,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,6 +67,12 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand return null; } + @Override + public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws Exception { + RemotingCommand remotingCommand = processRequest(ctx , request) ; + remoteCommandResponseCallback.callback(remotingCommand); + } + @Override public boolean rejectRequest() { return false; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java index bb427050d..e0fcdb7bd 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java @@ -33,6 +33,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +63,12 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand return null; } + @Override + public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws Exception { + RemotingCommand remotingCommand = processRequest(ctx , request) ; + remoteCommandResponseCallback.callback(remotingCommand); + } + @Override public boolean rejectRequest() { return false; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java index fee1420a9..9a8034352 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java @@ -30,9 +30,11 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageStore; +import org.apache.rocketmq.store.PutMessageCallback; import org.apache.rocketmq.store.PutMessageResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -144,7 +146,15 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, } final MessageStore messageStore = this.brokerController.getMessageStore(); - final PutMessageResult putMessageResult = messageStore.putMessage(msgInner); + PutMessageCallback putMessageCallback = new PutMessageCallback() ; + messageStore.putMessage(msgInner , putMessageCallback); + try { + putMessageCallback.waitComplete(); + } + catch (InterruptedException e) { + //ignore + } + final PutMessageResult putMessageResult = putMessageCallback.getPutMessageResult() ; if (putMessageResult != null) { switch (putMessageResult.getPutMessageStatus()) { // Success @@ -197,6 +207,12 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, return response; } + @Override + public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws Exception { + RemotingCommand remotingCommand = processRequest(ctx , request) ; + remoteCommandResponseCallback.callback(remotingCommand); + } + @Override public boolean rejectRequest() { return false; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java index 199aa940d..f58ad0c18 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java @@ -20,6 +20,7 @@ import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +39,12 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand return null; } + @Override + public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws Exception { + RemotingCommand remotingCommand = processRequest(ctx , request) ; + remoteCommandResponseCallback.callback(remotingCommand); + } + @Override public boolean rejectRequest() { return false; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index a46cbff2e..a709f2d01 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -56,11 +56,12 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.RequestTask; +import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageFilter; -import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.PutMessageCallback; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.slf4j.Logger; @@ -81,6 +82,12 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, return this.processRequest(ctx.channel(), request, true); } + @Override + public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws Exception { + RemotingCommand remotingCommand = processRequest(ctx , request) ; + remoteCommandResponseCallback.callback(remotingCommand); + } + @Override public boolean rejectRequest() { return false; @@ -524,7 +531,7 @@ private void generateOffsetMovedEvent(final OffsetMovedEvent event) { msgInner.setReconsumeTimes(0); - PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); + this.brokerController.getMessageStore().putMessage(msgInner , new PutMessageCallback()); } catch (Exception e) { log.warn(String.format("generateOffsetMovedEvent Exception, %s", event.toString()), e); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java index e8f97d0af..ae204be28 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java @@ -32,6 +32,7 @@ import org.apache.rocketmq.common.protocol.header.ViewMessageRequestHeader; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.QueryMessageResult; import org.apache.rocketmq.store.SelectMappedBufferResult; @@ -62,6 +63,12 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand return null; } + @Override + public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws Exception { + RemotingCommand remotingCommand = processRequest(ctx , request) ; + remoteCommandResponseCallback.callback(remotingCommand); + } + @Override public boolean rejectRequest() { return false; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageCallback.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageCallback.java new file mode 100644 index 000000000..787e28221 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageCallback.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.processor; + +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +/** + * SendMessageCallback is a asynchronous callback for processor process the response of the produce request + */ +public interface SendMessageCallback { + void callback(RemotingCommand response); +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 227a23e6b..cb963871a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -45,8 +45,10 @@ import org.apache.rocketmq.common.sysflag.TopicSysFlag; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.MessageExtBrokerInner; +import org.apache.rocketmq.store.PutMessageCallback; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.apache.rocketmq.store.stats.BrokerStatsManager; @@ -60,31 +62,43 @@ public SendMessageProcessor(final BrokerController brokerController) { } @Override - public RemotingCommand processRequest(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { - SendMessageContext mqtraceContext; + public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, final RemoteCommandResponseCallback remoteCommandResponseCallback) throws Exception { + + final SendMessageContext mqtraceContext; switch (request.getCode()) { case RequestCode.CONSUMER_SEND_MSG_BACK: - return this.consumerSendMsgBack(ctx, request); + this.consumerSendMsgBack(ctx, request , remoteCommandResponseCallback); + return ; default: SendMessageRequestHeader requestHeader = parseRequestHeader(request); if (requestHeader == null) { - return null; + remoteCommandResponseCallback.callback(null); + return; } mqtraceContext = buildMsgContext(ctx, requestHeader); this.executeSendMessageHookBefore(ctx, request, mqtraceContext); - RemotingCommand response; + SendMessageCallback sendMessageCallback = new SendMessageCallback() { + @Override + public void callback(RemotingCommand response) { + executeSendMessageHookAfter(response, mqtraceContext); + remoteCommandResponseCallback.callback(response); + } + } ; if (requestHeader.isBatch()) { - response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader); + this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader , sendMessageCallback); } else { - response = this.sendMessage(ctx, request, mqtraceContext, requestHeader); + this.sendMessage(ctx, request, mqtraceContext, requestHeader , sendMessageCallback); } - - this.executeSendMessageHookAfter(response, mqtraceContext); - return response; } + + } + + @Override + public RemotingCommand processRequest(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { + throw new RemotingCommandException("sendMessageProcessor not support processRequest" , new UnsupportedOperationException()) ; } @Override @@ -93,7 +107,7 @@ public boolean rejectRequest() { this.brokerController.getMessageStore().isTransientStorePoolDeficient(); } - private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request) + private void consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request , final RemoteCommandResponseCallback remoteCommandResponseCallback) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final ConsumerSendMsgBackRequestHeader requestHeader = @@ -117,19 +131,22 @@ private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, fin response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); - return response; + remoteCommandResponseCallback.callback(response); + return ; } if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden"); - return response; + remoteCommandResponseCallback.callback(response); + return ; } if (subscriptionGroupConfig.getRetryQueueNums() <= 0) { response.setCode(ResponseCode.SUCCESS); response.setRemark(null); - return response; + remoteCommandResponseCallback.callback(response); + return ; } String newTopic = MixAll.getRetryTopic(requestHeader.getGroup()); @@ -147,20 +164,23 @@ private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, fin if (null == topicConfig) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("topic[" + newTopic + "] not exist"); - return response; + remoteCommandResponseCallback.callback(response); + return ; } if (!PermName.isWriteable(topicConfig.getPerm())) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic)); - return response; + remoteCommandResponseCallback.callback(response); + return ; } - MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset()); + final MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset()); if (null == msgExt) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("look message by offset failed, " + requestHeader.getOffset()); - return response; + remoteCommandResponseCallback.callback(response); + return ; } final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC); @@ -188,7 +208,8 @@ private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, fin if (null == topicConfig) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("topic[" + newTopic + "] not exist"); - return response; + remoteCommandResponseCallback.callback(response); + return ; } } else { if (0 == delayLevel) { @@ -216,34 +237,40 @@ private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, fin String originMsgId = MessageAccessor.getOriginMessageId(msgExt); MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId); - PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); - if (putMessageResult != null) { - switch (putMessageResult.getPutMessageStatus()) { - case PUT_OK: - String backTopic = msgExt.getTopic(); - String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC); - if (correctTopic != null) { - backTopic = correctTopic; + this.brokerController.getMessageStore().putMessage(msgInner, new PutMessageCallback() { + @Override + public void doAction(PutMessageResult putMessageResult) { + if (putMessageResult != null) { + switch (putMessageResult.getPutMessageStatus()) { + case PUT_OK: + String backTopic = msgExt.getTopic(); + String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC); + if (correctTopic != null) { + backTopic = correctTopic; + } + + brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic); + + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + + remoteCommandResponseCallback.callback(response); + return ; + default: + break; } - this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic); - - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(putMessageResult.getPutMessageStatus().name()); + remoteCommandResponseCallback.callback(response); + return ; + } - return response; - default: - break; + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("putMessageResult is null"); + remoteCommandResponseCallback.callback(response); } - - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(putMessageResult.getPutMessageStatus().name()); - return response; - } - - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("putMessageResult is null"); - return response; + }); } private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response, @@ -290,10 +317,11 @@ private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, Remoti return true; } - private RemotingCommand sendMessage(final ChannelHandlerContext ctx, + private void sendMessage(final ChannelHandlerContext ctx, final RemotingCommand request, final SendMessageContext sendMessageContext, - final SendMessageRequestHeader requestHeader) throws RemotingCommandException { + final SendMessageRequestHeader requestHeader, + final SendMessageCallback sendMessageCallback) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader(); @@ -309,13 +337,15 @@ private RemotingCommand sendMessage(final ChannelHandlerContext ctx, if (this.brokerController.getMessageStore().now() < startTimstamp) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp))); - return response; + sendMessageCallback.callback(response); + return ; } response.setCode(-1); super.msgCheck(ctx, requestHeader, response); if (response.getCode() != -1) { - return response; + sendMessageCallback.callback(response); + return ; } final byte[] body = request.getBody(); @@ -327,12 +357,13 @@ private RemotingCommand sendMessage(final ChannelHandlerContext ctx, queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); } - MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); + final MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); msgInner.setTopic(requestHeader.getTopic()); msgInner.setQueueId(queueIdInt); if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) { - return response; + sendMessageCallback.callback(response); + return ; } msgInner.setBody(body); @@ -350,14 +381,20 @@ private RemotingCommand sendMessage(final ChannelHandlerContext ctx, response.setCode(ResponseCode.NO_PERMISSION); response.setRemark( "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden"); - return response; + sendMessageCallback.callback(response); + return ; } } - PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); - - return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt); - + final int innerQueueIdInt = queueIdInt ; + PutMessageCallback putMessageCallback = new PutMessageCallback() { + @Override + public void doAction(PutMessageResult putMessageResult) { + RemotingCommand remotingCommand = handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, innerQueueIdInt); + sendMessageCallback.callback(remotingCommand); + } + } ; + this.brokerController.getMessageStore().putMessage(msgInner , putMessageCallback); } private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response, @@ -465,10 +502,11 @@ private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult return response; } - private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, + private void sendBatchMessage(final ChannelHandlerContext ctx, final RemotingCommand request, final SendMessageContext sendMessageContext, - final SendMessageRequestHeader requestHeader) throws RemotingCommandException { + final SendMessageRequestHeader requestHeader , + final SendMessageCallback sendMessageCallback) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader(); @@ -484,13 +522,15 @@ private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, if (this.brokerController.getMessageStore().now() < startTimstamp) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp))); - return response; + sendMessageCallback.callback(response); + return ; } response.setCode(-1); super.msgCheck(ctx, requestHeader, response); if (response.getCode() != -1) { - return response; + sendMessageCallback.callback(response); + return ; } int queueIdInt = requestHeader.getQueueId(); @@ -503,15 +543,17 @@ private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, if (requestHeader.getTopic().length() > Byte.MAX_VALUE) { response.setCode(ResponseCode.MESSAGE_ILLEGAL); response.setRemark("message topic length too long " + requestHeader.getTopic().length()); - return response; + sendMessageCallback.callback(response); + return ; } if (requestHeader.getTopic() != null && requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { response.setCode(ResponseCode.MESSAGE_ILLEGAL); response.setRemark("batch request does not support retry group " + requestHeader.getTopic()); - return response; + sendMessageCallback.callback(response); + return ; } - MessageExtBatch messageExtBatch = new MessageExtBatch(); + final MessageExtBatch messageExtBatch = new MessageExtBatch(); messageExtBatch.setTopic(requestHeader.getTopic()); messageExtBatch.setQueueId(queueIdInt); @@ -529,9 +571,15 @@ private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, messageExtBatch.setStoreHost(this.getStoreHost()); messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); - PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch); - - return handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader, sendMessageContext, ctx, queueIdInt); + final int innerQueueIdInt = queueIdInt ; + PutMessageCallback putMessageCallback = new PutMessageCallback() { + @Override + public void doAction(PutMessageResult putMessageResult) { + RemotingCommand remotingCommand = handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader, sendMessageContext, ctx, innerQueueIdInt); + sendMessageCallback.callback(remotingCommand); + } + } ; + this.brokerController.getMessageStore().putMessages(messageExtBatch , putMessageCallback); } public boolean hasConsumeMessageHook() { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java index e544d90a1..9e993f9e4 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java @@ -32,6 +32,7 @@ import org.apache.rocketmq.store.MessageArrivingListener; import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageFilter; +import org.apache.rocketmq.store.PutMessageCallback; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.stats.BrokerStatsManager; @@ -184,7 +185,10 @@ public void dispatch(DispatchRequest request) { msg.putUserProperty("a", String.valueOf(j * 10 + 5)); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); - PutMessageResult result = master.putMessage(msg); + PutMessageCallback putMessageCallback = new PutMessageCallback() ; + master.putMessage(msg , putMessageCallback); + putMessageCallback.waitComplete(); + PutMessageResult result = putMessageCallback.getPutMessageResult() ; msg.setMsgId(result.getAppendMessageResult().getMsgId()); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java index 7828e7a91..e522d7a3d 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java @@ -39,6 +39,7 @@ import org.apache.rocketmq.store.AppendMessageStatus; import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageStore; +import org.apache.rocketmq.store.PutMessageCallback; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.config.MessageStoreConfig; @@ -84,13 +85,13 @@ public void init() { @Test public void testProcessRequest() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); + when(putMessage()).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); assertPutResult(ResponseCode.SUCCESS); } @Test public void testProcessRequest_WithHook() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); + when(putMessage()).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); List<SendMessageHook> sendMessageHookList = new ArrayList<>(); final SendMessageContext[] sendMessageContext = new SendMessageContext[1]; SendMessageHook sendMessageHook = new SendMessageHook() { @@ -120,55 +121,55 @@ public void sendMessageAfter(SendMessageContext context) { @Test public void testProcessRequest_FlushTimeOut() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.FLUSH_DISK_TIMEOUT, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + when(putMessage()).thenReturn(new PutMessageResult(PutMessageStatus.FLUSH_DISK_TIMEOUT, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); assertPutResult(ResponseCode.FLUSH_DISK_TIMEOUT); } @Test public void testProcessRequest_MessageIllegal() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + when(putMessage()).thenReturn(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); assertPutResult(ResponseCode.MESSAGE_ILLEGAL); } @Test public void testProcessRequest_CreateMappedFileFailed() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + when(putMessage()).thenReturn(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); assertPutResult(ResponseCode.SYSTEM_ERROR); } @Test public void testProcessRequest_FlushSlaveTimeout() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + when(putMessage()).thenReturn(new PutMessageResult(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); assertPutResult(ResponseCode.FLUSH_SLAVE_TIMEOUT); } @Test public void testProcessRequest_PageCacheBusy() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + when(putMessage()).thenReturn(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); assertPutResult(ResponseCode.SYSTEM_ERROR); } @Test public void testProcessRequest_PropertiesTooLong() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + when(putMessage()).thenReturn(new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); assertPutResult(ResponseCode.MESSAGE_ILLEGAL); } @Test public void testProcessRequest_ServiceNotAvailable() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + when(putMessage()).thenReturn(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); assertPutResult(ResponseCode.SERVICE_NOT_AVAILABLE); } @Test public void testProcessRequest_SlaveNotAvailable() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.SLAVE_NOT_AVAILABLE, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + when(putMessage()).thenReturn(new PutMessageResult(PutMessageStatus.SLAVE_NOT_AVAILABLE, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); assertPutResult(ResponseCode.SLAVE_NOT_AVAILABLE); } @Test public void testProcessRequest_WithMsgBack() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); + when(putMessage()).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); final RemotingCommand request = createSendMsgBackCommand(RequestCode.CONSUMER_SEND_MSG_BACK); sendMessageProcessor = new SendMessageProcessor(brokerController); @@ -177,6 +178,17 @@ public void testProcessRequest_WithMsgBack() throws RemotingCommandException { assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } + private PutMessageResult putMessage(){ + PutMessageCallback putMessageCallback = new PutMessageCallback(); + messageStore.putMessage(any(MessageExtBrokerInner.class) , putMessageCallback) ; + try{ + putMessageCallback.waitComplete(); + } + catch (InterruptedException e){ + e.printStackTrace(); + } + return putMessageCallback.getPutMessageResult() ; + } private RemotingCommand createSendMsgCommand(int requestCode) { SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); requestHeader.setProducerGroup(group); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java index 69478cf32..f168c7401 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java @@ -43,6 +43,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.slf4j.Logger; @@ -78,6 +79,12 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, return null; } + @Override + public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws Exception { + RemotingCommand remotingCommand = processRequest(ctx , request) ; + remoteCommandResponseCallback.callback(remotingCommand); + } + @Override public boolean rejectRequest() { return false; diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java index e459b1aeb..0d9701c79 100644 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java @@ -45,6 +45,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.CommitLog; import org.slf4j.Logger; @@ -78,6 +79,12 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand return null; } + @Override + public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws Exception { + RemotingCommand remotingCommand = processRequest(ctx , request) ; + remoteCommandResponseCallback.callback(remotingCommand); + } + @Override public boolean rejectRequest() { return false; diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java index f6611b683..2dd363c79 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java @@ -26,6 +26,7 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.namesrv.NamesrvController; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.slf4j.Logger; @@ -83,4 +84,10 @@ public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); return response; } + + @Override + public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws Exception { + RemotingCommand remotingCommand = processRequest(ctx , request) ; + remoteCommandResponseCallback.callback(remotingCommand); + } } diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java index ed5b20b16..8ec562a64 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java @@ -49,6 +49,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -120,6 +121,12 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, return null; } + @Override + public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws Exception { + RemotingCommand remotingCommand = processRequest(ctx , request) ; + remoteCommandResponseCallback.callback(remotingCommand); + } + @Override public boolean rejectRequest() { return false; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index 76752529a..7f6253731 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -45,6 +45,7 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; +import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode; import org.slf4j.Logger; @@ -170,31 +171,36 @@ public void processRequestCommand(final ChannelHandlerContext ctx, final Remotin @Override public void run() { try { - RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook(); + final RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook(); if (rpcHook != null) { rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); } - final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd); - if (rpcHook != null) { - rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); - } - - if (!cmd.isOnewayRPC()) { - if (response != null) { - response.setOpaque(opaque); - response.markResponseType(); - try { - ctx.writeAndFlush(response); - } catch (Throwable e) { - log.error("process request over, but response failed", e); - log.error(cmd.toString()); - log.error(response.toString()); + final RemoteCommandResponseCallback responseCallback = new RemoteCommandResponseCallback() { + @Override + public void callback(RemotingCommand response) { + if (rpcHook != null) { + rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); } - } else { + if (!cmd.isOnewayRPC()) { + if (response != null) { + response.setOpaque(opaque); + response.markResponseType(); + try { + ctx.writeAndFlush(response); + } catch (Throwable e) { + log.error("process request over, but response failed", e); + log.error(cmd.toString()); + log.error(response.toString()); + } + } else { + + } + } } - } + } ; + pair.getObject1().asyncProcessRequest(ctx, cmd , responseCallback); } catch (Throwable e) { log.error("process request exception", e); log.error(cmd.toString()); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java index 040f76848..a6f05e93f 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.remoting.netty; import io.netty.channel.ChannelHandlerContext; +import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback; import org.apache.rocketmq.remoting.protocol.RemotingCommand; /** @@ -26,5 +27,8 @@ RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception; + void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request , RemoteCommandResponseCallback remoteCommandResponseCallback) + throws Exception; + boolean rejectRequest(); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemoteCommandResponseCallback.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemoteCommandResponseCallback.java new file mode 100644 index 000000000..c13d64ca0 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemoteCommandResponseCallback.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.protocol; + +/** + * RemoteCommandResponseCallback is a asynchronous callback for netty network io + */ +public interface RemoteCommandResponseCallback { + void callback(RemotingCommand response); +} diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java index 0ecfaaa5a..461496691 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java @@ -33,6 +33,7 @@ import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.remoting.netty.ResponseFuture; import org.apache.rocketmq.remoting.protocol.LanguageCode; +import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -55,6 +56,11 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand return request; } + @Override + public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws Exception { + throw new UnsupportedOperationException() ; + } + @Override public boolean rejectRequest() { return false; diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 03d98d319..0a443a417 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -62,6 +62,7 @@ private volatile long beginTimeInLock = 0; private final PutMessageLock putMessageLock; + private final GroupCommitCallback groupCommitCallback ; public CommitLog(final DefaultMessageStore defaultMessageStore) { this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(), @@ -84,6 +85,7 @@ protected MessageExtBatchEncoder initialValue() { } }; this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock(); + this.groupCommitCallback = new GroupCommitCallback() ; } @@ -518,7 +520,7 @@ public long getBeginTimeInLock() { return beginTimeInLock; } - public PutMessageResult putMessage(final MessageExtBrokerInner msg) { + public void putMessage(final MessageExtBrokerInner msg , final CommitLogPutMessageCallback commitLogPutMessageCallback) { // Set the storage time msg.setStoreTimestamp(System.currentTimeMillis()); // Set the message body BODY CRC (consider the most appropriate setting @@ -573,7 +575,8 @@ public PutMessageResult putMessage(final MessageExtBrokerInner msg) { if (null == mappedFile) { log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null); + commitLogPutMessageCallback.callback(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null)); + return ; } result = mappedFile.appendMessage(msg, this.appendMessageCallback); @@ -588,20 +591,24 @@ public PutMessageResult putMessage(final MessageExtBrokerInner msg) { // XXX: warn and notify me log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result); + commitLogPutMessageCallback.callback(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result)); + return ; } result = mappedFile.appendMessage(msg, this.appendMessageCallback); break; case MESSAGE_SIZE_EXCEEDED: case PROPERTIES_SIZE_EXCEEDED: beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result); + commitLogPutMessageCallback.callback(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result)); + return ; case UNKNOWN_ERROR: beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); + commitLogPutMessageCallback.callback(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); + return ; default: beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); + commitLogPutMessageCallback.callback(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); + return ; } eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; @@ -625,9 +632,9 @@ public PutMessageResult putMessage(final MessageExtBrokerInner msg) { storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes()); handleDiskFlush(result, putMessageResult, msg); - handleHA(result, putMessageResult, msg); - - return putMessageResult; + if (!handleHA(result, putMessageResult, msg, commitLogPutMessageCallback)) { + commitLogPutMessageCallback.callback(putMessageResult); + } } public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { @@ -657,22 +664,25 @@ public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMess } } - public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { + /** + * + * @param result + * @param putMessageResult + * @param messageExt + * @param commitLogPutMessageCallback + * @return whether to wait slave, if false, the request should respond synchronously + */ + public boolean handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt, CommitLogPutMessageCallback commitLogPutMessageCallback) { if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) { HAService service = this.defaultMessageStore.getHaService(); if (messageExt.isWaitStoreMsgOK()) { // Determine whether to wait if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) { - GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); + + GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes() , commitLogPutMessageCallback, putMessageResult, messageExt, groupCommitCallback) ; service.putRequest(request); service.getWaitNotifyObject().wakeupAll(); - boolean flushOK = - request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); - if (!flushOK) { - log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: " - + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString()); - putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); - } + return true; } // Slave problem else { @@ -682,9 +692,10 @@ public void handleHA(AppendMessageResult result, PutMessageResult putMessageResu } } + return false; } - public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) { + public void putMessages(final MessageExtBatch messageExtBatch , CommitLogPutMessageCallback commitLogPutMessageCallback) { messageExtBatch.setStoreTimestamp(System.currentTimeMillis()); AppendMessageResult result; @@ -693,10 +704,12 @@ public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) { final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag()); if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) { - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); + commitLogPutMessageCallback.callback(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null)); + return ; } if (messageExtBatch.getDelayTimeLevel() > 0) { - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); + commitLogPutMessageCallback.callback(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null)); + return ; } long eclipseTimeInLock = 0; @@ -723,7 +736,8 @@ public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) { if (null == mappedFile) { log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null); + commitLogPutMessageCallback.callback(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null)); + return ; } result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback); @@ -738,20 +752,24 @@ public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) { // XXX: warn and notify me log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result); + commitLogPutMessageCallback.callback(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result)); + return ; } result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback); break; case MESSAGE_SIZE_EXCEEDED: case PROPERTIES_SIZE_EXCEEDED: beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result); + commitLogPutMessageCallback.callback(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result)); + return ; case UNKNOWN_ERROR: beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); + commitLogPutMessageCallback.callback(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); + return ; default: beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); + commitLogPutMessageCallback.callback(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); + return ; } eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; @@ -776,9 +794,9 @@ public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) { handleDiskFlush(result, putMessageResult, messageExtBatch); - handleHA(result, putMessageResult, messageExtBatch); - - return putMessageResult; + if (!handleHA(result, putMessageResult, messageExtBatch, commitLogPutMessageCallback)) { + commitLogPutMessageCallback.callback(putMessageResult); + } } /** @@ -1027,10 +1045,21 @@ public long getJointime() { private final long nextOffset; private final CountDownLatch countDownLatch = new CountDownLatch(1); private volatile boolean flushOK = false; + private CommitLogPutMessageCallback commitLogPutMessageCallback ; + private PutMessageResult putMessageResult ; + private MessageExt messageExt ; + private GroupCommitCallback groupCommitCallback ; public GroupCommitRequest(long nextOffset) { this.nextOffset = nextOffset; } + public GroupCommitRequest(long nextOffset, CommitLogPutMessageCallback commitLogPutMessageCallback, PutMessageResult putMessageResult, MessageExt messageExt, GroupCommitCallback groupCommitCallback) { + this.nextOffset = nextOffset; + this.commitLogPutMessageCallback = commitLogPutMessageCallback; + this.putMessageResult = putMessageResult; + this.messageExt = messageExt; + this.groupCommitCallback = groupCommitCallback ; + } public long getNextOffset() { return nextOffset; @@ -1050,6 +1079,42 @@ public boolean waitForFlush(long timeout) { return false; } } + + public void setFlushOK(boolean flushOK) { + this.flushOK = flushOK; + } + + public boolean isFlushOK() { + return flushOK; + } + + public CommitLogPutMessageCallback getCommitLogPutMessageCallback() { + return commitLogPutMessageCallback; + } + + public PutMessageResult getPutMessageResult() { + return putMessageResult; + } + + public MessageExt getMessageExt() { + return messageExt; + } + + public GroupCommitCallback getGroupCommitCallback() { + return groupCommitCallback; + } + } + + public static class GroupCommitCallback { + public void doSlaveAction(GroupCommitRequest request) { + boolean flushOK = request.isFlushOK() ; + if (!flushOK) { + log.error("do sync transfer other node, wait return, but failed, topic: " + request.getMessageExt().getTopic() + " tags: " + + request.getMessageExt().getTags() + " client address: " + request.getMessageExt().getBornHostNameString()); + request.getPutMessageResult().setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); + } + request.getCommitLogPutMessageCallback().callback(request.getPutMessageResult()); + } } /** diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLogPutMessageCallback.java b/store/src/main/java/org/apache/rocketmq/store/CommitLogPutMessageCallback.java new file mode 100644 index 000000000..f2cffcd5d --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLogPutMessageCallback.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store; + +/** + * CommitLogPutMessageCallback is a asynchronous callback for messageStore + */ +public interface CommitLogPutMessageCallback { + void callback(PutMessageResult result) ; +} diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 7a5647c3e..2c0854fef 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -302,10 +302,11 @@ public void destroyLogics() { } } - public PutMessageResult putMessage(MessageExtBrokerInner msg) { + public void putMessage(final MessageExtBrokerInner msg , final PutMessageCallback putMessageCallback) { if (this.shutdown) { log.warn("message store has shutdown, so putMessage is forbidden"); - return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); + putMessageCallback.callback(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null)); + return ; } if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { @@ -314,7 +315,8 @@ public PutMessageResult putMessage(MessageExtBrokerInner msg) { log.warn("message store is slave mode, so putMessage is forbidden "); } - return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); + putMessageCallback.callback(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null)); + return ; } if (!this.runningFlags.isWriteable()) { @@ -323,45 +325,54 @@ public PutMessageResult putMessage(MessageExtBrokerInner msg) { log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits()); } - return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); + putMessageCallback.callback(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null)); + return ; } else { this.printTimes.set(0); } if (msg.getTopic().length() > Byte.MAX_VALUE) { log.warn("putMessage message topic length too long " + msg.getTopic().length()); - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); + putMessageCallback.callback(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null)); + return ; } if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) { log.warn("putMessage message properties length too long " + msg.getPropertiesString().length()); - return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null); + putMessageCallback.callback(new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null)); + return ; } if (this.isOSPageCacheBusy()) { - return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null); + putMessageCallback.callback(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null)); + return ; } - long beginTime = this.getSystemClock().now(); - PutMessageResult result = this.commitLog.putMessage(msg); - - long eclipseTime = this.getSystemClock().now() - beginTime; - if (eclipseTime > 500) { - log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length); - } - this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime); + final long beginTime = this.getSystemClock().now(); + CommitLogPutMessageCallback commitLogPutMessageCallback = new CommitLogPutMessageCallback() { + @Override + public void callback(PutMessageResult result) { + long eclipseTime = getSystemClock().now() - beginTime; + if (eclipseTime > 500) { + log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length); + } + storeStatsService.setPutMessageEntireTimeMax(eclipseTime); - if (null == result || !result.isOk()) { - this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); - } + if (null == result || !result.isOk()) { + storeStatsService.getPutMessageFailedTimes().incrementAndGet(); + } - return result; + putMessageCallback.callback(result); + } + }; + this.commitLog.putMessage(msg , commitLogPutMessageCallback); } - public PutMessageResult putMessages(MessageExtBatch messageExtBatch) { + public void putMessages(final MessageExtBatch messageExtBatch , final PutMessageCallback putMessageCallback) { if (this.shutdown) { log.warn("DefaultMessageStore has shutdown, so putMessages is forbidden"); - return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); + putMessageCallback.callback(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null)); + return ; } if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { @@ -370,7 +381,8 @@ public PutMessageResult putMessages(MessageExtBatch messageExtBatch) { log.warn("DefaultMessageStore is in slave mode, so putMessages is forbidden "); } - return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); + putMessageCallback.callback(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null)); + return ; } if (!this.runningFlags.isWriteable()) { @@ -379,39 +391,47 @@ public PutMessageResult putMessages(MessageExtBatch messageExtBatch) { log.warn("DefaultMessageStore is not writable, so putMessages is forbidden " + this.runningFlags.getFlagBits()); } - return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); + putMessageCallback.callback(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null)); + return ; } else { this.printTimes.set(0); } if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) { log.warn("PutMessages topic length too long " + messageExtBatch.getTopic().length()); - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); + putMessageCallback.callback(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null)); + return ; } if (messageExtBatch.getBody().length > messageStoreConfig.getMaxMessageSize()) { log.warn("PutMessages body length too long " + messageExtBatch.getBody().length); - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); + putMessageCallback.callback(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null)); + return ; } if (this.isOSPageCacheBusy()) { - return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null); + putMessageCallback.callback(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null)); + return ; } - long beginTime = this.getSystemClock().now(); - PutMessageResult result = this.commitLog.putMessages(messageExtBatch); - - long eclipseTime = this.getSystemClock().now() - beginTime; - if (eclipseTime > 500) { - log.warn("not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, messageExtBatch.getBody().length); - } - this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime); + final long beginTime = this.getSystemClock().now(); + CommitLogPutMessageCallback commitLogPutMessageCallback = new CommitLogPutMessageCallback() { + @Override + public void callback(PutMessageResult result) { + long eclipseTime = getSystemClock().now() - beginTime; + if (eclipseTime > 500) { + log.warn("not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, messageExtBatch.getBody().length); + } + storeStatsService.setPutMessageEntireTimeMax(eclipseTime); - if (null == result || !result.isOk()) { - this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); - } + if (null == result || !result.isOk()) { + storeStatsService.getPutMessageFailedTimes().incrementAndGet(); + } - return result; + putMessageCallback.callback(result); + } + } ; + this.commitLog.putMessages(messageExtBatch , commitLogPutMessageCallback); } @Override diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index 907dfe209..4d49bf8f4 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -51,21 +51,34 @@ */ void destroy(); - /** +/* + */ +/** * Store a message into store. * * @param msg Message instance to store * @return result of store operation. - */ + *//* + PutMessageResult putMessage(final MessageExtBrokerInner msg); +*/ + + /** + * Store a message into store. + * + * @param msg Message instance to store + * @param putMessageCallback callback of put message + * + */ + void putMessage(final MessageExtBrokerInner msg , PutMessageCallback putMessageCallback); /** * Store a batch of messages. * * @param messageExtBatch Message batch. - * @return result of storing batch messages. + * @param putMessageCallback callback of put message */ - PutMessageResult putMessages(final MessageExtBatch messageExtBatch); + void putMessages(final MessageExtBatch messageExtBatch , PutMessageCallback putMessageCallback); /** * Query at most <code>maxMsgNums</code> messages belonging to <code>topic</code> at <code>queueId</code> starting diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageCallback.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageCallback.java new file mode 100644 index 000000000..d83f3b48b --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageCallback.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store; + +/** + * PutMessageCallback is a asynchronous callback for processor process the put message result + */ +public class PutMessageCallback { + private final Object waitObject = new Object(); + private volatile boolean completed = false; + private PutMessageResult putMessageResult; + + protected void doAction(PutMessageResult putMessageResult) { + //default empty + } + + public void callback(PutMessageResult putMessageResult) { + doAction(putMessageResult); + this.putMessageResult = putMessageResult; + completed = true; + synchronized (waitObject) { + waitObject.notifyAll(); + } + } + + public void waitComplete() throws InterruptedException { + waitComplete(-1); + } + + public void waitComplete(long timeout) throws InterruptedException { + synchronized (waitObject) { + if (timeout < 0) { + waitObject.wait(); + } + else { + waitObject.wait(timeout); + } + } + } + + public boolean isCompleted() { + return completed; + } + + public PutMessageResult getPutMessageResult() { + return putMessageResult; + } +} diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java index 51a8a2703..cec967fec 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java @@ -25,10 +25,11 @@ import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; -import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Set; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -253,15 +254,15 @@ public String getServiceName() { class GroupTransferService extends ServiceThread { private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject(); - private volatile List<CommitLog.GroupCommitRequest> requestsWrite = new ArrayList<>(); - private volatile List<CommitLog.GroupCommitRequest> requestsRead = new ArrayList<>(); + private ConcurrentSkipListMap<Long , CommitLog.GroupCommitRequest> groupCommitRequestConcurrentSkipListMap = new ConcurrentSkipListMap<>() ; - public synchronized void putRequest(final CommitLog.GroupCommitRequest request) { - synchronized (this.requestsWrite) { - this.requestsWrite.add(request); + public void putRequest(final CommitLog.GroupCommitRequest request) { + if (request.getNextOffset() >= HAService.this.push2SlaveMaxOffset.get()) { + request.setFlushOK(true); + request.getGroupCommitCallback().doSlaveAction(request) ; } - if (hasNotified.compareAndSet(false, true)) { - waitPoint.countDown(); // notify + else { + groupCommitRequestConcurrentSkipListMap.put(request.getNextOffset() , request) ; } } @@ -269,32 +270,33 @@ public void notifyTransferSome() { this.notifyTransferObject.wakeup(); } - private void swapRequests() { - List<CommitLog.GroupCommitRequest> tmp = this.requestsWrite; - this.requestsWrite = this.requestsRead; - this.requestsRead = tmp; - } - + /** + * wait slave fetch message or fetch timeout,then response the produce request + */ private void doWaitTransfer() { - synchronized (this.requestsRead) { - if (!this.requestsRead.isEmpty()) { - for (CommitLog.GroupCommitRequest req : this.requestsRead) { - boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); - for (int i = 0; !transferOK && i < 5; i++) { - this.notifyTransferObject.waitForRunning(1000); - transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); - } - - if (!transferOK) { - log.warn("transfer messsage to slave timeout, " + req.getNextOffset()); - } - - req.wakeupCustomer(transferOK); + long waitStart = System.currentTimeMillis(); + boolean waitTimeout = false ; + while (!groupCommitRequestConcurrentSkipListMap.isEmpty() && + (groupCommitRequestConcurrentSkipListMap.firstEntry().getKey() <= HAService.this.push2SlaveMaxOffset.get() + || (waitTimeout = System.currentTimeMillis() - waitStart > 5000))) { + if (waitTimeout) { + Long offset = groupCommitRequestConcurrentSkipListMap.firstEntry().getKey(); + CommitLog.GroupCommitRequest request = groupCommitRequestConcurrentSkipListMap.remove(offset) ; + request.setFlushOK(false); + request.getGroupCommitCallback().doSlaveAction(request); + } + else { + ConcurrentNavigableMap<Long, CommitLog.GroupCommitRequest> subMap = groupCommitRequestConcurrentSkipListMap.headMap(HAService.this.push2SlaveMaxOffset.get()); + for (Long offset : subMap.keySet()) { + CommitLog.GroupCommitRequest request = subMap.remove(offset); + request.setFlushOK(true); + request.getGroupCommitCallback().doSlaveAction(request); } - - this.requestsRead.clear(); } + waitTimeout = false ; + waitStart = System.currentTimeMillis(); } + this.notifyTransferObject.waitForRunning(1000); } public void run() { @@ -302,7 +304,6 @@ public void run() { while (!this.isStopped()) { try { - this.waitForRunning(10); this.doWaitTransfer(); } catch (Exception e) { log.warn(this.getServiceName() + " service has exception. ", e); @@ -312,11 +313,6 @@ public void run() { log.info(this.getServiceName() + " service end"); } - @Override - protected void onWaitEnd() { - this.swapRequests(); - } - @Override public String getServiceName() { return GroupTransferService.class.getSimpleName(); diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java index 35b8e8565..2b6315bbe 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java @@ -36,6 +36,7 @@ import org.apache.rocketmq.store.ConsumeQueueExt; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.MessageExtBrokerInner; +import org.apache.rocketmq.store.PutMessageCallback; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.SelectMappedBufferResult; @@ -44,9 +45,8 @@ import org.slf4j.LoggerFactory; public class ScheduleMessageService extends ConfigManager { - private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); - public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX"; + private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private static final long FIRST_DELAY_TIME = 1000L; private static final long DELAY_FOR_A_WHILE = 100L; private static final long DELAY_FOR_A_PERIOD = 10000L; @@ -283,11 +283,13 @@ public void executeOnTimeup() { if (msgExt != null) { try { + PutMessageCallback putMessageCallback = new PutMessageCallback(); MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); - PutMessageResult putMessageResult = - ScheduleMessageService.this.defaultMessageStore - .putMessage(msgInner); + ScheduleMessageService.this.defaultMessageStore + .putMessage(msgInner, putMessageCallback); + putMessageCallback.waitComplete(); + PutMessageResult putMessageResult = putMessageCallback.getPutMessageResult(); if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { continue; diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java index b7d38f8c7..94e78704e 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java @@ -126,7 +126,9 @@ protected void putMsg(DefaultMessageStore master) throws Exception { long totalMsgs = 200; for (long i = 0; i < totalMsgs; i++) { - master.putMessage(buildMessage()); + PutMessageCallback putMessageCallback = new PutMessageCallback() ; + master.putMessage(buildMessage() , putMessageCallback); + putMessageCallback.waitComplete(); } } diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java index 9269cdfa7..e31eee854 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java @@ -24,13 +24,12 @@ import java.nio.channels.OverlappingFileLockException; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.MessageStoreConfig; -import org.junit.After; import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -108,7 +107,7 @@ public void testWriteAndRead() throws Exception { QUEUE_TOTAL = 1; MessageBody = StoreMessage.getBytes(); for (long i = 0; i < totalMsgs; i++) { - messageStore.putMessage(buildMessage()); + messageStore.putMessage(buildMessage() , new PutMessageCallback()); } for (long i = 0; i < totalMsgs; i++) { @@ -140,7 +139,7 @@ public void testGroupCommit() throws Exception { QUEUE_TOTAL = 1; MessageBody = StoreMessage.getBytes(); for (long i = 0; i < totalMsgs; i++) { - messageStore.putMessage(buildMessage()); + messageStore.putMessage(buildMessage() , new PutMessageCallback()); } for (long i = 0; i < totalMsgs; i++) { @@ -153,7 +152,7 @@ public void testGroupCommit() throws Exception { private void verifyThatMasterIsFunctional(long totalMsgs, MessageStore master) { for (long i = 0; i < totalMsgs; i++) { - master.putMessage(buildMessage()); + master.putMessage(buildMessage() , new PutMessageCallback()); } for (long i = 0; i < totalMsgs; i++) { @@ -172,7 +171,7 @@ public void testPullSize() throws Exception { MessageExtBrokerInner messageExtBrokerInner = buildMessage(); messageExtBrokerInner.setTopic(topic); messageExtBrokerInner.setQueueId(0); - messageStore.putMessage(messageExtBrokerInner); + messageStore.putMessage(messageExtBrokerInner , new PutMessageCallback()); } //wait for consume queue build Thread.sleep(10); @@ -190,7 +189,7 @@ public void testPullSize() throws Exception { private class MyMessageArrivingListener implements MessageArrivingListener { @Override public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime, - byte[] filterBitMap, Map<String, String> properties) { + byte[] filterBitMap, Map<String, String> properties) { } } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services