This is an automated email from the ASF dual-hosted git repository. hzh0425 pushed a commit to branch dledger-controller-snapshot in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit f9a5690b043d1de4713bcb2310f29e6012ada28f Author: hzh0425 <[email protected]> AuthorDate: Sun Jan 1 14:36:20 2023 +0800 Solve merge conflicts --- .../impl/manager/ReplicasInfoManager.java | 26 +++++++++-------- .../impl/manager/ReplicasInfoManagerTest.java | 8 ++++-- .../store/ha/autoswitch/AutoSwitchHATest.java | 33 +++++++++++----------- 3 files changed, 36 insertions(+), 31 deletions(-) diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java index 591d22c16..32b6cb763 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java @@ -16,15 +16,6 @@ */ package org.apache.rocketmq.controller.impl.manager; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.function.BiPredicate; -import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.ControllerConfig; import org.apache.rocketmq.common.MixAll; @@ -54,6 +45,17 @@ import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRes import org.apache.rocketmq.remoting.protocol.header.controller.RegisterBrokerToControllerRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.RegisterBrokerToControllerResponseHeader; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.BiPredicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; + /** * The manager that manages the replicas info for all brokers. We can think of this class as the controller's memory * state machine It should be noted that this class is not thread safe, and the upper layer needs to ensure that it can @@ -240,7 +242,7 @@ public class ReplicasInfoManager implements SnapshotAbleMetadataManager { final ApplyBrokerIdEvent applyIdEvent = new ApplyBrokerIdEvent(brokerName, brokerAddress, brokerId); result.addEvent(applyIdEvent); } else { - brokerId = brokerInfo.getBrokerId(brokerAddress); + brokerId = brokerInfo.getBrokerIdByAddress(brokerAddress); } response.setBrokerId(brokerId); response.setMasterEpoch(syncStateInfo.getMasterEpoch()); @@ -315,7 +317,7 @@ public class ReplicasInfoManager implements SnapshotAbleMetadataManager { response.setMasterAddress(masterAddress); response.setMasterEpoch(syncStateInfo.getMasterEpoch()); if (StringUtils.isNotEmpty(request.getBrokerAddress())) { - response.setBrokerId(brokerInfo.getBrokerId(request.getBrokerAddress())); + response.setBrokerId(brokerInfo.getBrokerIdByAddress(request.getBrokerAddress())); } result.setBody(new SyncStateSet(syncStateInfo.getSyncStateSet(), syncStateInfo.getSyncStateSetEpoch()).encode()); return result; @@ -336,7 +338,7 @@ public class ReplicasInfoManager implements SnapshotAbleMetadataManager { final String master = syncStateInfo.getMasterAddress(); final ArrayList<InSyncStateData.InSyncMember> inSyncMembers = new ArrayList<>(); syncStateSet.forEach(replicas -> { - long brokerId = StringUtils.equals(master, replicas) ? MixAll.MASTER_ID : brokerInfo.getBrokerId(replicas); + long brokerId = StringUtils.equals(master, replicas) ? MixAll.MASTER_ID : brokerInfo.getBrokerIdByAddress(replicas); inSyncMembers.add(new InSyncStateData.InSyncMember(replicas, brokerId)); }); diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java index 9eabedb9d..b46619536 100644 --- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java +++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java @@ -16,9 +16,7 @@ */ package org.apache.rocketmq.controller.impl.controller.impl.manager; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.ControllerConfig; import org.apache.rocketmq.controller.elect.ElectPolicy; import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy; @@ -43,6 +41,10 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java index 93f35630d..524d2aa72 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java @@ -17,21 +17,9 @@ package org.apache.rocketmq.store.ha.autoswitch; -import java.io.File; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.Random; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExtBrokerInner; @@ -46,10 +34,23 @@ import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.logfile.MappedFile; import org.apache.rocketmq.store.stats.BrokerStatsManager; -import org.apache.rocketmq.common.MixAll; import org.junit.After; -import org.junit.Test; import org.junit.Assume; +import org.junit.Test; + +import java.io.File; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; @@ -193,7 +194,7 @@ public class AutoSwitchHATest { } private void checkMessage(final DefaultMessageStore messageStore, int totalNums, int startOffset) { - await().atMost(30, TimeUnit.SECONDS) + await().atMost(60, TimeUnit.SECONDS) .until(() -> { GetMessageResult result = messageStore.getMessage("GROUP_A", "FooBar", 0, startOffset, 1024, null); // System.out.printf(result + "%n");
