This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 904536271081f393a8c29341831275080b04255a Author: Stephan Ewen <se...@apache.org> AuthorDate: Wed Apr 14 05:02:55 2021 +0200 [FLINK-21996][coordination] Ensure exactly-once guarantees for OperatorEvent RPCs This consists of two changes that work together: - Delay checkpoints until we have clarity about all in-flight OperatorEvents - Fail target subtask if the result future for an OperatorEvent send fails --- .../operators/coordination/EventSender.java | 37 +++++++++++ .../coordination/ExecutionSubtaskAccess.java | 10 +++ .../coordination/OperatorCoordinatorHolder.java | 72 +++++++++++++++++++++- .../operators/coordination/OperatorEventValve.java | 7 ++- .../operators/coordination/SubtaskAccess.java | 11 ++++ .../operators/coordination/SubtaskGatewayImpl.java | 30 +++++++-- .../util/IncompleteFuturesTracker.java | 24 +++++--- .../coordination/EventReceivingTasks.java | 34 ++++++---- .../OperatorCoordinatorHolderTest.java | 18 ++++++ .../util/IncompleteFuturesTrackerTest.java | 18 +++++- .../OperatorEventSendingCheckpointITCase.java | 5 +- 11 files changed, 230 insertions(+), 36 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/EventSender.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/EventSender.java new file mode 100644 index 0000000..c70a21a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/EventSender.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.coordination; + +import org.apache.flink.runtime.messages.Acknowledge; + +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; + +/** Simple interface for a component that takes and sends events. */ +@FunctionalInterface +interface EventSender { + + /** + * Takes the given Callable and calls it at a certain point to send the event. The result of + * that Callable is bridged to the given result future. + */ + void sendEvent( + Callable<CompletableFuture<Acknowledge>> sendAction, + CompletableFuture<Acknowledge> result); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ExecutionSubtaskAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ExecutionSubtaskAccess.java index fc363de..9babeb2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ExecutionSubtaskAccess.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ExecutionSubtaskAccess.java @@ -81,6 +81,11 @@ final class ExecutionSubtaskAccess implements SubtaskAccess { } @Override + public String subtaskName() { + return taskExecution.getVertexWithAttempt(); + } + + @Override public CompletableFuture<?> hasSwitchedToRunning() { return taskExecution.getInitializingOrRunningFuture(); } @@ -91,6 +96,11 @@ final class ExecutionSubtaskAccess implements SubtaskAccess { || taskExecution.getState() == ExecutionState.INITIALIZING; } + @Override + public void 
triggerTaskFailover(Throwable cause) { + taskExecution.fail(cause); + } + // ------------------------------------------------------------------------ static final class ExecutionJobVertexSubtaskAccess implements SubtaskAccessFactory { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java index 8d96e15..a38ca16 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java @@ -24,6 +24,8 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.SerializedValue; @@ -34,6 +36,8 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.util.Collection; +import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.function.Consumer; @@ -112,11 +116,15 @@ import static org.apache.flink.util.Preconditions.checkState; public class OperatorCoordinatorHolder implements OperatorCoordinatorCheckpointContext, AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(OperatorCoordinatorHolder.class); + private final OperatorCoordinator coordinator; private final OperatorID operatorId; private final LazyInitializedCoordinatorContext context; private final 
SubtaskAccess.SubtaskAccessFactory taskAccesses; private final OperatorEventValve eventValve; + private final IncompleteFuturesTracker unconfirmedEvents; + private final EventSender eventSender; private final int operatorParallelism; private final int operatorMaxParallelism; @@ -139,7 +147,9 @@ public class OperatorCoordinatorHolder this.operatorParallelism = operatorParallelism; this.operatorMaxParallelism = operatorMaxParallelism; + this.unconfirmedEvents = new IncompleteFuturesTracker(); this.eventValve = new OperatorEventValve(); + this.eventSender = new ValveAndTrackerSender(eventValve, unconfirmedEvents); } public void lazyInitialize( @@ -278,7 +288,7 @@ public class OperatorCoordinatorHolder if (failure != null) { result.completeExceptionally(failure); } else if (eventValve.tryShutValve(checkpointId)) { - result.complete(success); + completeCheckpointOnceEventsAreDone(checkpointId, result, success); } else { // if we cannot shut the valve, this means the checkpoint // has been aborted before, so the future is already @@ -299,6 +309,43 @@ public class OperatorCoordinatorHolder } } + private void completeCheckpointOnceEventsAreDone( + final long checkpointId, + final CompletableFuture<byte[]> checkpointFuture, + final byte[] checkpointResult) { + + final Collection<CompletableFuture<?>> pendingEvents = + unconfirmedEvents.getCurrentIncompleteAndReset(); + if (pendingEvents.isEmpty()) { + checkpointFuture.complete(checkpointResult); + return; + } + + LOG.info( + "Coordinator checkpoint {} for coordinator {} is awaiting {} pending events", + checkpointId, + operatorId, + pendingEvents.size()); + + final CompletableFuture<?> conjunct = FutureUtils.waitForAll(pendingEvents); + conjunct.whenComplete( + (success, failure) -> { + if (failure == null) { + checkpointFuture.complete(checkpointResult); + } else { + // if we reach this situation, the checkpoint cannot complete in any + // case, because either + // (a) the target task really is down, or + // (b) we have a potentially
lost RPC message and need to + // do a task failover for the receiver to restore consistency + checkpointFuture.completeExceptionally( + new FlinkException( + "Failing OperatorCoordinator checkpoint because some OperatorEvents " + + "before this checkpoint barrier were not received by the target tasks.")); + } + }); + } + // ------------------------------------------------------------------------ // Checkpointing Callbacks // ------------------------------------------------------------------------ @@ -336,7 +383,7 @@ public class OperatorCoordinatorHolder final SubtaskAccess sta = taskAccesses.getAccessForSubtask(subtask); final OperatorCoordinator.SubtaskGateway gateway = - new SubtaskGatewayImpl(sta, eventValve, mainThreadExecutor); + new SubtaskGatewayImpl(sta, eventSender, mainThreadExecutor); // We need to do this synchronously here, otherwise we violate the contract that // 'subtaskFailed()' will never overtake 'subtaskReady()'. @@ -522,4 +569,25 @@ public class OperatorCoordinatorHolder return userCodeClassLoader; } } + + // ------------------------------------------------------------------------ + + private static final class ValveAndTrackerSender implements EventSender { + + private final OperatorEventValve valve; + private final IncompleteFuturesTracker tracker; + + ValveAndTrackerSender(OperatorEventValve valve, IncompleteFuturesTracker tracker) { + this.valve = valve; + this.tracker = tracker; + } + + @Override + public void sendEvent( + Callable<CompletableFuture<Acknowledge>> sendAction, + CompletableFuture<Acknowledge> result) { + valve.sendEvent(sendAction, result); + tracker.trackFutureWhileIncomplete(result); + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java index 8e71e36..7086dab 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java @@ -40,7 +40,7 @@ import java.util.concurrent.CompletableFuture; * that, one can register a "main thread executor" (as used by the mailbox components like RPC * components) via {@link #setMainThreadExecutorForValidation(ComponentMainThreadExecutor)}. */ -final class OperatorEventValve { +final class OperatorEventValve implements EventSender { private static final long NO_CHECKPOINT = Long.MIN_VALUE; @@ -55,7 +55,7 @@ final class OperatorEventValve { @Nullable private ComponentMainThreadExecutor mainThreadExecutor; /** Constructs a new OperatorEventValve. */ - public OperatorEventValve() { + OperatorEventValve() { this.currentCheckpointId = NO_CHECKPOINT; this.lastCheckpointId = Long.MIN_VALUE; } @@ -82,6 +82,7 @@ final class OperatorEventValve { * <p>This method makes no assumptions and gives no guarantees from which thread the result * future gets completed. 
*/ + @Override public void sendEvent( Callable<CompletableFuture<Acknowledge>> sendAction, CompletableFuture<Acknowledge> result) { @@ -172,7 +173,7 @@ final class OperatorEventValve { } } - private static void callSendAction( + private void callSendAction( Callable<CompletableFuture<Acknowledge>> sendAction, CompletableFuture<Acknowledge> result) { try { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskAccess.java index c654af8..333310c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskAccess.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskAccess.java @@ -56,6 +56,12 @@ interface SubtaskAccess { ExecutionAttemptID currentAttempt(); /** + * Gets a descriptive name of the operator's subtask, including name, subtask-id, parallelism, + * and execution attempt. + */ + String subtaskName(); + + /** * The future returned here completes once the target subtask is in a running state. As running * state classify the states {@link ExecutionState#RUNNING} and {@link * ExecutionState#INITIALIZING}. @@ -68,6 +74,11 @@ interface SubtaskAccess { */ boolean isStillRunning(); + /** + * Triggers a failover for the subtask's execution attempt that this access instance is bound to.
+ */ + void triggerTaskFailover(Throwable cause); + // ------------------------------------------------------------------------ /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java index 0e8dade..11f33f5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java @@ -18,8 +18,10 @@ package org.apache.flink.runtime.operators.coordination; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.SerializedValue; @@ -34,14 +36,17 @@ import java.util.concurrent.Executor; */ class SubtaskGatewayImpl implements OperatorCoordinator.SubtaskGateway { + private static final String EVENT_LOSS_ERROR_MESSAGE = + "An OperatorEvent from an OperatorCoordinator to a task was lost. " + + "Triggering task failover to ensure consistency. 
Event: '%s', targetTask: %s"; + private final SubtaskAccess subtaskAccess; - private final OperatorEventValve valve; + private final EventSender sender; private final Executor sendingExecutor; - SubtaskGatewayImpl( - SubtaskAccess subtaskAccess, OperatorEventValve valve, Executor sendingExecutor) { + SubtaskGatewayImpl(SubtaskAccess subtaskAccess, EventSender sender, Executor sendingExecutor) { this.subtaskAccess = subtaskAccess; - this.valve = valve; + this.sender = sender; this.sendingExecutor = sendingExecutor; } @@ -64,7 +69,22 @@ class SubtaskGatewayImpl implements OperatorCoordinator.SubtaskGateway { subtaskAccess.createEventSendAction(serializedEvent); final CompletableFuture<Acknowledge> result = new CompletableFuture<>(); - sendingExecutor.execute(() -> valve.sendEvent(sendAction, result)); + FutureUtils.assertNoException( + result.handleAsync( + (success, failure) -> { + if (failure != null && subtaskAccess.isStillRunning()) { + String msg = + String.format( + EVENT_LOSS_ERROR_MESSAGE, + evt, + subtaskAccess.subtaskName()); + subtaskAccess.triggerTaskFailover(new FlinkException(msg)); + } + return null; + }, + sendingExecutor)); + + sendingExecutor.execute(() -> sender.sendEvent(sendAction, result)); return result; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/util/IncompleteFuturesTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/util/IncompleteFuturesTracker.java index 9904ce0..1f8a9cd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/util/IncompleteFuturesTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/util/IncompleteFuturesTracker.java @@ -18,15 +18,12 @@ package org.apache.flink.runtime.operators.coordination.util; -import org.apache.flink.annotation.VisibleForTesting; - import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collection; import 
java.util.Collections; import java.util.HashSet; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.locks.ReentrantLock; @@ -64,6 +61,22 @@ public final class IncompleteFuturesTracker { future.whenComplete((success, failure) -> removeFromSet(future)); } + public Collection<CompletableFuture<?>> getCurrentIncompleteAndReset() { + lock.lock(); + try { + if (incompleteFutures.isEmpty()) { + return Collections.emptySet(); + } + + final ArrayList<CompletableFuture<?>> futures = new ArrayList<>(incompleteFutures); + incompleteFutures.clear(); + return futures; + + } finally { + lock.unlock(); + } + } + public void failAllFutures(Throwable cause) { final Collection<CompletableFuture<?>> futuresToFail; @@ -95,9 +108,4 @@ public final class IncompleteFuturesTracker { lock.unlock(); } } - - @VisibleForTesting - Set<CompletableFuture<?>> getTrackedFutures() { - return Collections.unmodifiableSet(incompleteFutures); - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java index baee4b1..dfed1eb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java @@ -24,8 +24,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.util.SerializedValue; -import javax.annotation.Nullable; - import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -43,22 +41,27 @@ import java.util.stream.Collectors; public class EventReceivingTasks implements SubtaskAccess.SubtaskAccessFactory { public static EventReceivingTasks createForNotYetRunningTasks() { - return new EventReceivingTasks(false, null); + return new 
EventReceivingTasks(false, CompletableFuture.completedFuture(Acknowledge.get())); } public static EventReceivingTasks createForRunningTasks() { - return new EventReceivingTasks(true, null); + return new EventReceivingTasks(true, CompletableFuture.completedFuture(Acknowledge.get())); } public static EventReceivingTasks createForRunningTasksFailingRpcs(Throwable rpcException) { - return new EventReceivingTasks(true, rpcException); + return new EventReceivingTasks(true, FutureUtils.completedExceptionally(rpcException)); + } + + public static EventReceivingTasks createForRunningTasksWithRpcResult( + CompletableFuture<Acknowledge> result) { + return new EventReceivingTasks(true, result); } // ------------------------------------------------------------------------ final ArrayList<EventWithSubtask> events = new ArrayList<>(); - @Nullable private final Throwable eventSendingFailureCause; + private final CompletableFuture<Acknowledge> eventSendingResult; private final Map<Integer, TestSubtaskAccess> subtasks = new HashMap<>(); @@ -66,9 +69,9 @@ public class EventReceivingTasks implements SubtaskAccess.SubtaskAccessFactory { private EventReceivingTasks( final boolean createdTasksAreRunning, - @Nullable final Throwable eventSendingFailureCause) { + final CompletableFuture<Acknowledge> eventSendingResult) { this.createdTasksAreRunning = createdTasksAreRunning; - this.eventSendingFailureCause = eventSendingFailureCause; + this.eventSendingResult = eventSendingResult; } // ------------------------------------------------------------------------ @@ -123,10 +126,7 @@ public class EventReceivingTasks implements SubtaskAccess.SubtaskAccessFactory { Callable<CompletableFuture<Acknowledge>> createSendAction(OperatorEvent event, int subtask) { return () -> { events.add(new EventWithSubtask(event, subtask)); - - return eventSendingFailureCause == null - ? 
CompletableFuture.completedFuture(Acknowledge.get()) - : FutureUtils.completedExceptionally(eventSendingFailureCause); + return eventSendingResult; }; } @@ -207,6 +207,11 @@ public class EventReceivingTasks implements SubtaskAccess.SubtaskAccessFactory { } @Override + public String subtaskName() { + return "test_task-" + subtaskIndex + " #: " + executionAttemptId; + } + + @Override public CompletableFuture<?> hasSwitchedToRunning() { return running; } @@ -219,5 +224,10 @@ public class EventReceivingTasks implements SubtaskAccess.SubtaskAccessFactory { void switchToRunning() { running.complete(null); } + + @Override + public void triggerTaskFailover(Throwable cause) { + // ignore this in the tests + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java index 4eacd16..3d3cddc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.operators.coordination.EventReceivingTasks.EventWithSubtask; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; @@ -288,6 +289,23 @@ public class OperatorCoordinatorHolderTest extends TestLogger { globalFailure = null; } + @Test + public void checkpointCompletionWaitsForEventFutures() throws Exception { + final CompletableFuture<Acknowledge> 
ackFuture = new CompletableFuture<>(); + final EventReceivingTasks tasks = + EventReceivingTasks.createForRunningTasksWithRpcResult(ackFuture); + final OperatorCoordinatorHolder holder = + createCoordinatorHolder(tasks, TestingOperatorCoordinator::new); + + getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(0)); + + final CompletableFuture<?> checkpointFuture = triggerAndCompleteCheckpoint(holder, 22L); + assertFalse(checkpointFuture.isDone()); + + ackFuture.complete(Acknowledge.get()); + assertTrue(checkpointFuture.isDone()); + } + /** * This test verifies that the order of Checkpoint Completion and Event Sending observed from * the outside matches that from within the OperatorCoordinator. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/util/IncompleteFuturesTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/util/IncompleteFuturesTrackerTest.java index e994288..a2aff7f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/util/IncompleteFuturesTrackerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/util/IncompleteFuturesTrackerTest.java @@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; @@ -40,7 +41,7 @@ public class IncompleteFuturesTrackerTest { tracker.trackFutureWhileIncomplete(future); - assertThat(tracker.getTrackedFutures(), contains(future)); + assertThat(tracker.getCurrentIncompleteAndReset(), contains(future)); } @Test @@ -51,7 +52,7 @@ public class IncompleteFuturesTrackerTest { tracker.trackFutureWhileIncomplete(future); future.complete(null); - assertThat(tracker.getTrackedFutures(), 
not(contains(future))); + assertThat(tracker.getCurrentIncompleteAndReset(), not(contains(future))); } @Test @@ -62,7 +63,7 @@ public class IncompleteFuturesTrackerTest { future.complete(null); tracker.trackFutureWhileIncomplete(future); - assertThat(tracker.getTrackedFutures(), not(contains(future))); + assertThat(tracker.getCurrentIncompleteAndReset(), not(contains(future))); } @Test @@ -102,4 +103,15 @@ public class IncompleteFuturesTrackerTest { assertSame(expectedException, e.getCause()); } } + + @Test + public void testResetClearsTrackedFutures() { + final IncompleteFuturesTracker tracker = new IncompleteFuturesTracker(); + + final CompletableFuture<?> future = new CompletableFuture<>(); + tracker.trackFutureWhileIncomplete(future); + tracker.getCurrentIncompleteAndReset(); + + assertThat(tracker.getCurrentIncompleteAndReset(), empty()); + } } diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java index bd59ab0..9ca560c 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java @@ -53,7 +53,6 @@ import org.apache.flink.util.function.TriFunction; import akka.actor.ActorSystem; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import javax.annotation.Nullable; @@ -107,7 +106,6 @@ public class OperatorEventSendingCheckpointITCase extends TestLogger { * event was lost and trigger recovery to prevent data loss. Data loss would manifest in a * stalled test, because we could wait forever to collect the required number of events back. 
*/ - @Ignore // ignore for now, because this test fails due to FLINK-21996 @Test public void testOperatorEventLostNoReaderFailure() throws Exception { final int[] eventsToLose = new int[] {2, 4, 6}; @@ -125,7 +123,6 @@ public class OperatorEventSendingCheckpointITCase extends TestLogger { * (which is after the second successful event delivery, the fourth event), there is * additionally a failure on the reader that triggers recovery. */ - @Ignore // ignore for now, because this test fails due to FLINK-21996 @Test public void testOperatorEventLostWithReaderFailure() throws Exception { final int[] eventsToLose = new int[] {1, 3}; @@ -224,6 +221,8 @@ public class OperatorEventSendingCheckpointITCase extends TestLogger { }); final List<Long> sequence = numbers.executeAndCollect(numElements); + // the recovery may change the order of splits, so the sequence might be out-of-order + sequence.sort(Long::compareTo); final List<Long> expectedSequence = LongStream.rangeClosed(1L, numElements).boxed().collect(Collectors.toList());