This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new fc31402d928 Attempt to fix flaky test by not requiring advance to
succeed immediately (#36976)
fc31402d928 is described below
commit fc31402d92823b672200f4ad8d95a13acd40fa18
Author: Sam Whittle <[email protected]>
AuthorDate: Thu Dec 4 21:20:15 2025 +0100
Attempt to fix flaky test by not requiring advance to succeed immediately
(#36976)
---
.../java/org/apache/beam/sdk/io/jms/JmsIOTest.java | 28 +++++++++++++++-------
1 file changed, 20 insertions(+), 8 deletions(-)
diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
index 6fe65520873..7f3b394d7f6 100644
--- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
+++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
@@ -25,6 +25,7 @@ import static org.apache.beam.sdk.io.jms.CommonJms.USERNAME;
import static org.apache.beam.sdk.io.jms.CommonJms.toSerializableFunction;
import static org.apache.beam.sdk.io.jms.JmsIO.Writer.JMS_IO_PRODUCER_METRIC_NAME;
import static org.apache.beam.sdk.io.jms.JmsIO.Writer.PUBLICATION_RETRIES_METRIC_NAME;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
@@ -86,6 +87,7 @@ import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.io.jms.JmsIO.UnboundedJmsReader;
import org.apache.beam.sdk.metrics.MetricNameFilter;
@@ -541,6 +543,16 @@ public class JmsIOTest {
assertEquals(1, splits.size());
}
+ private boolean advanceWithRetry(UnboundedSource.UnboundedReader reader) throws IOException {
+ for (int attempt = 0; attempt < 10; attempt++) {
+ if (reader.advance()) {
+ return true;
+ }
+ sleepUninterruptibly(java.time.Duration.ofMillis(100));
+ }
+ return false;
+ }
+
@Test
public void testCheckpointMark() throws Exception {
// we are using no prefetch here
@@ -558,7 +570,7 @@ public class JmsIOTest {
// consume 3 messages (NB: start already consumed the first message)
for (int i = 0; i < 3; i++) {
- assertTrue(String.format("Failed at %d-th message", i), reader.advance());
+ assertTrue(String.format("Failed at %d-th message", i), advanceWithRetry(reader));
}
// the messages are still pending in the queue (no ACK yet)
@@ -572,7 +584,7 @@ public class JmsIOTest {
// we read the 6 pending messages
for (int i = 0; i < 6; i++) {
- assertTrue(String.format("Failed at %d-th message", i), reader.advance());
+ assertTrue(String.format("Failed at %d-th message", i), advanceWithRetry(reader));
}
// still 6 pending messages as we didn't finalize the checkpoint
@@ -592,8 +604,8 @@ public class JmsIOTest {
assertTrue(reader.start());
// consume 2 message (NB: start already consumed the first message)
- assertTrue(reader.advance());
- assertTrue(reader.advance());
+ assertTrue(advanceWithRetry(reader));
+ assertTrue(advanceWithRetry(reader));
// get checkpoint mark after consumed 4 messages
CheckpointMark mark = reader.getCheckpointMark();
@@ -724,7 +736,7 @@ public class JmsIOTest {
// consume half the messages (NB: start already consumed the first message)
for (int i = 0; i < (messagesToProcess / 2) - 1; i++) {
- assertTrue(reader.advance());
+ assertTrue(advanceWithRetry(reader));
}
// the messages are still pending in the queue (no ACK yet)
@@ -738,7 +750,7 @@ public class JmsIOTest {
() -> {
try {
for (int i = 0; i < messagesToProcess / 2; i++) {
- assertTrue(reader.advance());
+ assertTrue(advanceWithRetry(reader));
}
} catch (IOException ex) {
throw new RuntimeException(ex);
@@ -877,7 +889,7 @@ public class JmsIOTest {
// consume 3 more messages (NB: start already consumed the first message)
for (int i = 0; i < 3; i++) {
- assertTrue(reader.advance());
+ assertTrue(advanceWithRetry(reader));
}
// the messages are still pending in the queue (no ACK yet)
@@ -891,7 +903,7 @@ public class JmsIOTest {
// we read the 6 pending messages
for (int i = 0; i < 6; i++) {
- assertTrue(reader.advance());
+ assertTrue(advanceWithRetry(reader));
}
// still 6 pending messages as we didn't finalize the checkpoint