This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new fc31402d928 Attempt to fix flaky test by not requiring advance to 
succeed immediately (#36976)
fc31402d928 is described below

commit fc31402d92823b672200f4ad8d95a13acd40fa18
Author: Sam Whittle <[email protected]>
AuthorDate: Thu Dec 4 21:20:15 2025 +0100

    Attempt to fix flaky test by not requiring advance to succeed immediately 
(#36976)
---
 .../java/org/apache/beam/sdk/io/jms/JmsIOTest.java | 28 +++++++++++++++-------
 1 file changed, 20 insertions(+), 8 deletions(-)

diff --git 
a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java 
b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
index 6fe65520873..7f3b394d7f6 100644
--- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
+++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
@@ -25,6 +25,7 @@ import static org.apache.beam.sdk.io.jms.CommonJms.USERNAME;
 import static org.apache.beam.sdk.io.jms.CommonJms.toSerializableFunction;
 import static 
org.apache.beam.sdk.io.jms.JmsIO.Writer.JMS_IO_PRODUCER_METRIC_NAME;
 import static 
org.apache.beam.sdk.io.jms.JmsIO.Writer.PUBLICATION_RETRIES_METRIC_NAME;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static org.hamcrest.CoreMatchers.allOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
@@ -86,6 +87,7 @@ import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import org.apache.beam.sdk.io.jms.JmsIO.UnboundedJmsReader;
 import org.apache.beam.sdk.metrics.MetricNameFilter;
@@ -541,6 +543,16 @@ public class JmsIOTest {
     assertEquals(1, splits.size());
   }
 
+  private boolean advanceWithRetry(UnboundedSource.UnboundedReader reader) 
throws IOException {
+    for (int attempt = 0; attempt < 10; attempt++) {
+      if (reader.advance()) {
+        return true;
+      }
+      sleepUninterruptibly(java.time.Duration.ofMillis(100));
+    }
+    return false;
+  }
+
   @Test
   public void testCheckpointMark() throws Exception {
     // we are using no prefetch here
@@ -558,7 +570,7 @@ public class JmsIOTest {
 
     // consume 3 messages (NB: start already consumed the first message)
     for (int i = 0; i < 3; i++) {
-      assertTrue(String.format("Failed at %d-th message", i), 
reader.advance());
+      assertTrue(String.format("Failed at %d-th message", i), 
advanceWithRetry(reader));
     }
 
     // the messages are still pending in the queue (no ACK yet)
@@ -572,7 +584,7 @@ public class JmsIOTest {
 
     // we read the 6 pending messages
     for (int i = 0; i < 6; i++) {
-      assertTrue(String.format("Failed at %d-th message", i), 
reader.advance());
+      assertTrue(String.format("Failed at %d-th message", i), 
advanceWithRetry(reader));
     }
 
     // still 6 pending messages as we didn't finalize the checkpoint
@@ -592,8 +604,8 @@ public class JmsIOTest {
     assertTrue(reader.start());
 
     // consume 2 message (NB: start already consumed the first message)
-    assertTrue(reader.advance());
-    assertTrue(reader.advance());
+    assertTrue(advanceWithRetry(reader));
+    assertTrue(advanceWithRetry(reader));
 
     // get checkpoint mark after consumed 4 messages
     CheckpointMark mark = reader.getCheckpointMark();
@@ -724,7 +736,7 @@ public class JmsIOTest {
 
     // consume half the messages (NB: start already consumed the first message)
     for (int i = 0; i < (messagesToProcess / 2) - 1; i++) {
-      assertTrue(reader.advance());
+      assertTrue(advanceWithRetry(reader));
     }
 
     // the messages are still pending in the queue (no ACK yet)
@@ -738,7 +750,7 @@ public class JmsIOTest {
             () -> {
               try {
                 for (int i = 0; i < messagesToProcess / 2; i++) {
-                  assertTrue(reader.advance());
+                  assertTrue(advanceWithRetry(reader));
                 }
               } catch (IOException ex) {
                 throw new RuntimeException(ex);
@@ -877,7 +889,7 @@ public class JmsIOTest {
 
     // consume 3 more messages (NB: start already consumed the first message)
     for (int i = 0; i < 3; i++) {
-      assertTrue(reader.advance());
+      assertTrue(advanceWithRetry(reader));
     }
 
     // the messages are still pending in the queue (no ACK yet)
@@ -891,7 +903,7 @@ public class JmsIOTest {
 
     // we read the 6 pending messages
     for (int i = 0; i < 6; i++) {
-      assertTrue(reader.advance());
+      assertTrue(advanceWithRetry(reader));
     }
 
     // still 6 pending messages as we didn't finalize the checkpoint

Reply via email to