This is an automated email from the ASF dual-hosted git repository. hzh0425 pushed a commit to branch dledger-controller-snapshot in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit f0089018d56f68f4cb4913bf93868af4497ca8bc Author: hzh0425 <[email protected]> AuthorDate: Mon Jan 2 19:49:20 2023 +0800 Polish DLedgerControllerTest --- .../controller/impl/DLedgerControllerTest.java | 49 ++++++++-------------- 1 file changed, 18 insertions(+), 31 deletions(-) diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java index c519a6c04..059e343b9 100644 --- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java +++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.controller.impl.controller.impl; -import io.openmessaging.storage.dledger.DLedgerConfig; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.ControllerConfig; import org.apache.rocketmq.controller.Controller; @@ -43,6 +42,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -61,7 +61,7 @@ public class DLedgerControllerTest { private List<DLedgerController> controllers; public DLedgerController launchController(final String group, final String peers, final String selfId, - String storeType, final boolean isEnableElectUncleanMaster, final int snapshotThreshold) { + final boolean isEnableElectUncleanMaster, final int snapshotThreshold) { String tmpdir = System.getProperty("java.io.tmpdir"); final String path = (StringUtils.endsWith(tmpdir, File.separator) ? tmpdir : tmpdir + File.separator) + group + File.separator + selfId; baseDirs.add(path); @@ -98,7 +98,7 @@ public class DLedgerControllerTest { } public boolean registerNewBroker(Controller leader, String clusterName, String brokerName, String brokerAddress, - boolean isFirstRegisteredBroker) throws Exception { + boolean isFirstRegisteredBroker) { // Register new broker final RegisterBrokerToControllerRequestHeader registerRequest = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, brokerAddress); RemotingCommand response = await().atMost(Duration.ofSeconds(20)).until(() -> { @@ -112,7 +112,7 @@ public class DLedgerControllerTest { e.printStackTrace(); return null; } - }, item -> item != null); + }, Objects::nonNull); final RegisterBrokerToControllerResponseHeader registerResult = (RegisterBrokerToControllerResponseHeader) response.readCustomHeader(); @@ -131,19 +131,18 @@ public class DLedgerControllerTest { return false; } final RemotingCommand getInfoResponse = leader.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName)).get(10, TimeUnit.SECONDS); - final GetReplicaInfoResponseHeader replicaInfo = (GetReplicaInfoResponseHeader) getInfoResponse.readCustomHeader(); final SyncStateSet syncStateSet = RemotingSerializable.decode(getInfoResponse.getBody(), SyncStateSet.class); assertArrayEquals(syncStateSet.getSyncStateSet().toArray(), newSyncStateSet.toArray()); assertEquals(syncStateSet.getSyncStateSetEpoch(), syncStateSetEpoch + 1); return true; } - public DLedgerController waitLeader(final List<DLedgerController> controllers) throws Exception { + public DLedgerController waitLeader(final List<DLedgerController> controllers) { if (controllers.isEmpty()) { return null; } DLedgerController c1 = controllers.get(0); - DLedgerController dLedgerController = await().atMost(Duration.ofSeconds(10)).until(() -> { + return await().atMost(Duration.ofSeconds(10)).until(() -> { String leaderId = c1.getMemberState().getLeaderId(); if (null == leaderId) { return null; @@ -154,16 +153,15 @@ public class DLedgerControllerTest { } } return null; - }, item -> item != null); - return dLedgerController; + }, Objects::nonNull); } public DLedgerController mockMetaData(boolean enableElectUncleanMaster) throws Exception { String group = UUID.randomUUID().toString(); String peers = String.format("n0-localhost:%d;n1-localhost:%d;n2-localhost:%d", 30000, 30001, 30002); - DLedgerController c0 = launchController(group, peers, "n0", DLedgerConfig.MEMORY, enableElectUncleanMaster, 1000); - DLedgerController c1 = launchController(group, peers, "n1", DLedgerConfig.MEMORY, enableElectUncleanMaster, 1000); - DLedgerController c2 = launchController(group, peers, "n2", DLedgerConfig.MEMORY, enableElectUncleanMaster, 1000); + DLedgerController c0 = launchController(group, peers, "n0", enableElectUncleanMaster, 1000); + DLedgerController c1 = launchController(group, peers, "n1", enableElectUncleanMaster, 1000); + DLedgerController c2 = launchController(group, peers, "n2", enableElectUncleanMaster, 1000); controllers.add(c0); controllers.add(c1); controllers.add(c2); @@ -187,17 +185,6 @@ public class DLedgerControllerTest { return leader; } - public void setBrokerAlivePredicate(DLedgerController controller, String... deathBroker) { - controller.setBrokerAlivePredicate((clusterName, brokerAddress) -> { - for (String broker : deathBroker) { - if (broker.equals(brokerAddress)) { - return false; - } - } - return true; - }); - } - public void setBrokerElectPolicy(DLedgerController controller, String... deathBroker) { controller.setElectPolicy(new DefaultElectPolicy((clusterName, brokerAddress) -> { for (String broker : deathBroker) { @@ -210,13 +197,13 @@ public class DLedgerControllerTest { } @Test - public void testRecoverControllerFromStatemachineSnapshot() throws Exception { + public void testRecoverControllerFromStatemachineSnapshot() { int snapshotThreshold = 10; String group = UUID.randomUUID().toString(); String peers = String.format("n0-localhost:%d;n1-localhost:%d;n2-localhost:%d", 30000, 30001, 30002); - controllers.add(launchController(group, peers, "n0", DLedgerConfig.MEMORY, false, snapshotThreshold)); - controllers.add(launchController(group, peers, "n1", DLedgerConfig.MEMORY, false, snapshotThreshold)); - controllers.add(launchController(group, peers, "n2", DLedgerConfig.MEMORY, false, snapshotThreshold)); + controllers.add(launchController(group, peers, "n0", false, snapshotThreshold)); + controllers.add(launchController(group, peers, "n1", false, snapshotThreshold)); + controllers.add(launchController(group, peers, "n2", false, snapshotThreshold)); DLedgerController leader = waitLeader(controllers); @@ -247,9 +234,9 @@ public class DLedgerControllerTest { } controllers.clear(); - controllers.add(launchController(group, peers, "n0", DLedgerConfig.MEMORY, false, snapshotThreshold)); - controllers.add(launchController(group, peers, "n1", DLedgerConfig.MEMORY, false, snapshotThreshold)); - controllers.add(launchController(group, peers, "n2", DLedgerConfig.MEMORY, false, snapshotThreshold)); + controllers.add(launchController(group, peers, "n0", false, snapshotThreshold)); + controllers.add(launchController(group, peers, "n1", false, snapshotThreshold)); + controllers.add(launchController(group, peers, "n2", false, snapshotThreshold)); flag = await().atMost(Duration.ofSeconds(10)).until(() -> { for (DLedgerController controller : controllers) { @@ -367,7 +354,7 @@ public class DLedgerControllerTest { } return null; - }, item -> item != null); + }, Objects::nonNull); final GetReplicaInfoResponseHeader replicaInfo = (GetReplicaInfoResponseHeader) response.readCustomHeader(); final SyncStateSet syncStateSetResult = RemotingSerializable.decode(response.getBody(), SyncStateSet.class); assertEquals(replicaInfo.getMasterAddress(), "127.0.0.1:9000");
