Allow setting base factor for commercial data.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/0c022e05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/0c022e05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/0c022e05

Branch: refs/heads/spec
Commit: 0c022e05af86fa53c71bcb92b93a8a2b6fb82908
Parents: e5892e1
Author: yukon <[email protected]>
Authored: Tue Dec 27 16:38:25 2016 +0800
Committer: yukon <[email protected]>
Committed: Tue Dec 27 16:38:25 2016 +0800

----------------------------------------------------------------------
 .../rocketmq/broker/processor/PullMessageProcessor.java     | 4 +++-
 .../rocketmq/broker/processor/SendMessageProcessor.java     | 3 ++-
 .../main/java/com/alibaba/rocketmq/common/BrokerConfig.java | 9 +++++++++
 3 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0c022e05/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java
index 0152b93..1257f18 100644
--- a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java
@@ -299,9 +299,11 @@ public class PullMessageProcessor implements NettyRequestProcessor {
             switch (response.getCode()) {
                 case ResponseCode.SUCCESS:
+                    int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
+                    int incValue = getMessageResult.getMsgCount4Commercial() * commercialBaseCount;
                     context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);
-                    context.setCommercialRcvTimes(getMessageResult.getMsgCount4Commercial());
+                    context.setCommercialRcvTimes(incValue);
                     context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());
                     context.setCommercialOwner(owner);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0c022e05/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
index 414b3f4..a375285 100644
--- a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
@@ -428,8 +428,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
             sendMessageContext.setQueueId(responseHeader.getQueueId());
             sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
+            int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
             int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
-            int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
+            int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
             sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
             sendMessageContext.setCommercialSendTimes(incValue);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0c022e05/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java
index 6eae0a7..ba80a3f 100644
--- a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java
@@ -85,6 +85,7 @@ public class BrokerConfig {
     private int commercialTimerCount = 1;
     private int commercialTransCount = 1;
     private int commercialBigCount = 1;
+    private int commercialBaseCount = 1;
     private boolean transferMsgByHeap = true;
     private int maxDelayTime = 40;
@@ -537,4 +538,12 @@ public class BrokerConfig {
     public void setConsumerManageThreadPoolNums(int consumerManageThreadPoolNums) {
         this.consumerManageThreadPoolNums = consumerManageThreadPoolNums;
     }
+
+    public int getCommercialBaseCount() {
+        return commercialBaseCount;
+    }
+
+    public void setCommercialBaseCount(int commercialBaseCount) {
+        this.commercialBaseCount = commercialBaseCount;
+    }
 }
