[FLINK-9909][core] ConjunctFuture does not cancel input futures If a ConjunctFuture is cancelled, it will not automatically cancel any of its input futures. If users need this behaviour, they have to implement it explicitly. The reason for this change is that an implicit cancellation can have unwanted side effects, because the producers of the cancelled input futures will not be executed.
This closes #6384. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a695e6b5 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a695e6b5 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a695e6b5 Branch: refs/heads/release-1.5 Commit: a695e6b5eb1a1be6176c358eb1345d2277586b1b Parents: 07718a6 Author: Till Rohrmann <trohrm...@apache.org> Authored: Sun Jul 22 20:17:11 2018 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Mon Jul 23 17:22:52 2018 +0200 ---------------------------------------------------------------------- .../flink/runtime/concurrent/FutureUtils.java | 40 -------------------- .../runtime/concurrent/FutureUtilsTest.java | 34 ----------------- 2 files changed, 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a695e6b5/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java index 1cffaab..3a7e800 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java @@ -508,22 +508,6 @@ public class FutureUtils { * @return The number of Futures in the conjunction that are already complete */ public abstract int getNumFuturesCompleted(); - - /** - * Gets the individual futures which make up the {@link ConjunctFuture}. - * - * @return Collection of futures which make up the {@link ConjunctFuture} - */ - protected abstract Collection<? 
extends CompletableFuture<?>> getConjunctFutures(); - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - for (CompletableFuture<?> completableFuture : getConjunctFutures()) { - completableFuture.cancel(mayInterruptIfRunning); - } - - return super.cancel(mayInterruptIfRunning); - } } /** @@ -531,8 +515,6 @@ public class FutureUtils { */ private static class ResultConjunctFuture<T> extends ConjunctFuture<Collection<T>> { - private final Collection<? extends CompletableFuture<? extends T>> resultFutures; - /** The total number of futures in the conjunction. */ private final int numTotal; @@ -564,7 +546,6 @@ public class FutureUtils { @SuppressWarnings("unchecked") ResultConjunctFuture(Collection<? extends CompletableFuture<? extends T>> resultFutures) { - this.resultFutures = checkNotNull(resultFutures); this.numTotal = resultFutures.size(); results = (T[]) new Object[numTotal]; @@ -587,11 +568,6 @@ public class FutureUtils { public int getNumFuturesCompleted() { return numCompleted.get(); } - - @Override - protected Collection<? extends CompletableFuture<?>> getConjunctFutures() { - return resultFutures; - } } /** @@ -600,8 +576,6 @@ public class FutureUtils { */ private static final class WaitingConjunctFuture extends ConjunctFuture<Void> { - private final Collection<? extends CompletableFuture<?>> futures; - /** Number of completed futures. */ private final AtomicInteger numCompleted = new AtomicInteger(0); @@ -620,7 +594,6 @@ public class FutureUtils { } private WaitingConjunctFuture(Collection<? extends CompletableFuture<?>> futures) { - this.futures = checkNotNull(futures); this.numTotal = futures.size(); if (futures.isEmpty()) { @@ -641,11 +614,6 @@ public class FutureUtils { public int getNumFuturesCompleted() { return numCompleted.get(); } - - @Override - protected Collection<? 
extends CompletableFuture<?>> getConjunctFutures() { - return futures; - } } /** @@ -673,14 +641,11 @@ public class FutureUtils { private final int numFuturesTotal; - private final Collection<? extends CompletableFuture<?>> futuresToComplete; - private int futuresCompleted; private Throwable globalThrowable; private CompletionConjunctFuture(Collection<? extends CompletableFuture<?>> futuresToComplete) { - this.futuresToComplete = checkNotNull(futuresToComplete); numFuturesTotal = futuresToComplete.size(); futuresCompleted = 0; @@ -725,11 +690,6 @@ public class FutureUtils { return futuresCompleted; } } - - @Override - protected Collection<? extends CompletableFuture<?>> getConjunctFutures() { - return futuresToComplete; - } } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/a695e6b5/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java index 07bc4c1..1639c91 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java @@ -28,7 +28,6 @@ import org.apache.flink.util.TestLogger; import org.junit.Test; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -42,7 +41,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; import java.util.function.Supplier; import static org.hamcrest.CoreMatchers.containsString; @@ -548,38 +546,6 @@ public class FutureUtilsTest 
extends TestLogger { } @Test - public void testCancelWaitingConjunctFuture() { - cancelConjunctFuture(inputFutures -> FutureUtils.waitForAll(inputFutures)); - } - - @Test - public void testCancelResultConjunctFuture() { - cancelConjunctFuture(inputFutures -> FutureUtils.combineAll(inputFutures)); - } - - @Test - public void testCancelCompleteConjunctFuture() { - cancelConjunctFuture(inputFutures -> FutureUtils.completeAll(inputFutures)); - } - - private void cancelConjunctFuture(Function<Collection<? extends CompletableFuture<?>>, FutureUtils.ConjunctFuture<?>> conjunctFutureFactory) { - final int numInputFutures = 10; - final Collection<CompletableFuture<Void>> inputFutures = new ArrayList<>(numInputFutures); - - for (int i = 0; i < numInputFutures; i++) { - inputFutures.add(new CompletableFuture<>()); - } - - final FutureUtils.ConjunctFuture<?> conjunctFuture = conjunctFutureFactory.apply(inputFutures); - - conjunctFuture.cancel(false); - - for (CompletableFuture<Void> inputFuture : inputFutures) { - assertThat(inputFuture.isCancelled(), is(true)); - } - } - - @Test public void testSupplyAsyncFailure() throws Exception { final String exceptionMessage = "Test exception"; final FlinkException testException = new FlinkException(exceptionMessage);