This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 47740d07ed9 Simplify MoreFutures.supplyAsync and MoreFutures.runAsync
using a wrapper instead of chained stages and multiple completions. (#33042)
47740d07ed9 is described below
commit 47740d07ed9e0824b4975ca4cadece74aae38302
Author: Sam Whittle <[email protected]>
AuthorDate: Mon Nov 11 17:57:54 2024 +0100
Simplify MoreFutures.supplyAsync and MoreFutures.runAsync using a wrapper
instead of chained stages and multiple completions. (#33042)
This ensures that the future being joined is the one that interacts with
ForkJoinPool execution, and avoids the possibility that the asynchronously
scheduled future is never joined.
---
.../java/org/apache/beam/sdk/util/MoreFutures.java | 60 +++++++++-------------
1 file changed, 24 insertions(+), 36 deletions(-)
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java
index cd38da100a7..0999f2ad077 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java
@@ -45,9 +45,6 @@ import org.checkerframework.checker.nullness.qual.Nullable;
* <li>Return {@link CompletableFuture} only to the <i>producer</i> of a
future value.
* </ul>
*/
-@SuppressWarnings({
- "rawtypes" // TODO(https://github.com/apache/beam/issues/20447)
-})
public class MoreFutures {
/**
@@ -99,22 +96,18 @@ public class MoreFutures {
*/
public static <T> CompletionStage<T> supplyAsync(
ThrowingSupplier<T> supplier, ExecutorService executorService) {
- CompletableFuture<T> result = new CompletableFuture<>();
-
- CompletionStage<Void> wrapper =
- CompletableFuture.runAsync(
- () -> {
- try {
- result.complete(supplier.get());
- } catch (InterruptedException e) {
- result.completeExceptionally(e);
- Thread.currentThread().interrupt();
- } catch (Throwable t) {
- result.completeExceptionally(t);
- }
- },
- executorService);
- return wrapper.thenCompose(nothing -> result);
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return supplier.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new CompletionException(e);
+ } catch (Throwable t) {
+ throw new CompletionException(t);
+ }
+ },
+ executorService);
}
/**
@@ -132,23 +125,18 @@ public class MoreFutures {
*/
public static CompletionStage<Void> runAsync(
ThrowingRunnable runnable, ExecutorService executorService) {
- CompletableFuture<Void> result = new CompletableFuture<>();
-
- CompletionStage<Void> wrapper =
- CompletableFuture.runAsync(
- () -> {
- try {
- runnable.run();
- result.complete(null);
- } catch (InterruptedException e) {
- result.completeExceptionally(e);
- Thread.currentThread().interrupt();
- } catch (Throwable t) {
- result.completeExceptionally(t);
- }
- },
- executorService);
- return wrapper.thenCompose(nothing -> result);
+ return CompletableFuture.runAsync(
+ () -> {
+ try {
+ runnable.run();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new CompletionException(e);
+ } catch (Throwable t) {
+ throw new CompletionException(t);
+ }
+ },
+ executorService);
}
/**