This is an automated email from the ASF dual-hosted git repository.

Abacn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a619a580802 fix rrio teardown executor cleanup path (#38417)
a619a580802 is described below

commit a619a5808022c69417d0178b1ed777d523160ad8
Author: Abdelrahman Ibrahim <[email protected]>
AuthorDate: Fri May 8 19:51:18 2026 +0200

    fix rrio teardown executor cleanup path (#38417)
---
 .../org/apache/beam/io/requestresponse/Call.java   | 55 +++++++++++-----------
 1 file changed, 28 insertions(+), 27 deletions(-)

diff --git 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java
index b515957459b..b318cac1737 100644
--- 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java
+++ 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java
@@ -270,34 +270,35 @@ class Call<RequestT, ResponseT> extends 
PTransform<PCollection<RequestT>, Result
       Sleeper sleeper = configuration.getSleeperSupplier().get();
 
       backoffIfNeeded(backOff, sleeper);
-
-      if (!configuration.getShouldRepeat()) {
-        incIfPresent(teardownCounter);
-        setupTeardown.teardown();
-        return;
-      }
-
-      Repeater<Void, Void> repeater =
-          Repeater.<Void, Void>builder()
-              .setBackOff(backOff)
-              .setSleeper(sleeper)
-              .setThrowableFunction(
-                  ignored -> {
-                    incIfPresent(teardownCounter);
-                    setupTeardown.teardown();
-                    return null;
-                  })
-              .build()
-              .withBackoffCounter(backoffCounter)
-              .withSleeperCounter(sleeperCounter);
-
-      repeater.apply(null);
-
-      checkStateNotNull(executor).shutdown();
       try {
-        boolean ignored = executor.awaitTermination(3L, TimeUnit.SECONDS);
-      } catch (InterruptedException ignored) {
-        // Ignore the interrupt during teardown.
+        if (!configuration.getShouldRepeat()) {
+          incIfPresent(teardownCounter);
+          setupTeardown.teardown();
+          return;
+        }
+
+        Repeater<Void, Void> repeater =
+            Repeater.<Void, Void>builder()
+                .setBackOff(backOff)
+                .setSleeper(sleeper)
+                .setThrowableFunction(
+                    ignored -> {
+                      incIfPresent(teardownCounter);
+                      setupTeardown.teardown();
+                      return null;
+                    })
+                .build()
+                .withBackoffCounter(backoffCounter)
+                .withSleeperCounter(sleeperCounter);
+
+        repeater.apply(null);
+      } finally {
+        checkStateNotNull(executor).shutdown();
+        try {
+          boolean ignored = executor.awaitTermination(3L, TimeUnit.SECONDS);
+        } catch (InterruptedException ignored) {
+          // Ignore the interrupt during teardown.
+        }
       }
     }
 

Reply via email to