This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e7a21ddb5b5 [#37198] Make withBackOffSupplier public to enable bounded retry configuration (#37356)
e7a21ddb5b5 is described below

commit e7a21ddb5b52b30b7b64f3c02086585866333371
Author: ZIHAN DAI <[email protected]>
AuthorDate: Sat Jan 31 00:47:08 2026 +1100

    [#37198] Make withBackOffSupplier public to enable bounded retry configuration (#37356)
    
    * [#37198] Make withBackOffSupplier public to enable bounded retry configuration
    
    Users need to configure bounded backoff to prevent infinite retry loops.
    Making withBackOffSupplier public allows users to set
    FluentBackoff.DEFAULT.withMaxRetries(n) and control retry behavior.
    
    Changes:
    - Changed withBackOffSupplier() visibility from package-private to public
    - Added comprehensive integration test with zero-delay BoundedBackOff
    - Test verifies: responses empty, 1 failure emitted, call count = maxRetries+1
    
    The test uses a serializable BoundedBackOff class with assertions on both
    PAssert (pipeline outputs) and Metrics (retry counts) to ensure bounded
    retry behavior works correctly.
    
    Fixes #37198
    Related to #37176
    
    Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
    
    * Add explicit public modifier to withSleeperSupplier method
    
    Address review feedback from @damccorm to explicitly add
    the public keyword to withSleeperSupplier() for consistency.
    
    Co-Authored-By: Claude Opus 4.5 <[email protected]>
    
    * Fix Spotless formatting for withSleeperSupplier method
    
    Break long line to comply with code style requirements.
    
    Co-Authored-By: Claude Opus 4.5 <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Sonnet 4.5 <[email protected]>
---
 .../beam/io/requestresponse/RequestResponseIO.java |  6 ++-
 .../io/requestresponse/RequestResponseIOTest.java  | 55 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 2 deletions(-)

diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java
index 8a04f76b682..167325fc6f9 100644
--- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java
+++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java
@@ -217,7 +217,8 @@ public class RequestResponseIO<RequestT, ResponseT>
    * for a {@link SerializableSupplier} instead of setting this directly is 
that some
    * implementations of {@link Sleeper} may not be {@link Serializable}.
    */
-  RequestResponseIO<RequestT, ResponseT> 
withSleeperSupplier(SerializableSupplier<Sleeper> value) {
+  public RequestResponseIO<RequestT, ResponseT> withSleeperSupplier(
+      SerializableSupplier<Sleeper> value) {
     return new RequestResponseIO<>(
         rrioConfiguration, 
callConfiguration.toBuilder().setSleeperSupplier(value).build());
   }
@@ -229,7 +230,8 @@ public class RequestResponseIO<RequestT, ResponseT>
    * need for a {@link SerializableSupplier} instead of setting this directly 
is that some {@link
    * BackOff} implementations, such as {@link FluentBackoff} are not {@link 
Serializable}.
    */
-  RequestResponseIO<RequestT, ResponseT> 
withBackOffSupplier(SerializableSupplier<BackOff> value) {
+  public RequestResponseIO<RequestT, ResponseT> withBackOffSupplier(
+      SerializableSupplier<BackOff> value) {
     return new RequestResponseIO<>(
         rrioConfiguration, 
callConfiguration.toBuilder().setBackOffSupplier(value).build());
   }
diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java
index cd0b29bab66..5a199225f39 100644
--- a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java
+++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java
@@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 
 import com.google.auto.value.AutoValue;
+import java.io.Serializable;
 import java.util.List;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.Coder;
@@ -40,6 +41,7 @@ import org.apache.beam.sdk.schemas.SchemaProvider;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.util.BackOff;
 import org.apache.beam.sdk.util.SerializableSupplier;
@@ -333,6 +335,36 @@ public class RequestResponseIOTest {
         greaterThan(0L));
   }
 
+  @Test
+  public void givenBoundedBackoff_thenRetriesStopAfterLimit() {
+    int maxRetries = 3;
+    Caller<Request, Response> caller = new CallerImpl(5);
+    SerializableSupplier<BackOff> boundedBackoffSupplier = () -> new 
BoundedBackOff(maxRetries);
+
+    Result<Response> result =
+        requests()
+            .apply(
+                "rrio",
+                RequestResponseIO.of(caller, RESPONSE_CODER)
+                    .withBackOffSupplier(boundedBackoffSupplier)
+                    .withMonitoringConfiguration(
+                        
Monitoring.builder().setCountCalls(true).setCountFailures(true).build()));
+
+    PAssert.that(result.getResponses()).empty();
+    PAssert.thatSingleton(result.getFailures().apply("CountFailures", 
Count.globally()))
+        .isEqualTo(1L);
+
+    PipelineResult pipelineResult = pipeline.run();
+    MetricResults metrics = pipelineResult.metrics();
+    pipelineResult.waitUntilFinish();
+
+    assertThat(
+        getCounterResult(metrics, Call.class, 
Monitoring.callCounterNameOf(caller)),
+        equalTo((long) maxRetries + 1));
+    assertThat(
+        getCounterResult(metrics, Call.class, 
Monitoring.FAILURES_COUNTER_NAME), equalTo(1L));
+  }
+
   // TODO(damondouglas): Count metrics of caching after 
https://github.com/apache/beam/issues/29888
   // resolves.
   @Ignore
@@ -463,6 +495,29 @@ public class RequestResponseIOTest {
     }
   }
 
+  private static class BoundedBackOff implements BackOff, Serializable {
+    private final int maxRetries;
+    private int retries = 0;
+
+    private BoundedBackOff(int maxRetries) {
+      this.maxRetries = maxRetries;
+    }
+
+    @Override
+    public void reset() {
+      retries = 0;
+    }
+
+    @Override
+    public long nextBackOffMillis() {
+      if (retries >= maxRetries) {
+        return BackOff.STOP;
+      }
+      retries++;
+      return 0L;
+    }
+  }
+
   private static class CustomBackOffSupplier implements 
SerializableSupplier<BackOff> {
 
     private final Counter counter = 
Metrics.counter(CustomBackOffSupplier.class, "custom_counter");

Reply via email to