This is an automated email from the ASF dual-hosted git repository.

RongtongJin pushed a commit to branch codex/fix-ha-test-semi-sync-flakiness
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 3b41597e2f0f8a9d75ffc8eca7da537a7afe5a93
Author: RongtongJin <[email protected]>
AuthorDate: Sat Jun 13 09:56:59 2026 +0800

    Fix flaky HATest semi-sync replication
---
 .../src/test/java/org/apache/rocketmq/store/HATest.java | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/store/src/test/java/org/apache/rocketmq/store/HATest.java 
b/store/src/test/java/org/apache/rocketmq/store/HATest.java
index 5623adb64f..8bdb82000b 100644
--- a/store/src/test/java/org/apache/rocketmq/store/HATest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/HATest.java
@@ -25,6 +25,7 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.time.Duration;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -38,6 +39,7 @@ import 
org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.config.FlushDiskType;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.ha.HAConnection;
 import org.apache.rocketmq.store.ha.HAConnectionState;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import org.junit.After;
@@ -104,6 +106,7 @@ public class HATest {
         slaveMessageStore.start();
         slaveMessageStore.updateHaMasterAddress("127.0.0.1:" + 
masterMessageStoreConfig.getHaListenPort());
         await().atMost(6, SECONDS).until(() -> 
slaveMessageStore.getHaService().getHAClient().getCurrentState() == 
HAConnectionState.TRANSFER);
+        await().atMost(6, SECONDS).until(this::isSlaveReadyForReplication);
     }
 
     @Test
@@ -281,6 +284,20 @@ public class HATest {
         return msg;
     }
 
+    private boolean isSlaveReadyForReplication() {
+        if (slaveMessageStore.getHaService().getHAClient().getCurrentState() 
!= HAConnectionState.TRANSFER) {
+            return false;
+        }
+
+        long slaveMaxOffset = slaveMessageStore.getMaxPhyOffset();
+        List<HAConnection> connections = 
messageStore.getHaService().getConnectionList();
+        synchronized (connections) {
+            return connections.stream().anyMatch(connection ->
+                connection.getCurrentState() == HAConnectionState.TRANSFER
+                    && connection.getSlaveAckOffset() >= slaveMaxOffset);
+        }
+    }
+
     private boolean isCommitLogAvailable(DefaultMessageStore store) {
         try {
             Field serviceField = 
store.getClass().getDeclaredField("reputMessageService");

Reply via email to