This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 47740d07ed9 Simplify MoreFutures.supplyAsync and MoreFutures.runAsync 
using a wrapper instead of chained stages and multiple completions. (#33042)
47740d07ed9 is described below

commit 47740d07ed9e0824b4975ca4cadece74aae38302
Author: Sam Whittle <[email protected]>
AuthorDate: Mon Nov 11 17:57:54 2024 +0100

    Simplify MoreFutures.supplyAsync and MoreFutures.runAsync using a wrapper 
instead of chained stages and multiple completions. (#33042)
    
    This ensures that the future being joined is the one that interacts with 
ForkJoinPool execution, and avoids the possibility that the asynchronously 
scheduled future is never joined.
---
 .../java/org/apache/beam/sdk/util/MoreFutures.java | 60 +++++++++-------------
 1 file changed, 24 insertions(+), 36 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java
index cd38da100a7..0999f2ad077 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java
@@ -45,9 +45,6 @@ import org.checkerframework.checker.nullness.qual.Nullable;
  *   <li>Return {@link CompletableFuture} only to the <i>producer</i> of a 
future value.
  * </ul>
  */
-@SuppressWarnings({
-  "rawtypes" // TODO(https://github.com/apache/beam/issues/20447)
-})
 public class MoreFutures {
 
   /**
@@ -99,22 +96,18 @@ public class MoreFutures {
    */
   public static <T> CompletionStage<T> supplyAsync(
       ThrowingSupplier<T> supplier, ExecutorService executorService) {
-    CompletableFuture<T> result = new CompletableFuture<>();
-
-    CompletionStage<Void> wrapper =
-        CompletableFuture.runAsync(
-            () -> {
-              try {
-                result.complete(supplier.get());
-              } catch (InterruptedException e) {
-                result.completeExceptionally(e);
-                Thread.currentThread().interrupt();
-              } catch (Throwable t) {
-                result.completeExceptionally(t);
-              }
-            },
-            executorService);
-    return wrapper.thenCompose(nothing -> result);
+    return CompletableFuture.supplyAsync(
+        () -> {
+          try {
+            return supplier.get();
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new CompletionException(e);
+          } catch (Throwable t) {
+            throw new CompletionException(t);
+          }
+        },
+        executorService);
   }
 
   /**
@@ -132,23 +125,18 @@ public class MoreFutures {
    */
   public static CompletionStage<Void> runAsync(
       ThrowingRunnable runnable, ExecutorService executorService) {
-    CompletableFuture<Void> result = new CompletableFuture<>();
-
-    CompletionStage<Void> wrapper =
-        CompletableFuture.runAsync(
-            () -> {
-              try {
-                runnable.run();
-                result.complete(null);
-              } catch (InterruptedException e) {
-                result.completeExceptionally(e);
-                Thread.currentThread().interrupt();
-              } catch (Throwable t) {
-                result.completeExceptionally(t);
-              }
-            },
-            executorService);
-    return wrapper.thenCompose(nothing -> result);
+    return CompletableFuture.runAsync(
+        () -> {
+          try {
+            runnable.run();
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new CompletionException(e);
+          } catch (Throwable t) {
+            throw new CompletionException(t);
+          }
+        },
+        executorService);
   }
 
   /**

Reply via email to