This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 37f7db38290df065669178cc6407edd5055f1951 Author: Stephan Ewen <se...@apache.org> AuthorDate: Thu May 28 18:19:30 2020 +0200 [FLINK-16986][coordination] (part 2) Make OperatorCoordinatorHolder driven by main thread executor --- .../coordination/OperatorCoordinatorHolder.java | 132 +++-- .../operators/coordination/OperatorEventValve.java | 62 ++- .../flink/runtime/scheduler/SchedulerBase.java | 2 +- .../OperatorCoordinatorHolderTest.java | 556 +++++++++++++++++++++ .../coordination/OperatorEventValveTest.java | 46 +- 5 files changed, 725 insertions(+), 73 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java index 304eb20..9e68c7b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java @@ -20,11 +20,13 @@ package org.apache.flink.runtime.operators.coordination; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.scheduler.SchedulerNG; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.SerializedValue; @@ -57,19 +59,16 @@ import static org.apache.flink.util.Preconditions.checkState; */ public class OperatorCoordinatorHolder implements OperatorCoordinator, 
OperatorCoordinatorCheckpointContext { - private static final long NO_CHECKPOINT = Long.MIN_VALUE; - private final OperatorCoordinator coordinator; private final OperatorID operatorId; private final LazyInitializedCoordinatorContext context; - private final OperatorEventValve eventValve; - // these two fields are needed for the construction of OperatorStateHandles when taking checkpoints private final int operatorParallelism; private final int operatorMaxParallelism; - private volatile long currentlyTriggeredCheckpoint; + private Consumer<Throwable> globalFailureHandler; + private ComponentMainThreadExecutor mainThreadExecutor; private OperatorCoordinatorHolder( final OperatorID operatorId, @@ -85,17 +84,17 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC this.eventValve = checkNotNull(eventValve); this.operatorParallelism = operatorParallelism; this.operatorMaxParallelism = operatorMaxParallelism; - - this.currentlyTriggeredCheckpoint = NO_CHECKPOINT; } - public void lazyInitialize(SchedulerNG scheduler, Executor schedulerExecutor) { - lazyInitialize(scheduler::handleGlobalFailure, schedulerExecutor); + public void lazyInitialize(SchedulerNG scheduler, ComponentMainThreadExecutor mainThreadExecutor) { + lazyInitialize(scheduler::handleGlobalFailure, mainThreadExecutor); } @VisibleForTesting - void lazyInitialize(Consumer<Throwable> globalFailureHandler, Executor schedulerExecutor) { - context.lazyInitialize(globalFailureHandler, schedulerExecutor); + void lazyInitialize(Consumer<Throwable> globalFailureHandler, ComponentMainThreadExecutor mainThreadExecutor) { + this.globalFailureHandler = globalFailureHandler; + this.mainThreadExecutor = mainThreadExecutor; + context.lazyInitialize(globalFailureHandler, mainThreadExecutor); } // ------------------------------------------------------------------------ @@ -127,6 +126,7 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC @Override public void start() 
throws Exception { + mainThreadExecutor.assertRunningInMainThread(); checkState(context.isInitialized(), "Coordinator Context is not yet initialized"); coordinator.start(); } @@ -139,78 +139,112 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC @Override public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception { + mainThreadExecutor.assertRunningInMainThread(); coordinator.handleEventFromOperator(subtask, event); } @Override public void subtaskFailed(int subtask, @Nullable Throwable reason) { + mainThreadExecutor.assertRunningInMainThread(); coordinator.subtaskFailed(subtask, reason); eventValve.resetForTask(subtask); } @Override - public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception { - setCurrentlyTriggeredCheckpoint(checkpointId); - - final CompletableFuture<byte[]> checkpointFuture = coordinator.checkpointCoordinator(checkpointId); - - // synchronously!!!, with the completion, we need to shut the event valve - checkpointFuture.whenComplete((ignored, failure) -> { - if (failure != null) { - abortCurrentTriggering(); - } else { - onCheckpointStateFutureComplete(checkpointId); - } - }); - - return checkpointFuture; + public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) { + // unfortunately, this method does not run in the scheduler executor, but in the + // checkpoint coordinator time thread. + // we can remove the delegation once the checkpoint coordinator runs fully in the scheduler's + // main thread executor + final CompletableFuture<byte[]> future = new CompletableFuture<>(); + mainThreadExecutor.execute(() -> checkpointCoordinatorInternal(checkpointId, future)); + return future; } @Override public void checkpointComplete(long checkpointId) { - coordinator.checkpointComplete(checkpointId); + // unfortunately, this method does not run in the scheduler executor, but in the + // checkpoint coordinator time thread. 
+ // we can remove the delegation once the checkpoint coordinator runs fully in the scheduler's + // main thread executor + mainThreadExecutor.execute(() -> checkpointCompleteInternal(checkpointId)); } @Override public void resetToCheckpoint(byte[] checkpointData) throws Exception { - resetCheckpointTriggeringCheck(); + // ideally we would like to check this here, however this method is called early during + // execution graph construction, before the main thread executor is set + eventValve.reset(); coordinator.resetToCheckpoint(checkpointData); } + private void checkpointCompleteInternal(long checkpointId) { + mainThreadExecutor.assertRunningInMainThread(); + coordinator.checkpointComplete(checkpointId); + } + + private void checkpointCoordinatorInternal(final long checkpointId, final CompletableFuture<byte[]> result) { + mainThreadExecutor.assertRunningInMainThread(); + + final CompletableFuture<byte[]> checkpointFuture; + try { + eventValve.markForCheckpoint(checkpointId); + checkpointFuture = coordinator.checkpointCoordinator(checkpointId); + } catch (Throwable t) { + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); + result.completeExceptionally(t); + globalFailureHandler.accept(t); + return; + } + + // synchronously!!!, with the completion, we need to shut the event valve + checkpointFuture.whenComplete((success, failure) -> { + if (failure != null) { + result.completeExceptionally(failure); + } else { + try { + eventValve.shutValve(checkpointId); + result.complete(success); + } catch (Exception e) { + result.completeExceptionally(e); + } + } + }); + } + // ------------------------------------------------------------------------ // Checkpointing Callbacks // ------------------------------------------------------------------------ @Override public void afterSourceBarrierInjection(long checkpointId) { - verifyNoOtherCheckpointBeingTriggered(checkpointId); - eventValve.openValve(); - resetCheckpointTriggeringCheck(); - } + // this method is commonly called by the 
CheckpointCoordinator's executor thread (timer thread). - @Override - public void abortCurrentTriggering() { - eventValve.openValve(); - resetCheckpointTriggeringCheck(); - } + // we ideally want the scheduler main-thread to be the one that sends the blocked events + // however, we need to react synchronously here, to maintain consistency and not allow + // another checkpoint injection in-between (unlikely, but possible). + // fortunately, the event-sending goes pretty much directly to the RPC gateways, which are + // thread safe. - void onCheckpointStateFutureComplete(long checkpointId) { - verifyNoOtherCheckpointBeingTriggered(checkpointId); - eventValve.shutValve(); + // this will automatically be fixed once the checkpoint coordinator runs in the + // scheduler's main thread executor + eventValve.openValveAndUnmarkCheckpoint(); } - private void verifyNoOtherCheckpointBeingTriggered(long checkpointId) { - checkState(currentlyTriggeredCheckpoint == NO_CHECKPOINT || currentlyTriggeredCheckpoint == checkpointId); - } + @Override + public void abortCurrentTriggering() { + // this method is commonly called by the CheckpointCoordinator's executor thread (timer thread). - private void setCurrentlyTriggeredCheckpoint(long checkpointId) { - verifyNoOtherCheckpointBeingTriggered(checkpointId); - currentlyTriggeredCheckpoint = checkpointId; - } + // we ideally want the scheduler main-thread to be the one that sends the blocked events + // however, we need to react synchronously here, to maintain consistency and not allow + // another checkpoint injection in-between (unlikely, but possible). + // fortunately, the event-sending goes pretty much directly to the RPC gateways, which are + // thread safe. 
- private void resetCheckpointTriggeringCheck() { - currentlyTriggeredCheckpoint = NO_CHECKPOINT; + // this will automatically be fixed once the checkpoint coordinator runs in the + // scheduler's main thread executor + eventValve.openValveAndUnmarkCheckpoint(); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java index bc1bfcf..f63df4a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.operators.coordination; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.util.FlinkException; import org.apache.flink.util.SerializedValue; @@ -41,7 +42,9 @@ import java.util.function.BiFunction; * * <p>This class is fully thread safe, under the assumption that the event sender is thread-safe. 
*/ -public final class OperatorEventValve { +final class OperatorEventValve { + + private static final long NO_CHECKPOINT = Long.MIN_VALUE; private final Object lock = new Object(); @@ -52,6 +55,12 @@ public final class OperatorEventValve { private final Map<Integer, List<BlockedEvent>> blockedEvents = new LinkedHashMap<>(); @GuardedBy("lock") + private long currentCheckpointId; + + @GuardedBy("lock") + private long lastCheckpointId; + + @GuardedBy("lock") private boolean shut; /** @@ -60,6 +69,8 @@ public final class OperatorEventValve { */ public OperatorEventValve(BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender) { this.eventSender = eventSender; + this.currentCheckpointId = NO_CHECKPOINT; + this.lastCheckpointId = Long.MIN_VALUE; } // ------------------------------------------------------------------------ @@ -96,21 +107,52 @@ public final class OperatorEventValve { * Shuts the value. All events sent through this valve are blocked until the valve is re-opened. * If the valve is already shut, this does nothing. */ - public void shutValve() { - // synchronized block for visibility + public void markForCheckpoint(long checkpointId) { synchronized (lock) { - shut = true; + if (currentCheckpointId != NO_CHECKPOINT && currentCheckpointId != checkpointId) { + throw new IllegalStateException(String.format( + "Cannot mark for checkpoint %d, already marked for checkpoint %d", + checkpointId, currentCheckpointId)); + } + if (checkpointId > lastCheckpointId) { + currentCheckpointId = checkpointId; + lastCheckpointId = checkpointId; + } else { + throw new IllegalStateException(String.format( + "Regressing checkpoint IDs. Previous checkpointId = %d, new checkpointId = %d", + lastCheckpointId, checkpointId)); + } + } + } + + /** + * Shuts the value. All events sent through this valve are blocked until the valve is re-opened. + * If the valve is already shut, this does nothing. 
+ */ + public void shutValve(long checkpointId) { + synchronized (lock) { + if (checkpointId == currentCheckpointId) { + shut = true; + } else { + throw new IllegalStateException(String.format( + "Cannot shut valve for non-prepared checkpoint. " + + "Prepared checkpoint = %s, attempting-to-close checkpoint = %d", + (currentCheckpointId == NO_CHECKPOINT ? "(none)" : String.valueOf(currentCheckpointId)), + checkpointId)); + } } } /** * Opens the value, releasing all buffered events. */ - public void openValve() { + public void openValveAndUnmarkCheckpoint() { final ArrayList<FuturePair> futures; // send all events under lock, so that no new event can sneak between synchronized (lock) { + currentCheckpointId = NO_CHECKPOINT; + if (!shut) { return; } @@ -129,14 +171,7 @@ public final class OperatorEventValve { // apply the logic on the future outside the lock, to be safe for (FuturePair pair : futures) { - final CompletableFuture<Acknowledge> originalFuture = pair.originalFuture; - pair.ackFuture.whenComplete((success, failure) -> { - if (failure != null) { - originalFuture.completeExceptionally(failure); - } else { - originalFuture.complete(success); - } - }); + FutureUtils.forward(pair.ackFuture, pair.originalFuture); } } @@ -165,6 +200,7 @@ public final class OperatorEventValve { } blockedEvents.clear(); shut = false; + currentCheckpointId = NO_CHECKPOINT; } failAllFutures(events); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index ce1d8a1..1a10d6b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -970,7 +970,7 @@ public abstract class SchedulerBase implements SchedulerNG { } } - private void initializeOperatorCoordinators(Executor mainThreadExecutor) { + private void 
initializeOperatorCoordinators(ComponentMainThreadExecutor mainThreadExecutor) { for (OperatorCoordinatorHolder coordinatorHolder : getAllCoordinators()) { coordinatorHolder.lazyInitialize(this, mainThreadExecutor); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java new file mode 100644 index 0000000..fa4e7cf --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java @@ -0,0 +1,556 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.operators.coordination; + +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.operators.coordination.TestEventSender.EventWithSubtask; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.SerializedValue; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Test; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +import static org.hamcrest.Matchers.contains; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * A test that ensures the before/after conditions around event sending and checkpoint are met. 
+ * concurrency + */ +@SuppressWarnings("serial") +public class OperatorCoordinatorHolderTest extends TestLogger { + + private final Consumer<Throwable> globalFailureHandler = (t) -> globalFailure = t; + private Throwable globalFailure; + + @After + public void checkNoGlobalFailure() throws Exception { + if (globalFailure != null) { + ExceptionUtils.rethrowException(globalFailure); + } + } + + // ------------------------------------------------------------------------ + + @Test + public void checkpointFutureInitiallyNotDone() throws Exception { + final TestEventSender sender = new TestEventSender(); + final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new); + + final CompletableFuture<byte[]> checkpointFuture = holder.checkpointCoordinator(1L); + assertFalse(checkpointFuture.isDone()); + } + + @Test + public void completedCheckpointFuture() throws Exception { + final TestEventSender sender = new TestEventSender(); + final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new); + + final byte[] testData = new byte[] {11, 22, 33, 44}; + + final CompletableFuture<byte[]> checkpointFuture = holder.checkpointCoordinator(9L); + getCoordinator(holder).getLastTriggeredCheckpoint().complete(testData); + + assertTrue(checkpointFuture.isDone()); + assertArrayEquals(testData, checkpointFuture.get()); + } + + @Test + public void eventsBeforeCheckpointFutureCompletionPassThrough() throws Exception { + final TestEventSender sender = new TestEventSender(); + final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new); + + holder.checkpointCoordinator(1L); + getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1), 1); + + assertThat(sender.events, contains( + new EventWithSubtask(new TestOperatorEvent(1), 1) + )); + } + + @Test + public void eventsAreBlockedAfterCheckpointFutureCompletes() throws Exception { + final 
TestEventSender sender = new TestEventSender(); + final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new); + + triggerAndCompleteCheckpoint(holder, 10L); + getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1337), 0); + + assertTrue(sender.events.isEmpty()); + } + + @Test + public void abortedCheckpointReleasesBlockedEvents() throws Exception { + final TestEventSender sender = new TestEventSender(); + final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new); + + triggerAndCompleteCheckpoint(holder, 123L); + getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1337), 0); + holder.abortCurrentTriggering(); + + assertThat(sender.events, contains( + new EventWithSubtask(new TestOperatorEvent(1337), 0) + )); + } + + @Test + public void sourceBarrierInjectionReleasesBlockedEvents() throws Exception { + final TestEventSender sender = new TestEventSender(); + final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new); + + triggerAndCompleteCheckpoint(holder, 1111L); + getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1337), 0); + holder.afterSourceBarrierInjection(1111L); + + assertThat(sender.events, contains( + new EventWithSubtask(new TestOperatorEvent(1337), 0) + )); + } + + @Test + public void failedTasksDropsBlockedEvents() throws Exception { + final TestEventSender sender = new TestEventSender(); + final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new); + + triggerAndCompleteCheckpoint(holder, 1000L); + getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(0), 0); + getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1), 1); + holder.subtaskFailed(1, null); + holder.abortCurrentTriggering(); + + assertThat(sender.events, contains( + new EventWithSubtask(new TestOperatorEvent(0), 0) + )); + } + 
+ @Test + public void restoreOpensValveEvents() throws Exception { + final TestEventSender sender = new TestEventSender(); + final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new); + + triggerAndCompleteCheckpoint(holder, 1000L); + holder.resetToCheckpoint(new byte[0]); + getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(999), 1); + + assertThat(sender.events, contains( + new EventWithSubtask(new TestOperatorEvent(999), 1) + )); + } + + @Test + public void restoreDropsBlockedEvents() throws Exception { + final TestEventSender sender = new TestEventSender(); + final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new); + + triggerAndCompleteCheckpoint(holder, 1000L); + getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(0), 0); + getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1), 1); + holder.resetToCheckpoint(new byte[0]); + + assertTrue(sender.events.isEmpty()); + } + + @Test + public void lateCompleteCheckpointFutureDoesNotBlockEvents() throws Exception { + final TestEventSender sender = new TestEventSender(); + final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new); + + final CompletableFuture<byte[]> holderFuture = holder.checkpointCoordinator(1000L); + final CompletableFuture<byte[]> future1 = getCoordinator(holder).getLastTriggeredCheckpoint(); + holder.abortCurrentTriggering(); + + triggerAndCompleteCheckpoint(holder, 1010L); + holder.afterSourceBarrierInjection(1010L); + + future1.complete(new byte[0]); + + assertTrue(holderFuture.isCompletedExceptionally()); + getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(123), 0); + + assertThat(sender.events, contains( + new EventWithSubtask(new TestOperatorEvent(123), 0) + )); + } + + @Test + public void triggeringFailsIfOtherTriggeringInProgress() throws Exception { + final TestEventSender sender 
= new TestEventSender(); + final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new); + + holder.checkpointCoordinator(11L); + final CompletableFuture<?> future = holder.checkpointCoordinator(12L); + + assertTrue(future.isCompletedExceptionally()); + assertNotNull(globalFailure); + globalFailure = null; + } + + @Test + public void takeCheckpointAfterSuccessfulCheckpoint() throws Exception { + final TestEventSender sender = new TestEventSender(); + final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new); + + getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(0), 0); + + triggerAndCompleteCheckpoint(holder, 22L); + getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1), 0); + holder.afterSourceBarrierInjection(22L); + + getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(2), 0); + + triggerAndCompleteCheckpoint(holder, 23L); + getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(3), 0); + holder.afterSourceBarrierInjection(23L); + + assertThat(sender.events, contains( + new EventWithSubtask(new TestOperatorEvent(0), 0), + new EventWithSubtask(new TestOperatorEvent(1), 0), + new EventWithSubtask(new TestOperatorEvent(2), 0), + new EventWithSubtask(new TestOperatorEvent(3), 0) + )); + } + + @Test + public void takeCheckpointAfterAbortedCheckpoint() throws Exception { + final TestEventSender sender = new TestEventSender(); + final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new); + + getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(0), 0); + + triggerAndCompleteCheckpoint(holder, 22L); + getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1), 0); + holder.abortCurrentTriggering(); + + getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(2), 0); + + triggerAndCompleteCheckpoint(holder, 23L); + 
getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(3), 0); + holder.afterSourceBarrierInjection(23L); + + assertThat(sender.events, contains( + new EventWithSubtask(new TestOperatorEvent(0), 0), + new EventWithSubtask(new TestOperatorEvent(1), 0), + new EventWithSubtask(new TestOperatorEvent(2), 0), + new EventWithSubtask(new TestOperatorEvent(3), 0) + )); + } + + /** + * This test verifies that the order of Checkpoint Completion and Event Sending observed from the + * outside matches that from within the OperatorCoordinator. + * + * <p>Extreme case 1: The coordinator immediately completes the checkpoint future and sends an + * event directly after that. + */ + @Test + public void verifyCheckpointEventOrderWhenCheckpointFutureCompletedImmediately() throws Exception { + checkpointEventValueAtomicity(FutureCompletedInstantlyTestCoordinator::new); + } + + /** + * This test verifies that the order of Checkpoint Completion and Event Sending observed from the + * outside matches that from within the OperatorCoordinator. + * + * <p>Extreme case 2: After the checkpoint triggering, the coordinator flushes a bunch of events + * before completing the checkpoint future. 
+ */ + @Test + public void verifyCheckpointEventOrderWhenCheckpointFutureCompletesLate() throws Exception { + checkpointEventValueAtomicity(FutureCompletedAfterSendingEventsCoordinator::new); + } + + private void checkpointEventValueAtomicity( + final Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorCtor) throws Exception { + + final ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService(); + final ComponentMainThreadExecutor mainThreadExecutor = new ComponentMainThreadExecutorServiceAdapter( + (ScheduledExecutorService) executor, Thread.currentThread()); + + final TestEventSender sender = new TestEventSender(); + final OperatorCoordinatorHolder holder = createCoordinatorHolder( + sender, coordinatorCtor, mainThreadExecutor); + + // give the coordinator some time to emit some events + Thread.sleep(new Random().nextInt(10) + 20); + executor.triggerAll(); + + // trigger the checkpoint - this should also shut the valve as soon as the future is completed + final CompletableFuture<byte[]> checkpointFuture = holder.checkpointCoordinator(0L); + executor.triggerAll(); + + // give the coordinator some time to emit some events + Thread.sleep(new Random().nextInt(10) + 10); + holder.close(); + executor.triggerAll(); + + assertTrue(checkpointFuture.isDone()); + final int checkpointedNumber = bytesToInt(checkpointFuture.get()); + + assertEquals(checkpointedNumber, sender.events.size()); + for (int i = 0; i < checkpointedNumber; i++) { + assertEquals(i, ((TestOperatorEvent) sender.events.get(i).event).getValue()); + } + } + + // ------------------------------------------------------------------------ + // test actions + // ------------------------------------------------------------------------ + + private CompletableFuture<byte[]> triggerAndCompleteCheckpoint( + OperatorCoordinatorHolder holder, + long checkpointId) throws Exception { + + final CompletableFuture<byte[]> future = 
holder.checkpointCoordinator(checkpointId); + getCoordinator(holder).getLastTriggeredCheckpoint().complete(new byte[0]); + return future; + } + + // ------------------------------------------------------------------------ + // miscellaneous helpers + // ------------------------------------------------------------------------ + + static byte[] intToBytes(int value) { + return ByteBuffer.allocate(4).putInt(value).array(); + } + + static int bytesToInt(byte[] bytes) { + return ByteBuffer.wrap(bytes).getInt(); + } + + private static TestingOperatorCoordinator getCoordinator(OperatorCoordinatorHolder holder) { + return (TestingOperatorCoordinator) holder.coordinator(); + } + + private OperatorCoordinatorHolder createCoordinatorHolder( + final BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender, + final Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorCtor) throws Exception { + + return createCoordinatorHolder( + eventSender, + coordinatorCtor, + ComponentMainThreadExecutorServiceAdapter.forMainThread()); + } + + private OperatorCoordinatorHolder createCoordinatorHolder( + final BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender, + final Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorCtor, + final ComponentMainThreadExecutor mainThreadExecutor) throws Exception { + + final OperatorID opId = new OperatorID(); + final OperatorCoordinator.Provider provider = new OperatorCoordinator.Provider() { + @Override + public OperatorID getOperatorId() { + return opId; + } + + @Override + public OperatorCoordinator create(OperatorCoordinator.Context context) { + return coordinatorCtor.apply(context); + } + }; + + final OperatorCoordinatorHolder holder = OperatorCoordinatorHolder.create( + opId, + provider, + eventSender, + "test-coordinator-name", + 3, + 1775); + + holder.lazyInitialize(globalFailureHandler, mainThreadExecutor); + holder.start(); +
return holder; + } + + // ------------------------------------------------------------------------ + // test implementations + // ------------------------------------------------------------------------ + + private static final class FutureCompletedInstantlyTestCoordinator extends CheckpointEventOrderTestBaseCoordinator { + + private final ReentrantLock lock = new ReentrantLock(true); + private final Condition condition = lock.newCondition(); + + @Nullable + @GuardedBy("lock") + private CompletableFuture<byte[]> checkpoint; + + private int num; + + FutureCompletedInstantlyTestCoordinator(Context context) { + super(context); + } + + @Override + public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception { + // we create the checkpoint future, but before returning it, we wait on a + // condition. that way, we simulate a "context switch" just at the time when the + // future would be returned and make the other thread complete the future and send an + // event before this method returns + final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>(); + lock.lock(); + try { + checkpoint = checkpointFuture; + condition.await(); + return checkpointFuture; + } finally { + lock.unlock(); + } + } + + @Override + protected void step() throws Exception { + lock.lock(); + try { + // if there is a checkpoint to complete, we complete it and immediately + // try to send another event, without releasing the lock.
that way we + // force the situation as if the checkpoint gets completed and an event gets + // sent while the triggering thread is stalled + if (checkpoint != null) { + checkpoint.complete(intToBytes(num)); + checkpoint = null; + } + context.sendEvent(new TestOperatorEvent(num++), 0); + condition.signalAll(); + } finally { + lock.unlock(); + } + + Thread.sleep(2); + } + } + + private static final class FutureCompletedAfterSendingEventsCoordinator extends CheckpointEventOrderTestBaseCoordinator { + + @Nullable + private volatile CompletableFuture<byte[]> checkpoint; // set by the triggering thread, read by the coordinator thread + + private int num; + + FutureCompletedAfterSendingEventsCoordinator(Context context) { + super(context); + } + + @Override + public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception { + final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>(); + checkpoint = checkpointFuture; + return checkpointFuture; + } + + @Override + protected void step() throws Exception { + Thread.sleep(2); + + context.sendEvent(new TestOperatorEvent(num++), 0); + context.sendEvent(new TestOperatorEvent(num++), 1); + context.sendEvent(new TestOperatorEvent(num++), 2); + + if (checkpoint != null) { + checkpoint.complete(intToBytes(num)); + checkpoint = null; + } + } + } + + private abstract static class CheckpointEventOrderTestBaseCoordinator implements OperatorCoordinator, Runnable { + + private final Thread coordinatorThread; + + protected final Context context; + + private volatile boolean closed; + + CheckpointEventOrderTestBaseCoordinator(Context context) { + this.context = context; + this.coordinatorThread = new Thread(this); + } + + @Override + public void start() throws Exception { + coordinatorThread.start(); + } + + @Override + public void close() throws Exception { + closed = true; + coordinatorThread.interrupt(); + coordinatorThread.join(); + } + + @Override + public void handleEventFromOperator(int subtask, OperatorEvent event){} + + @Override + public void
subtaskFailed(int subtask, @Nullable Throwable reason) {} + + @Override + public abstract CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception; + + @Override + public void checkpointComplete(long checkpointId) {} + + @Override + public void resetToCheckpoint(byte[] checkpointData) throws Exception {} + + @Override + public void run() { + try { + while (!closed) { + step(); + } + } catch (Throwable t) { + if (closed) { + return; + } + + // this should never happen, but just in case, print and crash the test + //noinspection CallToPrintStackTrace + t.printStackTrace(); + System.exit(-1); + } + } + + protected abstract void step() throws Exception; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java index 9b0325d..a380958 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java @@ -50,11 +50,30 @@ public class OperatorEventValveTest { assertTrue(future.isDone()); } + @Test(expected = IllegalStateException.class) + public void errorShuttingUnmarkedValve() throws Exception { + final TestEventSender sender = new TestEventSender(); + final OperatorEventValve valve = new OperatorEventValve(sender); + + valve.shutValve(123L); + } + + @Test(expected = IllegalStateException.class) + public void errorShuttingValveForOtherMark() throws Exception { + final TestEventSender sender = new TestEventSender(); + final OperatorEventValve valve = new OperatorEventValve(sender); + + valve.markForCheckpoint(100L); + valve.shutValve(123L); + } + @Test public void eventsBlockedByClosedValve() throws Exception { final TestEventSender sender = new TestEventSender(); final OperatorEventValve valve = new
OperatorEventValve(sender); - valve.shutValve(); + + valve.markForCheckpoint(1L); + valve.shutValve(1L); final CompletableFuture<Acknowledge> future = valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 1); @@ -67,14 +86,16 @@ public class OperatorEventValveTest { public void eventsReleasedAfterOpeningValve() throws Exception { final TestEventSender sender = new TestEventSender(); final OperatorEventValve valve = new OperatorEventValve(sender); - valve.shutValve(); + + valve.markForCheckpoint(17L); + valve.shutValve(17L); final OperatorEvent event1 = new TestOperatorEvent(); final OperatorEvent event2 = new TestOperatorEvent(); final CompletableFuture<Acknowledge> future1 = valve.sendEvent(new SerializedValue<>(event1), 3); final CompletableFuture<Acknowledge> future2 = valve.sendEvent(new SerializedValue<>(event2), 0); - valve.openValve(); + valve.openValveAndUnmarkCheckpoint(); assertThat(sender.events, containsInAnyOrder( new EventWithSubtask(event1, 3), @@ -88,11 +109,13 @@ public class OperatorEventValveTest { public void releasedEventsForwardSendFailures() throws Exception { final TestEventSender sender = new TestEventSender(new FlinkException("test")); final OperatorEventValve valve = new OperatorEventValve(sender); - valve.shutValve(); + + valve.markForCheckpoint(17L); + valve.shutValve(17L); final CompletableFuture<Acknowledge> future = valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 10); - valve.openValve(); + valve.openValveAndUnmarkCheckpoint(); assertTrue(future.isCompletedExceptionally()); } @@ -101,13 +124,14 @@ public class OperatorEventValveTest { public void resetDropsAllEvents() throws Exception { final TestEventSender sender = new TestEventSender(); final OperatorEventValve valve = new OperatorEventValve(sender); - valve.shutValve(); + valve.markForCheckpoint(17L); + valve.shutValve(17L); valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 0); valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()),
1); valve.reset(); - valve.openValve(); + valve.openValveAndUnmarkCheckpoint(); assertTrue(sender.events.isEmpty()); } @@ -116,7 +140,8 @@ public class OperatorEventValveTest { public void resetForTaskDropsSelectiveEvents() throws Exception { final TestEventSender sender = new TestEventSender(); final OperatorEventValve valve = new OperatorEventValve(sender); - valve.shutValve(); + valve.markForCheckpoint(17L); + valve.shutValve(17L); final OperatorEvent event1 = new TestOperatorEvent(); final OperatorEvent event2 = new TestOperatorEvent(); @@ -124,7 +149,7 @@ public class OperatorEventValveTest { final CompletableFuture<Acknowledge> future2 = valve.sendEvent(new SerializedValue<>(event2), 1); valve.resetForTask(1); - valve.openValve(); + valve.openValveAndUnmarkCheckpoint(); assertThat(sender.events, contains(new EventWithSubtask(event1, 0))); assertTrue(future1.isDone()); @@ -136,7 +161,8 @@ public class OperatorEventValveTest { final TestEventSender sender = new TestEventSender(); final OperatorEventValve valve = new OperatorEventValve(sender); - valve.shutValve(); + valve.markForCheckpoint(17L); + valve.shutValve(17L); valve.reset(); assertFalse(valve.isShut());