This is an automated email from the ASF dual-hosted git repository.

yx9o pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new b5bc1ff5d6 Fix flaky HATest semi-sync replication (#10495)
b5bc1ff5d6 is described below

commit b5bc1ff5d687ab3d1cd95fe0d37260c6d3f0d49d
Author: rongtong <[email protected]>
AuthorDate: Sun Jun 21 09:37:25 2026 +0800

    Fix flaky HATest semi-sync replication (#10495)
---
 .../src/test/java/org/apache/rocketmq/store/HATest.java | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/store/src/test/java/org/apache/rocketmq/store/HATest.java b/store/src/test/java/org/apache/rocketmq/store/HATest.java
index 5623adb64f..8bdb82000b 100644
--- a/store/src/test/java/org/apache/rocketmq/store/HATest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/HATest.java
@@ -25,6 +25,7 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.time.Duration;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -38,6 +39,7 @@ import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.config.FlushDiskType;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.ha.HAConnection;
 import org.apache.rocketmq.store.ha.HAConnectionState;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import org.junit.After;
@@ -104,6 +106,7 @@ public class HATest {
         slaveMessageStore.start();
         slaveMessageStore.updateHaMasterAddress("127.0.0.1:" + masterMessageStoreConfig.getHaListenPort());
         await().atMost(6, SECONDS).until(() -> slaveMessageStore.getHaService().getHAClient().getCurrentState() == HAConnectionState.TRANSFER);
+        await().atMost(6, SECONDS).until(this::isSlaveReadyForReplication);
     }
 
     @Test
@@ -281,6 +284,20 @@ public class HATest {
         return msg;
     }
 
+    private boolean isSlaveReadyForReplication() {
+        if (slaveMessageStore.getHaService().getHAClient().getCurrentState() != HAConnectionState.TRANSFER) {
+            return false;
+        }
+
+        long slaveMaxOffset = slaveMessageStore.getMaxPhyOffset();
+        List<HAConnection> connections = messageStore.getHaService().getConnectionList();
+        synchronized (connections) {
+            return connections.stream().anyMatch(connection ->
+                connection.getCurrentState() == HAConnectionState.TRANSFER
+                    && connection.getSlaveAckOffset() >= slaveMaxOffset);
+        }
+    }
+
     private boolean isCommitLogAvailable(DefaultMessageStore store) {
         try {
             Field serviceField = store.getClass().getDeclaredField("reputMessageService");

Reply via email to