This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch dledger-controller-brokerId in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 90c1476899d035ad2ef7717cb7455177a2ef2edd Author: RongtongJin <[email protected]> AuthorDate: Wed Feb 1 16:27:40 2023 +0800 Resolve the conflict --- .../client/impl/consumer/DefaultMQPushConsumerImpl.java | 16 ++++++++++------ .../controller/impl/manager/ReplicasInfoManager.java | 4 ++-- .../controller/impl/manager/ReplicasInfoManagerTest.java | 6 +++--- 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index a5568d832..76a736ed6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -99,9 +99,13 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { */ private long pullTimeDelayMillsWhenException = 3000; /** - * Flow control interval + * Flow control interval when cache is full */ - private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50; + private static final long PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL = 50; + /** + * Flow control interval when broker throw flow control exception + */ + private static final long PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL = 50; /** * Delay some time when suspend pull service */ @@ -264,7 +268,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) { - this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); + this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", @@ -274,7 +278,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) { - this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); + this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", @@ -285,7 +289,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { if (!this.consumeOrderly) { if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) { - this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); + this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL); if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) { log.warn( "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", @@ -505,7 +509,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } if (processQueue.getWaiAckMsgCount() > this.defaultMQPushConsumer.getPopThresholdForQueue()) { - this.executePopPullRequestLater(popRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); + this.executePopPullRequestLater(popRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn("the messages waiting to ack exceeds the threshold {}, so do flow control, popRequest={}, flowControlTimes={}, wait count={}", this.defaultMQPushConsumer.getPopThresholdForQueue(), popRequest, queueFlowControlTimes, processQueue.getWaiAckMsgCount()); diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java index c9c5e0426..dc0339d0c 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java @@ -321,9 +321,9 @@ public class ReplicasInfoManager { final ArrayList<BrokerReplicasInfo.ReplicaIdentity> inSyncReplicas = new ArrayList<>(); final ArrayList<BrokerReplicasInfo.ReplicaIdentity> notInSyncReplicas = new ArrayList<>(); - brokerInfo.getBrokerIdTable().forEach((brokerAddress, brokerId) -> { + brokerReplicaInfo.getBrokerIdTable().forEach((brokerAddress, brokerId) -> { if (syncStateSet.contains(brokerAddress)) { - long id = StringUtils.equals(master, brokerAddress) ? MixAll.MASTER_ID : brokerInfo.getBrokerId(brokerAddress); + long id = StringUtils.equals(master, brokerAddress) ? MixAll.MASTER_ID : brokerReplicaInfo.getBrokerId(brokerAddress); inSyncReplicas.add(new BrokerReplicasInfo.ReplicaIdentity(brokerAddress, id)); } else { notInSyncReplicas.add(new BrokerReplicasInfo.ReplicaIdentity(brokerAddress, brokerId)); diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java index 57d372349..8dc637842 100644 --- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java +++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java @@ -32,7 +32,7 @@ import org.apache.rocketmq.controller.impl.event.EventMessage; import org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.ResponseCode; -import org.apache.rocketmq.remoting.protocol.body.InSyncStateData; +import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo; import org.apache.rocketmq.remoting.protocol.body.SyncStateSet; import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetResponseHeader; @@ -101,7 +101,7 @@ public class ReplicasInfoManagerTest { final GetReplicaInfoResponseHeader replicaInfoBefore = this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName, brokerAddress)).getResponse(); byte[] body = this.replicasInfoManager.getSyncStateData(Arrays.asList(brokerName)).getBody(); - InSyncStateData syncStateDataBefore = RemotingSerializable.decode(body, InSyncStateData.class); + BrokerReplicasInfo syncStateDataBefore = RemotingSerializable.decode(body, BrokerReplicasInfo.class); // Try elect itself as a master ElectMasterRequestHeader requestHeader = ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerAddress); final ControllerResult<ElectMasterResponseHeader> result = this.replicasInfoManager.electMaster(requestHeader, this.electPolicy); @@ -131,7 +131,7 @@ public class ReplicasInfoManagerTest { assertEquals(brokerId, replicaInfoAfter.getBrokerId()); return; } - if (syncStateDataBefore.getInSyncStateTable().containsKey(brokerAddress) || this.config.isEnableElectUncleanMaster()) { + if (syncStateDataBefore.getReplicasInfoTable().containsKey(brokerAddress) || this.config.isEnableElectUncleanMaster()) { // can be elected successfully assertEquals(ResponseCode.SUCCESS, result.getResponseCode()); assertEquals(MixAll.MASTER_ID, replicaInfoAfter.getBrokerId());
