This is an automated email from the ASF dual-hosted git repository. RongtongJin pushed a commit to branch codex/fix-ha-test-semi-sync-flakiness in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 3b41597e2f0f8a9d75ffc8eca7da537a7afe5a93
Author: RongtongJin <[email protected]>
AuthorDate: Sat Jun 13 09:56:59 2026 +0800

    Fix flaky HATest semi-sync replication
---
 .../src/test/java/org/apache/rocketmq/store/HATest.java | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/store/src/test/java/org/apache/rocketmq/store/HATest.java b/store/src/test/java/org/apache/rocketmq/store/HATest.java
index 5623adb64f..8bdb82000b 100644
--- a/store/src/test/java/org/apache/rocketmq/store/HATest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/HATest.java
@@ -25,6 +25,7 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.time.Duration;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -38,6 +39,7 @@ import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.config.FlushDiskType;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.ha.HAConnection;
 import org.apache.rocketmq.store.ha.HAConnectionState;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import org.junit.After;
@@ -104,6 +106,7 @@ public class HATest {
         slaveMessageStore.start();
         slaveMessageStore.updateHaMasterAddress("127.0.0.1:" + masterMessageStoreConfig.getHaListenPort());
         await().atMost(6, SECONDS).until(() -> slaveMessageStore.getHaService().getHAClient().getCurrentState() == HAConnectionState.TRANSFER);
+        await().atMost(6, SECONDS).until(this::isSlaveReadyForReplication);
     }
 
     @Test
@@ -281,6 +284,20 @@ public class HATest {
         return msg;
     }
 
+    private boolean isSlaveReadyForReplication() {
+        if (slaveMessageStore.getHaService().getHAClient().getCurrentState() != HAConnectionState.TRANSFER) {
+            return false;
+        }
+
+        long slaveMaxOffset = slaveMessageStore.getMaxPhyOffset();
+        List<HAConnection> connections = messageStore.getHaService().getConnectionList();
+        synchronized (connections) {
+            return connections.stream().anyMatch(connection ->
+                connection.getCurrentState() == HAConnectionState.TRANSFER
+                    && connection.getSlaveAckOffset() >= slaveMaxOffset);
+        }
+    }
+
     private boolean isCommitLogAvailable(DefaultMessageStore store) {
         try {
             Field serviceField = store.getClass().getDeclaredField("reputMessageService");
