[FLINK-9909][core] ConjunctFuture does not cancel input futures If a ConjunctFuture is cancelled, then it won't cancel all of its input futures automatically. If the users needs this behaviour then he has to implement it explicitly. The reason for this change is that an implicit cancellation can have unwanted side effects, because all of the cancelled input futures' producers won't be executed.
This closes #6384. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9afda733 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9afda733 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9afda733 Branch: refs/heads/master Commit: 9afda733a90a72be75ced9567452c6a7a5e3dc8c Parents: c897471 Author: Till Rohrmann <trohrm...@apache.org> Authored: Sun Jul 22 20:17:11 2018 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Tue Jul 24 00:05:39 2018 +0200 ---------------------------------------------------------------------- .../flink/runtime/concurrent/FutureUtils.java | 40 -------------------- .../runtime/concurrent/FutureUtilsTest.java | 34 ----------------- 2 files changed, 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9afda733/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java index 1cffaab..3a7e800 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java @@ -508,22 +508,6 @@ public class FutureUtils { * @return The number of Futures in the conjunction that are already complete */ public abstract int getNumFuturesCompleted(); - - /** - * Gets the individual futures which make up the {@link ConjunctFuture}. - * - * @return Collection of futures which make up the {@link ConjunctFuture} - */ - protected abstract Collection<? extends CompletableFuture<?>> getConjunctFutures(); - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - for (CompletableFuture<?> completableFuture : getConjunctFutures()) { - completableFuture.cancel(mayInterruptIfRunning); - } - - return super.cancel(mayInterruptIfRunning); - } } /** @@ -531,8 +515,6 @@ public class FutureUtils { */ private static class ResultConjunctFuture<T> extends ConjunctFuture<Collection<T>> { - private final Collection<? extends CompletableFuture<? extends T>> resultFutures; - /** The total number of futures in the conjunction. */ private final int numTotal; @@ -564,7 +546,6 @@ public class FutureUtils { @SuppressWarnings("unchecked") ResultConjunctFuture(Collection<? extends CompletableFuture<? extends T>> resultFutures) { - this.resultFutures = checkNotNull(resultFutures); this.numTotal = resultFutures.size(); results = (T[]) new Object[numTotal]; @@ -587,11 +568,6 @@ public class FutureUtils { public int getNumFuturesCompleted() { return numCompleted.get(); } - - @Override - protected Collection<? extends CompletableFuture<?>> getConjunctFutures() { - return resultFutures; - } } /** @@ -600,8 +576,6 @@ public class FutureUtils { */ private static final class WaitingConjunctFuture extends ConjunctFuture<Void> { - private final Collection<? extends CompletableFuture<?>> futures; - /** Number of completed futures. */ private final AtomicInteger numCompleted = new AtomicInteger(0); @@ -620,7 +594,6 @@ public class FutureUtils { } private WaitingConjunctFuture(Collection<? extends CompletableFuture<?>> futures) { - this.futures = checkNotNull(futures); this.numTotal = futures.size(); if (futures.isEmpty()) { @@ -641,11 +614,6 @@ public class FutureUtils { public int getNumFuturesCompleted() { return numCompleted.get(); } - - @Override - protected Collection<? extends CompletableFuture<?>> getConjunctFutures() { - return futures; - } } /** @@ -673,14 +641,11 @@ public class FutureUtils { private final int numFuturesTotal; - private final Collection<? extends CompletableFuture<?>> futuresToComplete; - private int futuresCompleted; private Throwable globalThrowable; private CompletionConjunctFuture(Collection<? extends CompletableFuture<?>> futuresToComplete) { - this.futuresToComplete = checkNotNull(futuresToComplete); numFuturesTotal = futuresToComplete.size(); futuresCompleted = 0; @@ -725,11 +690,6 @@ public class FutureUtils { return futuresCompleted; } } - - @Override - protected Collection<? extends CompletableFuture<?>> getConjunctFutures() { - return futuresToComplete; - } } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/9afda733/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java index 07bc4c1..1639c91 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java @@ -28,7 +28,6 @@ import org.apache.flink.util.TestLogger; import org.junit.Test; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -42,7 +41,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; import java.util.function.Supplier; import static org.hamcrest.CoreMatchers.containsString; @@ -548,38 +546,6 @@ public class FutureUtilsTest extends TestLogger { } @Test - public void testCancelWaitingConjunctFuture() { - cancelConjunctFuture(inputFutures -> FutureUtils.waitForAll(inputFutures)); - } - - @Test - public void testCancelResultConjunctFuture() { - cancelConjunctFuture(inputFutures -> FutureUtils.combineAll(inputFutures)); - } - - @Test - public void testCancelCompleteConjunctFuture() { - cancelConjunctFuture(inputFutures -> FutureUtils.completeAll(inputFutures)); - } - - private void cancelConjunctFuture(Function<Collection<? extends CompletableFuture<?>>, FutureUtils.ConjunctFuture<?>> conjunctFutureFactory) { - final int numInputFutures = 10; - final Collection<CompletableFuture<Void>> inputFutures = new ArrayList<>(numInputFutures); - - for (int i = 0; i < numInputFutures; i++) { - inputFutures.add(new CompletableFuture<>()); - } - - final FutureUtils.ConjunctFuture<?> conjunctFuture = conjunctFutureFactory.apply(inputFutures); - - conjunctFuture.cancel(false); - - for (CompletableFuture<Void> inputFuture : inputFutures) { - assertThat(inputFuture.isCancelled(), is(true)); - } - } - - @Test public void testSupplyAsyncFailure() throws Exception { final String exceptionMessage = "Test exception"; final FlinkException testException = new FlinkException(exceptionMessage);