This is an automated email from the ASF dual-hosted git repository. iemejia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit a270d857870a1c8dae3eacddbab228bc3e25f260
Author: Kenneth Knowles <k...@apache.org>
AuthorDate: Mon Feb 12 12:21:18 2018 -0800

    [BEAM-3697] Fix MoreFutures errorprone
---
 .../java/org/apache/beam/sdk/util/MoreFutures.java | 35 +++++-----
 .../org/apache/beam/sdk/util/MoreFuturesTest.java  | 81 ++++++++++++++++++++++
 2 files changed, 100 insertions(+), 16 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java
index 7b49503..8275fad 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java
@@ -94,7 +94,8 @@ public class MoreFutures {
   public static <T> CompletionStage<T> supplyAsync(
       ThrowingSupplier<T> supplier, ExecutorService executorService) {
     CompletableFuture<T> result = new CompletableFuture<>();
-    CompletableFuture.runAsync(
+
+    CompletionStage<Void> wrapper = CompletableFuture.runAsync(
         () -> {
           try {
             result.complete(supplier.get());
@@ -106,7 +107,7 @@ public class MoreFutures {
           }
         },
         executorService);
-    return result;
+    return wrapper.thenCompose(nothing -> result);
   }
 
   /**
@@ -125,20 +126,22 @@ public class MoreFutures {
   public static CompletionStage<Void> runAsync(
       ThrowingRunnable runnable, ExecutorService executorService) {
     CompletableFuture<Void> result = new CompletableFuture<>();
-    CompletableFuture.runAsync(
-        () -> {
-          try {
-            runnable.run();
-            result.complete(null);
-          } catch (InterruptedException e) {
-            result.completeExceptionally(e);
-            Thread.currentThread().interrupt();
-          } catch (Throwable t) {
-            result.completeExceptionally(t);
-          }
-        },
-        executorService);
-    return result;
+
+    CompletionStage<Void> wrapper =
+        CompletableFuture.runAsync(
+            () -> {
+              try {
+                runnable.run();
+                result.complete(null);
+              } catch (InterruptedException e) {
+                result.completeExceptionally(e);
+                Thread.currentThread().interrupt();
+              } catch (Throwable t) {
+                result.completeExceptionally(t);
+              }
+            },
+            executorService);
+    return wrapper.thenCompose(nothing -> result);
   }
 
   /**
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MoreFuturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MoreFuturesTest.java
new file mode 100644
index 0000000..22ab4c0
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MoreFuturesTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.isA;
+import static org.junit.Assert.assertThat;
+
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link MoreFutures}. */
+@RunWith(JUnit4.class)
+public class MoreFuturesTest {
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void supplyAsyncSuccess() throws Exception {
+    CompletionStage<Integer> future = MoreFutures.supplyAsync(() -> 42);
+    assertThat(MoreFutures.get(future), equalTo(42));
+  }
+
+  @Test
+  public void supplyAsyncFailure() throws Exception {
+    final String testMessage = "this is just a test";
+    CompletionStage<Long> future = MoreFutures.supplyAsync(() -> {
+      throw new IllegalStateException(testMessage);
+    });
+
+    thrown.expect(ExecutionException.class);
+    thrown.expectCause(isA(IllegalStateException.class));
+    thrown.expectMessage(testMessage);
+    MoreFutures.get(future);
+  }
+
+  @Test
+  public void runAsyncSuccess() throws Exception {
+    AtomicInteger result = new AtomicInteger(0);
+    CompletionStage<Void> sideEffectFuture = MoreFutures.runAsync(() -> {
+      result.set(42);
+    });
+
+    MoreFutures.get(sideEffectFuture);
+    assertThat(result.get(), equalTo(42));
+  }
+
+  @Test
+  public void runAsyncFailure() throws Exception {
+    final String testMessage = "this is just a test";
+    CompletionStage<Void> sideEffectFuture = MoreFutures.runAsync(() -> {
+      throw new IllegalStateException(testMessage);
+    });
+
+    thrown.expect(ExecutionException.class);
+    thrown.expectCause(isA(IllegalStateException.class));
+    thrown.expectMessage(testMessage);
+    MoreFutures.get(sideEffectFuture);
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
ieme...@apache.org.