This is an automated email from the ASF dual-hosted git repository. RongtongJin pushed a commit to branch codex/fix-store-flaky-tests in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit b43a61f3df66c5d92c637c0fdb73220dc894eb96 Author: RongtongJin <[email protected]> AuthorDate: Fri May 29 16:11:32 2026 +0800 Fix flaky store HA and DLedger tests --- .../store/dledger/DLedgerCommitlogTest.java | 18 +++--- .../store/dledger/DLedgerMultiPathTest.java | 2 +- .../store/dledger/MessageStoreTestBase.java | 8 +++ .../store/ha/autoswitch/AutoSwitchHATest.java | 66 +++++++++++++--------- 4 files changed, 57 insertions(+), 37 deletions(-) diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java index 7b09a6aa2f..90536a71c3 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java @@ -63,7 +63,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { @Test public void testTruncateCQ() throws Exception { String base = createBaseDir(); - String peers = String.format("n0-localhost:%d", nextPort()); + String peers = "n0-localhost:0"; String group = UUID.randomUUID().toString(); String topic = UUID.randomUUID().toString(); { @@ -122,7 +122,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { @Test public void testRecover() throws Exception { String base = createBaseDir(); - String peers = String.format("n0-localhost:%d", nextPort()); + String peers = "n0-localhost:0"; String group = UUID.randomUUID().toString(); String topic = UUID.randomUUID().toString(); { @@ -162,7 +162,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { @Test public void testDLedgerAbnormallyRecover() throws Exception { String base = createBaseDir(); - String peers = String.format("n0-localhost:%d", nextPort()); + String peers = "n0-localhost:0"; String group = UUID.randomUUID().toString(); String topic = UUID.randomUUID().toString(); @@ -199,7 +199,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { @Test public void testPutAndGetMessage() throws Exception { String base = createBaseDir(); - String peers = String.format("n0-localhost:%d", nextPort()); + String peers = "n0-localhost:0"; String group = UUID.randomUUID().toString(); DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0); DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog(); @@ -242,7 +242,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { @Test public void testBatchPutAndGetMessage() throws Exception { String base = createBaseDir(); - String peers = String.format("n0-localhost:%d", nextPort()); + String peers = "n0-localhost:0"; String group = UUID.randomUUID().toString(); DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0); DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog(); @@ -289,7 +289,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { public void testAsyncPutAndGetMessage() throws Exception { Assume.assumeFalse(MixAll.isWindows()); String base = createBaseDir(); - String peers = String.format("n0-localhost:%d", nextPort()); + String peers = "n0-localhost:0"; String group = UUID.randomUUID().toString(); DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0); DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog(); @@ -333,7 +333,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { @Test public void testAsyncBatchPutAndGetMessage() throws Exception { String base = createBaseDir(); - String peers = String.format("n0-localhost:%d", nextPort()); + String peers = "n0-localhost:0"; String group = UUID.randomUUID().toString(); DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0); DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog(); @@ -380,7 +380,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { @Test public void testCommittedPos() throws Exception { - String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort()); + String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextAvailablePort(), nextAvailablePort()); String group = UUID.randomUUID().toString(); DefaultMessageStore leaderStore = createDledgerMessageStore(createBaseDir(), group, "n0", peers, "n0", false, 0); @@ -409,7 +409,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { @Test public void testIPv6HostMsgCommittedPos() throws Exception { - String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort()); + String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextAvailablePort(), nextAvailablePort()); String group = UUID.randomUUID().toString(); DefaultMessageStore leaderStore = createDledgerMessageStore(createBaseDir(), group, "n0", peers, "n0", false, 0); diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerMultiPathTest.java b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerMultiPathTest.java index 9de4e4820e..315921d3dd 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerMultiPathTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerMultiPathTest.java @@ -43,7 +43,7 @@ public class DLedgerMultiPathTest extends MessageStoreTestBase { Assume.assumeFalse(MixAll.isWindows()); String base = createBaseDir(); String topic = UUID.randomUUID().toString(); - String peers = String.format("n0-localhost:%d", nextPort()); + String peers = "n0-localhost:0"; String group = UUID.randomUUID().toString(); String multiStorePath = base + "/multi/a/" + MessageStoreConfig.MULTI_PATH_SPLITTER + diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java index c4d9f0727b..c8370b9ddb 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java @@ -20,6 +20,8 @@ import com.google.common.util.concurrent.RateLimiter; import io.openmessaging.storage.dledger.DLedgerConfig; import io.openmessaging.storage.dledger.DLedgerServer; import java.io.File; +import java.io.IOException; +import java.net.ServerSocket; import java.net.UnknownHostException; import java.util.Arrays; import java.util.concurrent.ConcurrentHashMap; @@ -41,6 +43,12 @@ import org.junit.Assert; public class MessageStoreTestBase extends StoreTestBase { + protected static int nextAvailablePort() throws IOException { + try (ServerSocket serverSocket = new ServerSocket(0)) { + return serverSocket.getLocalPort(); + } + } + protected DefaultMessageStore createDledgerMessageStore(String base, String group, String selfId, String peers, String leaderId, boolean createAbort, int deleteFileNum) throws Exception { System.setProperty("dledger.disk.ratio.check", "0.95"); System.setProperty("dledger.disk.ratio.clean", "0.95"); diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java index 519af44159..2d75948622 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java @@ -100,8 +100,8 @@ public class AutoSwitchHATest { storeConfig1.setStorePathEpochFile(storePathRootDir + File.separator + brokerName + "#1" + File.separator + "EpochFileCache"); storeConfig1.setTotalReplicas(3); storeConfig1.setInSyncReplicas(2); + storeConfig1.setHaListenPort(0); buildMessageStoreConfig(storeConfig1, mappedFileSize); - this.store1HaAddress = "127.0.0.1:10912"; storeConfig2 = new MessageStoreConfig(); storeConfig2.setBrokerRole(BrokerRole.SLAVE); @@ -109,11 +109,10 @@ public class AutoSwitchHATest { storeConfig2.setStorePathRootDir(storePathRootDir + File.separator + brokerName + "#2"); storeConfig2.setStorePathCommitLog(storePathRootDir + File.separator + brokerName + "#2" + File.separator + "commitlog"); storeConfig2.setStorePathEpochFile(storePathRootDir + File.separator + brokerName + "#2" + File.separator + "EpochFileCache"); - storeConfig2.setHaListenPort(10943); + storeConfig2.setHaListenPort(0); storeConfig2.setTotalReplicas(3); storeConfig2.setInSyncReplicas(2); buildMessageStoreConfig(storeConfig2, mappedFileSize); - this.store2HaAddress = "127.0.0.1:10943"; messageStore1 = buildMessageStore(storeConfig1, 1L); messageStore2 = buildMessageStore(storeConfig2, 2L); @@ -124,7 +123,7 @@ public class AutoSwitchHATest { storeConfig3.setStorePathRootDir(storePathRootDir + File.separator + brokerName + "#3"); storeConfig3.setStorePathCommitLog(storePathRootDir + File.separator + brokerName + "#3" + File.separator + "commitlog"); storeConfig3.setStorePathEpochFile(storePathRootDir + File.separator + brokerName + "#3" + File.separator + "EpochFileCache"); - storeConfig3.setHaListenPort(10980); + storeConfig3.setHaListenPort(0); storeConfig3.setTotalReplicas(3); storeConfig3.setInSyncReplicas(2); buildMessageStoreConfig(storeConfig3, mappedFileSize); @@ -136,6 +135,8 @@ public class AutoSwitchHATest { messageStore1.start(); messageStore2.start(); messageStore3.start(); + this.store1HaAddress = haAddress(storeConfig1); + this.store2HaAddress = haAddress(storeConfig2); // ((AutoSwitchHAService) this.messageStore1.getHaService()).("127.0.0.1:8000"); // ((AutoSwitchHAService) this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001"); @@ -154,18 +155,17 @@ public class AutoSwitchHATest { storeConfig1.setStorePathCommitLog(storePathRootDir + File.separator + brokerName + "#1" + File.separator + "commitlog"); storeConfig1.setStorePathEpochFile(storePathRootDir + File.separator + brokerName + "#1" + File.separator + "EpochFileCache"); storeConfig1.setAllAckInSyncStateSet(allAckInSyncStateSet); + storeConfig1.setHaListenPort(0); buildMessageStoreConfig(storeConfig1, mappedFileSize); - this.store1HaAddress = "127.0.0.1:10912"; storeConfig2 = new MessageStoreConfig(); storeConfig2.setBrokerRole(BrokerRole.SLAVE); storeConfig2.setStorePathRootDir(storePathRootDir + File.separator + brokerName + "#2"); storeConfig2.setStorePathCommitLog(storePathRootDir + File.separator + brokerName + "#2" + File.separator + "commitlog"); storeConfig2.setStorePathEpochFile(storePathRootDir + File.separator + brokerName + "#2" + File.separator + "EpochFileCache"); - storeConfig2.setHaListenPort(10943); + storeConfig2.setHaListenPort(0); storeConfig2.setAllAckInSyncStateSet(allAckInSyncStateSet); buildMessageStoreConfig(storeConfig2, mappedFileSize); - this.store2HaAddress = "127.0.0.1:10943"; messageStore1 = buildMessageStore(storeConfig1, 1L); messageStore2 = buildMessageStore(storeConfig2, 2L); @@ -174,6 +174,8 @@ public class AutoSwitchHATest { assertTrue(messageStore2.load()); messageStore1.start(); messageStore2.start(); + this.store1HaAddress = haAddress(storeConfig1); + this.store2HaAddress = haAddress(storeConfig2); // ((AutoSwitchHAService) this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000"); // ((AutoSwitchHAService) this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001"); @@ -190,6 +192,8 @@ public class AutoSwitchHATest { flag &= slave.getHaService().changeToSlave("", epoch, slaveId); slave.getHaService().updateHaMasterAddress(masterHaAddress); flag &= master.getHaService().changeToMaster(epoch); + AutoSwitchHAService masterHaService = (AutoSwitchHAService) master.getHaService(); + await().atMost(10, TimeUnit.SECONDS).until(() -> masterHaService.getConnectionCount().get() > 0); // Put message on master for (int i = 0; i < totalPutMessageNums; i++) { PutMessageResult result = master.putMessage(buildMessage()); @@ -219,7 +223,7 @@ public class AutoSwitchHATest { final long confirmOffset = this.messageStore1.getConfirmOffset(); // Step2, shutdown store2 - this.messageStore2.shutdown(); + this.messageStore2 = shutdownStore(this.messageStore2); // Put message, which should succeed because slave is removed from syncStateSet, only master remains final PutMessageResult putMessageResult = this.messageStore1.putMessage(buildMessage()); @@ -229,7 +233,7 @@ public class AutoSwitchHATest { assertTrue(this.messageStore1.getConfirmOffset() >= confirmOffset); // Step3, shutdown store1, start store2, change store2 to master, epoch = 2 - this.messageStore1.shutdown(); + this.messageStore1 = shutdownStore(this.messageStore1); storeConfig2.setBrokerRole(BrokerRole.SYNC_MASTER); messageStore2 = buildMessageStore(storeConfig2, 2L); @@ -293,8 +297,7 @@ public class AutoSwitchHATest { assertTrue(result.contains(2L)); // Now, shutdown store2 - this.messageStore2.shutdown(); - this.messageStore2.destroy(); + this.messageStore2 = destroyStore(this.messageStore2); // Wait for connection to be removed and syncStateSet to be updated by removeConnection await().atMost(10, TimeUnit.SECONDS).until(() -> { @@ -447,8 +450,7 @@ public class AutoSwitchHATest { checkMessage(this.messageStore2, 20, 0); // Step2: restart broker3 - messageStore3.shutdown(); - messageStore3.destroy(); + messageStore3 = destroyStore(messageStore3); storeConfig3.setSyncFromLastFile(true); messageStore3 = buildMessageStore(storeConfig3, 3L); @@ -457,7 +459,7 @@ public class AutoSwitchHATest { // Step2: add new broker3, link to broker1. because broker3 request sync from lastFile, so it only synced 10 msg from offset 10; messageStore3.getHaService().changeToSlave("", 2, 3L); - messageStore3.getHaService().updateHaMasterAddress("127.0.0.1:10912"); + messageStore3.getHaService().updateHaMasterAddress(store1HaAddress); checkMessage(messageStore3, 10, 10); } @@ -498,7 +500,7 @@ public class AutoSwitchHATest { long tmpConfirmOffset = this.messageStore2.getConfirmOffset(); long setConfirmOffset = this.messageStore2.getConfirmOffset() - this.messageStore2.getConfirmOffset() / 2; - messageStore2.shutdown(); + messageStore2 = shutdownStore(messageStore2); StoreCheckpoint storeCheckpoint = new StoreCheckpoint(storeConfig2.getStorePathRootDir() + File.separator + "checkpoint"); assertEquals(tmpConfirmOffset, storeCheckpoint.getConfirmPhyOffset()); storeCheckpoint.setConfirmPhyOffset(setConfirmOffset); @@ -514,22 +516,28 @@ public class AutoSwitchHATest { @After public void destroy() throws Exception { - if (this.messageStore2 != null) { - messageStore2.shutdown(); - messageStore2.destroy(); - } - if (this.messageStore1 != null) { - messageStore1.shutdown(); - messageStore1.destroy(); - } - if (this.messageStore3 != null) { - messageStore3.shutdown(); - messageStore3.destroy(); - } + this.messageStore2 = destroyStore(this.messageStore2); + this.messageStore1 = destroyStore(this.messageStore1); + this.messageStore3 = destroyStore(this.messageStore3); File file = new File(storePathRootParentDir); UtilAll.deleteFile(file); } + private DefaultMessageStore shutdownStore(DefaultMessageStore messageStore) { + if (messageStore != null) { + messageStore.shutdown(); + } + return null; + } + + private DefaultMessageStore destroyStore(DefaultMessageStore messageStore) { + if (messageStore != null) { + messageStore.shutdown(); + messageStore.destroy(); + } + return null; + } + private DefaultMessageStore buildMessageStore(MessageStoreConfig messageStoreConfig, long brokerId) throws Exception { BrokerConfig brokerConfig = new BrokerConfig(); @@ -538,6 +546,10 @@ public class AutoSwitchHATest { return new DefaultMessageStore(messageStoreConfig, brokerStatsManager, null, brokerConfig, new ConcurrentHashMap<>()); } + private String haAddress(MessageStoreConfig messageStoreConfig) { + return "127.0.0.1:" + messageStoreConfig.getHaListenPort(); + } + private void buildMessageStoreConfig(MessageStoreConfig messageStoreConfig, int mappedFileSize) { messageStoreConfig.setMappedFileSizeCommitLog(mappedFileSize); messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 1024);
