This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e7a21ddb5b5 [#37198] Make withBackOffSupplier public to enable bounded
retry configuration (#37356)
e7a21ddb5b5 is described below
commit e7a21ddb5b52b30b7b64f3c02086585866333371
Author: ZIHAN DAI <[email protected]>
AuthorDate: Sat Jan 31 00:47:08 2026 +1100
[#37198] Make withBackOffSupplier public to enable bounded retry
configuration (#37356)
* [#37198] Make withBackOffSupplier public to enable bounded retry
configuration
Users need to configure bounded backoff to prevent infinite retry loops.
Making withBackOffSupplier public allows users to set
FluentBackoff.DEFAULT.withMaxRetries(n) and control retry behavior.
Changes:
- Changed withBackOffSupplier() visibility from package-private to public
- Added comprehensive integration test with zero-delay BoundedBackOff
- Test verifies: responses empty, 1 failure emitted, call count =
maxRetries+1
The test uses a serializable BoundedBackOff class with assertions on both
PAssert (pipeline outputs) and Metrics (retry counts) to ensure bounded
retry behavior works correctly.
Fixes #37198
Related to #37176
Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
* Add explicit public modifier to withSleeperSupplier method
Address review feedback from @damccorm to explicitly add
the public keyword to withSleeperSupplier() for consistency.
Co-Authored-By: Claude Opus 4.5 <[email protected]>
* Fix Spotless formatting for withSleeperSupplier method
Break long line to comply with code style requirements.
Co-Authored-By: Claude Opus 4.5 <[email protected]>
---------
Co-authored-by: Claude Sonnet 4.5 <[email protected]>
---
.../beam/io/requestresponse/RequestResponseIO.java | 6 ++-
.../io/requestresponse/RequestResponseIOTest.java | 55 ++++++++++++++++++++++
2 files changed, 59 insertions(+), 2 deletions(-)
diff --git
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java
index 8a04f76b682..167325fc6f9 100644
---
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java
+++
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java
@@ -217,7 +217,8 @@ public class RequestResponseIO<RequestT, ResponseT>
* for a {@link SerializableSupplier} instead of setting this directly is
that some
* implementations of {@link Sleeper} may not be {@link Serializable}.
*/
- RequestResponseIO<RequestT, ResponseT>
withSleeperSupplier(SerializableSupplier<Sleeper> value) {
+ public RequestResponseIO<RequestT, ResponseT> withSleeperSupplier(
+ SerializableSupplier<Sleeper> value) {
return new RequestResponseIO<>(
rrioConfiguration,
callConfiguration.toBuilder().setSleeperSupplier(value).build());
}
@@ -229,7 +230,8 @@ public class RequestResponseIO<RequestT, ResponseT>
* need for a {@link SerializableSupplier} instead of setting this directly
is that some {@link
* BackOff} implementations, such as {@link FluentBackoff} are not {@link
Serializable}.
*/
- RequestResponseIO<RequestT, ResponseT>
withBackOffSupplier(SerializableSupplier<BackOff> value) {
+ public RequestResponseIO<RequestT, ResponseT> withBackOffSupplier(
+ SerializableSupplier<BackOff> value) {
return new RequestResponseIO<>(
rrioConfiguration,
callConfiguration.toBuilder().setBackOffSupplier(value).build());
}
diff --git
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java
index cd0b29bab66..5a199225f39 100644
---
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java
+++
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java
@@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import com.google.auto.value.AutoValue;
+import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.Coder;
@@ -40,6 +41,7 @@ import org.apache.beam.sdk.schemas.SchemaProvider;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.SerializableSupplier;
@@ -333,6 +335,36 @@ public class RequestResponseIOTest {
greaterThan(0L));
}
+ @Test
+ public void givenBoundedBackoff_thenRetriesStopAfterLimit() {
+ int maxRetries = 3;
+ Caller<Request, Response> caller = new CallerImpl(5);
+ SerializableSupplier<BackOff> boundedBackoffSupplier = () -> new
BoundedBackOff(maxRetries);
+
+ Result<Response> result =
+ requests()
+ .apply(
+ "rrio",
+ RequestResponseIO.of(caller, RESPONSE_CODER)
+ .withBackOffSupplier(boundedBackoffSupplier)
+ .withMonitoringConfiguration(
+
Monitoring.builder().setCountCalls(true).setCountFailures(true).build()));
+
+ PAssert.that(result.getResponses()).empty();
+ PAssert.thatSingleton(result.getFailures().apply("CountFailures",
Count.globally()))
+ .isEqualTo(1L);
+
+ PipelineResult pipelineResult = pipeline.run();
+ MetricResults metrics = pipelineResult.metrics();
+ pipelineResult.waitUntilFinish();
+
+ assertThat(
+ getCounterResult(metrics, Call.class,
Monitoring.callCounterNameOf(caller)),
+ equalTo((long) maxRetries + 1));
+ assertThat(
+ getCounterResult(metrics, Call.class,
Monitoring.FAILURES_COUNTER_NAME), equalTo(1L));
+ }
+
// TODO(damondouglas): Count metrics of caching after
https://github.com/apache/beam/issues/29888
// resolves.
@Ignore
@@ -463,6 +495,29 @@ public class RequestResponseIOTest {
}
}
+ private static class BoundedBackOff implements BackOff, Serializable {
+ private final int maxRetries;
+ private int retries = 0;
+
+ private BoundedBackOff(int maxRetries) {
+ this.maxRetries = maxRetries;
+ }
+
+ @Override
+ public void reset() {
+ retries = 0;
+ }
+
+ @Override
+ public long nextBackOffMillis() {
+ if (retries >= maxRetries) {
+ return BackOff.STOP;
+ }
+ retries++;
+ return 0L;
+ }
+ }
+
private static class CustomBackOffSupplier implements
SerializableSupplier<BackOff> {
private final Counter counter =
Metrics.counter(CustomBackOffSupplier.class, "custom_counter");