This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 37f7db38290df065669178cc6407edd5055f1951
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu May 28 18:19:30 2020 +0200

    [FLINK-16986][coordination] (part 2) Make OperatorCoordinatorHolder driven by main thread executor
---
 .../coordination/OperatorCoordinatorHolder.java    | 132 +++--
 .../operators/coordination/OperatorEventValve.java |  62 ++-
 .../flink/runtime/scheduler/SchedulerBase.java     |   2 +-
 .../OperatorCoordinatorHolderTest.java             | 556 +++++++++++++++++++++
 .../coordination/OperatorEventValveTest.java       |  46 +-
 5 files changed, 725 insertions(+), 73 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index 304eb20..9e68c7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -20,11 +20,13 @@ package org.apache.flink.runtime.operators.coordination;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import 
org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.SerializedValue;
@@ -57,19 +59,16 @@ import static org.apache.flink.util.Preconditions.checkState;
  */
 public class OperatorCoordinatorHolder implements OperatorCoordinator, 
OperatorCoordinatorCheckpointContext {
 
-       private static final long NO_CHECKPOINT = Long.MIN_VALUE;
-
        private final OperatorCoordinator coordinator;
        private final OperatorID operatorId;
        private final LazyInitializedCoordinatorContext context;
-
        private final OperatorEventValve eventValve;
 
-       // these two fields are needed for the construction of 
OperatorStateHandles when taking checkpoints
        private final int operatorParallelism;
        private final int operatorMaxParallelism;
 
-       private volatile long currentlyTriggeredCheckpoint;
+       private Consumer<Throwable> globalFailureHandler;
+       private ComponentMainThreadExecutor mainThreadExecutor;
 
        private OperatorCoordinatorHolder(
                        final OperatorID operatorId,
@@ -85,17 +84,17 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
                this.eventValve = checkNotNull(eventValve);
                this.operatorParallelism = operatorParallelism;
                this.operatorMaxParallelism = operatorMaxParallelism;
-
-               this.currentlyTriggeredCheckpoint = NO_CHECKPOINT;
        }
 
-       public void lazyInitialize(SchedulerNG scheduler, Executor 
schedulerExecutor) {
-               lazyInitialize(scheduler::handleGlobalFailure, 
schedulerExecutor);
+       public void lazyInitialize(SchedulerNG scheduler, 
ComponentMainThreadExecutor mainThreadExecutor) {
+               lazyInitialize(scheduler::handleGlobalFailure, 
mainThreadExecutor);
        }
 
        @VisibleForTesting
-       void lazyInitialize(Consumer<Throwable> globalFailureHandler, Executor 
schedulerExecutor) {
-               context.lazyInitialize(globalFailureHandler, schedulerExecutor);
+       void lazyInitialize(Consumer<Throwable> globalFailureHandler, 
ComponentMainThreadExecutor mainThreadExecutor) {
+               this.globalFailureHandler = globalFailureHandler;
+               this.mainThreadExecutor = mainThreadExecutor;
+               context.lazyInitialize(globalFailureHandler, 
mainThreadExecutor);
        }
 
        // 
------------------------------------------------------------------------
@@ -127,6 +126,7 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 
        @Override
        public void start() throws Exception {
+               mainThreadExecutor.assertRunningInMainThread();
                checkState(context.isInitialized(), "Coordinator Context is not 
yet initialized");
                coordinator.start();
        }
@@ -139,78 +139,112 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 
        @Override
        public void handleEventFromOperator(int subtask, OperatorEvent event) 
throws Exception {
+               mainThreadExecutor.assertRunningInMainThread();
                coordinator.handleEventFromOperator(subtask, event);
        }
 
        @Override
        public void subtaskFailed(int subtask, @Nullable Throwable reason) {
+               mainThreadExecutor.assertRunningInMainThread();
                coordinator.subtaskFailed(subtask, reason);
                eventValve.resetForTask(subtask);
        }
 
        @Override
-       public CompletableFuture<byte[]> checkpointCoordinator(long 
checkpointId) throws Exception {
-               setCurrentlyTriggeredCheckpoint(checkpointId);
-
-               final CompletableFuture<byte[]> checkpointFuture = 
coordinator.checkpointCoordinator(checkpointId);
-
-               // synchronously!!!, with the completion, we need to shut the 
event valve
-               checkpointFuture.whenComplete((ignored, failure) -> {
-                       if (failure != null) {
-                               abortCurrentTriggering();
-                       } else {
-                               onCheckpointStateFutureComplete(checkpointId);
-                       }
-               });
-
-               return checkpointFuture;
+       public CompletableFuture<byte[]> checkpointCoordinator(long 
checkpointId) {
+               // unfortunately, this method does not run in the scheduler 
executor, but in the
+               // checkpoint coordinator time thread.
+               // we can remove the delegation once the checkpoint coordinator 
runs fully in the scheduler's
+               // main thread executor
+               final CompletableFuture<byte[]> future = new 
CompletableFuture<>();
+               mainThreadExecutor.execute(() -> 
checkpointCoordinatorInternal(checkpointId, future));
+               return future;
        }
 
        @Override
        public void checkpointComplete(long checkpointId) {
-               coordinator.checkpointComplete(checkpointId);
+               // unfortunately, this method does not run in the scheduler 
executor, but in the
+               // checkpoint coordinator time thread.
+               // we can remove the delegation once the checkpoint coordinator 
runs fully in the scheduler's
+               // main thread executor
+               mainThreadExecutor.execute(() -> 
checkpointCompleteInternal(checkpointId));
        }
 
        @Override
        public void resetToCheckpoint(byte[] checkpointData) throws Exception {
-               resetCheckpointTriggeringCheck();
+               // ideally we would like to check this here, however this 
method is called early during
+               // execution graph construction, before the main thread 
executor is set
+
                eventValve.reset();
                coordinator.resetToCheckpoint(checkpointData);
        }
 
+       private void checkpointCompleteInternal(long checkpointId) {
+               mainThreadExecutor.assertRunningInMainThread();
+               coordinator.checkpointComplete(checkpointId);
+       }
+
+       private void checkpointCoordinatorInternal(final long checkpointId, 
final CompletableFuture<byte[]> result) {
+               mainThreadExecutor.assertRunningInMainThread();
+
+               final CompletableFuture<byte[]> checkpointFuture;
+               try {
+                       eventValve.markForCheckpoint(checkpointId);
+                       checkpointFuture = 
coordinator.checkpointCoordinator(checkpointId);
+               } catch (Throwable t) {
+                       ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+                       result.completeExceptionally(t);
+                       globalFailureHandler.accept(t);
+                       return;
+               }
+
+               // synchronously!!!, with the completion, we need to shut the 
event valve
+               checkpointFuture.whenComplete((success, failure) -> {
+                       if (failure != null) {
+                               result.completeExceptionally(failure);
+                       } else {
+                               try {
+                                       eventValve.shutValve(checkpointId);
+                                       result.complete(success);
+                               } catch (Exception e) {
+                                       result.completeExceptionally(e);
+                               }
+                       }
+               });
+       }
+
        // 
------------------------------------------------------------------------
        //  Checkpointing Callbacks
        // 
------------------------------------------------------------------------
 
        @Override
        public void afterSourceBarrierInjection(long checkpointId) {
-               verifyNoOtherCheckpointBeingTriggered(checkpointId);
-               eventValve.openValve();
-               resetCheckpointTriggeringCheck();
-       }
+               // this method is commonly called by the 
CheckpointCoordinator's executor thread (timer thread).
 
-       @Override
-       public void abortCurrentTriggering() {
-               eventValve.openValve();
-               resetCheckpointTriggeringCheck();
-       }
+               // we ideally want the scheduler main-thread to be the one that 
sends the blocked events
+               // however, we need to react synchronously here, to maintain 
consistency and not allow
+               // another checkpoint injection in-between (unlikely, but 
possible).
+               // fortunately, the event-sending goes pretty much directly to 
the RPC gateways, which are
+               // thread safe.
 
-       void onCheckpointStateFutureComplete(long checkpointId) {
-               verifyNoOtherCheckpointBeingTriggered(checkpointId);
-               eventValve.shutValve();
+               // this will automatically be fixed once the checkpoint 
coordinator runs in the
+               // scheduler's main thread executor
+               eventValve.openValveAndUnmarkCheckpoint();
        }
 
-       private void verifyNoOtherCheckpointBeingTriggered(long checkpointId) {
-               checkState(currentlyTriggeredCheckpoint == NO_CHECKPOINT || 
currentlyTriggeredCheckpoint == checkpointId);
-       }
+       @Override
+       public void abortCurrentTriggering() {
+               // this method is commonly called by the 
CheckpointCoordinator's executor thread (timer thread).
 
-       private void setCurrentlyTriggeredCheckpoint(long checkpointId) {
-               verifyNoOtherCheckpointBeingTriggered(checkpointId);
-               currentlyTriggeredCheckpoint = checkpointId;
-       }
+               // we ideally want the scheduler main-thread to be the one that 
sends the blocked events
+               // however, we need to react synchronously here, to maintain 
consistency and not allow
+               // another checkpoint injection in-between (unlikely, but 
possible).
+               // fortunately, the event-sending goes pretty much directly to 
the RPC gateways, which are
+               // thread safe.
 
-       private void resetCheckpointTriggeringCheck() {
-               currentlyTriggeredCheckpoint = NO_CHECKPOINT;
+               // this will automatically be fixed once the checkpoint 
coordinator runs in the
+               // scheduler's main thread executor
+               eventValve.openValveAndUnmarkCheckpoint();
        }
 
        // 
------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
index bc1bfcf..f63df4a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.operators.coordination;
 
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedValue;
@@ -41,7 +42,9 @@ import java.util.function.BiFunction;
  *
  * <p>This class is fully thread safe, under the assumption that the event 
sender is thread-safe.
  */
-public final class OperatorEventValve {
+final class OperatorEventValve {
+
+       private static final long NO_CHECKPOINT = Long.MIN_VALUE;
 
        private final Object lock = new Object();
 
@@ -52,6 +55,12 @@ public final class OperatorEventValve {
        private final Map<Integer, List<BlockedEvent>> blockedEvents = new 
LinkedHashMap<>();
 
        @GuardedBy("lock")
+       private long currentCheckpointId;
+
+       @GuardedBy("lock")
+       private long lastCheckpointId;
+
+       @GuardedBy("lock")
        private boolean shut;
 
        /**
@@ -60,6 +69,8 @@ public final class OperatorEventValve {
         */
        public OperatorEventValve(BiFunction<SerializedValue<OperatorEvent>, 
Integer, CompletableFuture<Acknowledge>> eventSender) {
                this.eventSender = eventSender;
+               this.currentCheckpointId = NO_CHECKPOINT;
+               this.lastCheckpointId = Long.MIN_VALUE;
        }
 
        // 
------------------------------------------------------------------------
@@ -96,21 +107,52 @@ public final class OperatorEventValve {
         * Shuts the value. All events sent through this valve are blocked 
until the valve is re-opened.
         * If the valve is already shut, this does nothing.
         */
-       public void shutValve() {
-               // synchronized block for visibility
+       public void markForCheckpoint(long checkpointId) {
                synchronized (lock) {
-                       shut = true;
+                       if (currentCheckpointId != NO_CHECKPOINT && 
currentCheckpointId != checkpointId) {
+                               throw new IllegalStateException(String.format(
+                                               "Cannot mark for checkpoint %d, 
already marked for checkpoint %d",
+                                               checkpointId, 
currentCheckpointId));
+                       }
+                       if (checkpointId > lastCheckpointId) {
+                               currentCheckpointId = checkpointId;
+                               lastCheckpointId = checkpointId;
+                       } else {
+                               throw new IllegalStateException(String.format(
+                                               "Regressing checkpoint IDs. 
Previous checkpointId = %d, new checkpointId = %d",
+                                               lastCheckpointId, 
checkpointId));
+                       }
+               }
+       }
+
+       /**
+        * Shuts the value. All events sent through this valve are blocked 
until the valve is re-opened.
+        * If the valve is already shut, this does nothing.
+        */
+       public void shutValve(long checkpointId) {
+               synchronized (lock) {
+                       if (checkpointId == currentCheckpointId) {
+                               shut = true;
+                       } else {
+                               throw new IllegalStateException(String.format(
+                                       "Cannot shut valve for non-prepared 
checkpoint. " +
+                                       "Prepared checkpoint = %s, 
attempting-to-close checkpoint = %d",
+                                       (currentCheckpointId == NO_CHECKPOINT ? 
"(none)" : String.valueOf(currentCheckpointId)),
+                                       checkpointId));
+                       }
                }
        }
 
        /**
         * Opens the value, releasing all buffered events.
         */
-       public void openValve() {
+       public void openValveAndUnmarkCheckpoint() {
                final ArrayList<FuturePair> futures;
 
                // send all events under lock, so that no new event can sneak 
between
                synchronized (lock) {
+                       currentCheckpointId = NO_CHECKPOINT;
+
                        if (!shut) {
                                return;
                        }
@@ -129,14 +171,7 @@ public final class OperatorEventValve {
 
                // apply the logic on the future outside the lock, to be safe
                for (FuturePair pair : futures) {
-                       final CompletableFuture<Acknowledge> originalFuture = 
pair.originalFuture;
-                       pair.ackFuture.whenComplete((success, failure) -> {
-                               if (failure != null) {
-                                       
originalFuture.completeExceptionally(failure);
-                               } else {
-                                       originalFuture.complete(success);
-                               }
-                       });
+                       FutureUtils.forward(pair.ackFuture, 
pair.originalFuture);
                }
        }
 
@@ -165,6 +200,7 @@ public final class OperatorEventValve {
                        }
                        blockedEvents.clear();
                        shut = false;
+                       currentCheckpointId = NO_CHECKPOINT;
                }
 
                failAllFutures(events);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index ce1d8a1..1a10d6b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -970,7 +970,7 @@ public abstract class SchedulerBase implements SchedulerNG {
                }
        }
 
-       private void initializeOperatorCoordinators(Executor 
mainThreadExecutor) {
+       private void initializeOperatorCoordinators(ComponentMainThreadExecutor 
mainThreadExecutor) {
                for (OperatorCoordinatorHolder coordinatorHolder : 
getAllCoordinators()) {
                        coordinatorHolder.lazyInitialize(this, 
mainThreadExecutor);
                }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
new file mode 100644
index 0000000..fa4e7cf
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
@@ -0,0 +1,556 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import 
org.apache.flink.runtime.operators.coordination.TestEventSender.EventWithSubtask;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * A test that ensures the before/after conditions around event sending and 
checkpoint are met.
+ * concurrency
+ */
+@SuppressWarnings("serial")
+public class OperatorCoordinatorHolderTest extends TestLogger {
+
+       private final Consumer<Throwable> globalFailureHandler = (t) -> 
globalFailure = t;
+       private Throwable globalFailure;
+
+       @After
+       public void checkNoGlobalFailure() throws Exception {
+               if (globalFailure != null) {
+                       ExceptionUtils.rethrowException(globalFailure);
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Test
+       public void checkpointFutureInitiallyNotDone() throws Exception {
+               final TestEventSender sender = new TestEventSender();
+               final OperatorCoordinatorHolder holder = 
createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+               final CompletableFuture<byte[]> checkpointFuture = 
holder.checkpointCoordinator(1L);
+               assertFalse(checkpointFuture.isDone());
+       }
+
+       @Test
+       public void completedCheckpointFuture() throws Exception {
+               final TestEventSender sender = new TestEventSender();
+               final OperatorCoordinatorHolder holder = 
createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+               final byte[] testData = new byte[] {11, 22, 33, 44};
+
+               final CompletableFuture<byte[]> checkpointFuture = 
holder.checkpointCoordinator(9L);
+               
getCoordinator(holder).getLastTriggeredCheckpoint().complete(testData);
+
+               assertTrue(checkpointFuture.isDone());
+               assertArrayEquals(testData, checkpointFuture.get());
+       }
+
+       @Test
+       public void eventsBeforeCheckpointFutureCompletionPassThrough() throws 
Exception {
+               final TestEventSender sender = new TestEventSender();
+               final OperatorCoordinatorHolder holder = 
createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+               holder.checkpointCoordinator(1L);
+               getCoordinator(holder).getContext().sendEvent(new 
TestOperatorEvent(1), 1);
+
+               assertThat(sender.events, contains(
+                       new EventWithSubtask(new TestOperatorEvent(1), 1)
+               ));
+       }
+
+       @Test
+       public void eventsAreBlockedAfterCheckpointFutureCompletes() throws 
Exception {
+               final TestEventSender sender = new TestEventSender();
+               final OperatorCoordinatorHolder holder = 
createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+               triggerAndCompleteCheckpoint(holder, 10L);
+               getCoordinator(holder).getContext().sendEvent(new 
TestOperatorEvent(1337), 0);
+
+               assertTrue(sender.events.isEmpty());
+       }
+
+       @Test
+       public void abortedCheckpointReleasesBlockedEvents() throws Exception {
+               final TestEventSender sender = new TestEventSender();
+               final OperatorCoordinatorHolder holder = 
createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+               triggerAndCompleteCheckpoint(holder, 123L);
+               getCoordinator(holder).getContext().sendEvent(new 
TestOperatorEvent(1337), 0);
+               holder.abortCurrentTriggering();
+
+               assertThat(sender.events, contains(
+                       new EventWithSubtask(new TestOperatorEvent(1337), 0)
+               ));
+       }
+
+       @Test
+       public void sourceBarrierInjectionReleasesBlockedEvents() throws 
Exception {
+               final TestEventSender sender = new TestEventSender();
+               final OperatorCoordinatorHolder holder = 
createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+               triggerAndCompleteCheckpoint(holder, 1111L);
+               getCoordinator(holder).getContext().sendEvent(new 
TestOperatorEvent(1337), 0);
+               holder.afterSourceBarrierInjection(1111L);
+
+               assertThat(sender.events, contains(
+                       new EventWithSubtask(new TestOperatorEvent(1337), 0)
+               ));
+       }
+
+       @Test
+       public void failedTasksDropsBlockedEvents() throws Exception {
+               final TestEventSender sender = new TestEventSender();
+               final OperatorCoordinatorHolder holder = 
createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+               triggerAndCompleteCheckpoint(holder, 1000L);
+               getCoordinator(holder).getContext().sendEvent(new 
TestOperatorEvent(0), 0);
+               getCoordinator(holder).getContext().sendEvent(new 
TestOperatorEvent(1), 1);
+               holder.subtaskFailed(1, null);
+               holder.abortCurrentTriggering();
+
+               assertThat(sender.events, contains(
+                       new EventWithSubtask(new TestOperatorEvent(0), 0)
+               ));
+       }
+
+       @Test
+       public void restoreOpensValveEvents() throws Exception {
+               final TestEventSender sender = new TestEventSender();
+               final OperatorCoordinatorHolder holder = 
createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+               triggerAndCompleteCheckpoint(holder, 1000L);
+               holder.resetToCheckpoint(new byte[0]);
+               getCoordinator(holder).getContext().sendEvent(new 
TestOperatorEvent(999), 1);
+
+               assertThat(sender.events, contains(
+                       new EventWithSubtask(new TestOperatorEvent(999), 1)
+               ));
+       }
+
+       @Test
+       public void restoreDropsBlockedEvents() throws Exception {
+               final TestEventSender sender = new TestEventSender();
+               final OperatorCoordinatorHolder holder = 
createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+               triggerAndCompleteCheckpoint(holder, 1000L);
+               getCoordinator(holder).getContext().sendEvent(new 
TestOperatorEvent(0), 0);
+               getCoordinator(holder).getContext().sendEvent(new 
TestOperatorEvent(1), 1);
+               holder.resetToCheckpoint(new byte[0]);
+
+               assertTrue(sender.events.isEmpty());
+       }
+
+       @Test
+       public void lateCompleteCheckpointFutureDoesNotBlockEvents() throws 
Exception {
+               final TestEventSender sender = new TestEventSender();
+               final OperatorCoordinatorHolder holder = 
createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+               final CompletableFuture<byte[]> holderFuture = 
holder.checkpointCoordinator(1000L);
+               final CompletableFuture<byte[]> future1 = 
getCoordinator(holder).getLastTriggeredCheckpoint();
+               holder.abortCurrentTriggering();
+
+               triggerAndCompleteCheckpoint(holder, 1010L);
+               holder.afterSourceBarrierInjection(1010L);
+
+               future1.complete(new byte[0]);
+
+               assertTrue(holderFuture.isCompletedExceptionally());
+               getCoordinator(holder).getContext().sendEvent(new 
TestOperatorEvent(123), 0);
+
+               assertThat(sender.events, contains(
+                       new EventWithSubtask(new TestOperatorEvent(123), 0)
+               ));
+       }
+
+       @Test
+       public void triggeringFailsIfOtherTriggeringInProgress() throws 
Exception {
+               final TestEventSender sender = new TestEventSender();
+               final OperatorCoordinatorHolder holder = 
createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+               holder.checkpointCoordinator(11L);
+               final CompletableFuture<?> future = 
holder.checkpointCoordinator(12L);
+
+               assertTrue(future.isCompletedExceptionally());
+               assertNotNull(globalFailure);
+               globalFailure = null;
+       }
+
+       @Test
+       public void takeCheckpointAfterSuccessfulCheckpoint() throws Exception {
+               final TestEventSender sender = new TestEventSender();
+               final OperatorCoordinatorHolder holder = 
createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+               getCoordinator(holder).getContext().sendEvent(new 
TestOperatorEvent(0), 0);
+
+               triggerAndCompleteCheckpoint(holder, 22L);
+               getCoordinator(holder).getContext().sendEvent(new 
TestOperatorEvent(1), 0);
+               holder.afterSourceBarrierInjection(22L);
+
+               getCoordinator(holder).getContext().sendEvent(new 
TestOperatorEvent(2), 0);
+
+               triggerAndCompleteCheckpoint(holder, 23L);
+               getCoordinator(holder).getContext().sendEvent(new 
TestOperatorEvent(3), 0);
+               holder.afterSourceBarrierInjection(23L);
+
+               assertThat(sender.events, contains(
+                       new EventWithSubtask(new TestOperatorEvent(0), 0),
+                       new EventWithSubtask(new TestOperatorEvent(1), 0),
+                       new EventWithSubtask(new TestOperatorEvent(2), 0),
+                       new EventWithSubtask(new TestOperatorEvent(3), 0)
+               ));
+       }
+
+       @Test
+       public void takeCheckpointAfterAbortedCheckpoint() throws Exception {
+               final TestEventSender sender = new TestEventSender();
+               final OperatorCoordinatorHolder holder = 
createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+               getCoordinator(holder).getContext().sendEvent(new 
TestOperatorEvent(0), 0);
+
+               triggerAndCompleteCheckpoint(holder, 22L);
+               getCoordinator(holder).getContext().sendEvent(new 
TestOperatorEvent(1), 0);
+               holder.abortCurrentTriggering();
+
+               getCoordinator(holder).getContext().sendEvent(new 
TestOperatorEvent(2), 0);
+
+               triggerAndCompleteCheckpoint(holder, 23L);
+               getCoordinator(holder).getContext().sendEvent(new 
TestOperatorEvent(3), 0);
+               holder.afterSourceBarrierInjection(23L);
+
+               assertThat(sender.events, contains(
+                       new EventWithSubtask(new TestOperatorEvent(0), 0),
+                       new EventWithSubtask(new TestOperatorEvent(1), 0),
+                       new EventWithSubtask(new TestOperatorEvent(2), 0),
+                       new EventWithSubtask(new TestOperatorEvent(3), 0)
+               ));
+       }
+
+       /**
+        * This test verifies that the order of Checkpoint Completion and Event 
Sending observed from the
+        * outside matches that from within the OperatorCoordinator.
+        *
+        * <p>Extreme case 1: The coordinator immediately completes the 
checkpoint future and sends an
+        * event directly after that.
+        */
+       @Test
+       public void 
verifyCheckpointEventOrderWhenCheckpointFutureCompletedImmediately() throws 
Exception {
+               
checkpointEventValueAtomicity(FutureCompletedInstantlyTestCoordinator::new);
+       }
+
+       /**
+        * This test verifies that the order of Checkpoint Completion and Event 
Sending observed from the
+        * outside matches that from within the OperatorCoordinator.
+        *
+        * <p>Extreme case 2: After the checkpoint triggering, the coordinator 
flushes a bunch of events
+        * before completing the checkpoint future.
+        */
+       @Test
+       public void 
verifyCheckpointEventOrderWhenCheckpointFutureCompletesLate() throws Exception {
+               
checkpointEventValueAtomicity(FutureCompletedAfterSendingEventsCoordinator::new);
+       }
+
	/**
	 * Shared body of the checkpoint/event order tests.
	 *
	 * <p>Builds a holder around the coordinator created by {@code coordinatorCtor}. The holder uses
	 * a main thread executor backed by a {@code ManuallyTriggeredScheduledExecutorService} bound to
	 * the current thread. The test lets the coordinator run, triggers checkpoint {@code 0}, lets it
	 * run again and then closes the holder. It finally asserts that the sender received exactly as
	 * many events as the number stored in the checkpoint, with values {@code 0, 1, 2, ...} in order.
	 * The coordinators under test in this file store the count of events they sent before
	 * completing the checkpoint future.
	 *
	 * <p>NOTE(review): "Value" in the method name looks like a typo for "Valve"; kept to avoid
	 * touching the callers.
	 *
	 * @param coordinatorCtor creates the coordinator under test from the holder's context
	 */
	private void checkpointEventValueAtomicity(
			final Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorCtor) throws Exception {

		// tasks submitted to this executor presumably run only when triggerAll() is called
		// (TODO confirm), which lets this test control when main-thread actions happen
		final ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService();
		final ComponentMainThreadExecutor mainThreadExecutor = new ComponentMainThreadExecutorServiceAdapter(
				(ScheduledExecutorService) executor, Thread.currentThread());

		final TestEventSender sender = new TestEventSender();
		final OperatorCoordinatorHolder holder = createCoordinatorHolder(
				sender, coordinatorCtor, mainThreadExecutor);

		// give the coordinator some time to emit some events
		// (randomized so that different interleavings get exercised across runs)
		Thread.sleep(new Random().nextInt(10) + 20);
		executor.triggerAll();

		// trigger the checkpoint - this should also shut the valve as soon as the future is completed
		final CompletableFuture<byte[]> checkpointFuture = holder.checkpointCoordinator(0L);
		executor.triggerAll();

		// give the coordinator some time to emit some events
		Thread.sleep(new Random().nextInt(10) + 10);
		holder.close();
		// run whatever was queued on the main thread executor up to and including the close
		executor.triggerAll();

		assertTrue(checkpointFuture.isDone());
		final int checkpointedNumber = bytesToInt(checkpointFuture.get());

		// exactly the events counted in the checkpoint were sent, and in their original order
		assertEquals(checkpointedNumber, sender.events.size());
		for (int i = 0; i < checkpointedNumber; i++) {
			assertEquals(i, ((TestOperatorEvent) sender.events.get(i).event).getValue());
		}
	}
+
	// ------------------------------------------------------------------------
	//   test actions
	// ------------------------------------------------------------------------
+
+       private CompletableFuture<byte[]> triggerAndCompleteCheckpoint(
+                       OperatorCoordinatorHolder holder,
+                       long checkpointId) throws Exception {
+
+               final CompletableFuture<byte[]> future = 
holder.checkpointCoordinator(checkpointId);
+               
getCoordinator(holder).getLastTriggeredCheckpoint().complete(new byte[0]);
+               return future;
+       }
+
	// ------------------------------------------------------------------------
	//   miscellaneous helpers
	// ------------------------------------------------------------------------
+
+       static byte[] intToBytes(int value) {
+               return ByteBuffer.allocate(4).putInt(value).array();
+       }
+
+       static int bytesToInt(byte[] bytes) {
+               return ByteBuffer.wrap(bytes).getInt();
+       }
+
+       private static TestingOperatorCoordinator 
getCoordinator(OperatorCoordinatorHolder holder) {
+               return (TestingOperatorCoordinator) holder.coordinator();
+       }
+
+       private OperatorCoordinatorHolder createCoordinatorHolder(
+                       final BiFunction<SerializedValue<OperatorEvent>, 
Integer, CompletableFuture<Acknowledge>> eventSender,
+                       final Function<OperatorCoordinator.Context, 
OperatorCoordinator> coordinatorCtor) throws Exception {
+
+               return createCoordinatorHolder(
+                               eventSender,
+                               coordinatorCtor,
+                               
ComponentMainThreadExecutorServiceAdapter.forMainThread());
+       }
+
+       private OperatorCoordinatorHolder createCoordinatorHolder(
+                       final BiFunction<SerializedValue<OperatorEvent>, 
Integer, CompletableFuture<Acknowledge>> eventSender,
+                       final Function<OperatorCoordinator.Context, 
OperatorCoordinator> coordinatorCtor,
+                       final ComponentMainThreadExecutor mainThreadExecutor) 
throws Exception {
+
+               final OperatorID opId = new OperatorID();
+               final OperatorCoordinator.Provider provider = new 
OperatorCoordinator.Provider() {
+                       @Override
+                       public OperatorID getOperatorId() {
+                               return opId;
+                       }
+
+                       @Override
+                       public OperatorCoordinator 
create(OperatorCoordinator.Context context) {
+                               return coordinatorCtor.apply(context);
+                       }
+               };
+
+               final OperatorCoordinatorHolder holder = 
OperatorCoordinatorHolder.create(
+                               opId,
+                               provider,
+                               eventSender,
+                               "test-coordinator-name",
+                               3,
+                               1775);
+
+               holder.lazyInitialize(globalFailureHandler, mainThreadExecutor);
+               holder.start();
+
+               return holder;
+       }
+
	// ------------------------------------------------------------------------
	//   test implementations
	// ------------------------------------------------------------------------
+
+       private static final class FutureCompletedInstantlyTestCoordinator 
extends CheckpointEventOrderTestBaseCoordinator {
+
+               private final ReentrantLock lock = new ReentrantLock(true);
+               private final Condition condition = lock.newCondition();
+
+               @Nullable
+               @GuardedBy("lock")
+               private CompletableFuture<byte[]> checkpoint;
+
+               private int num;
+
+               FutureCompletedInstantlyTestCoordinator(Context context) {
+                       super(context);
+               }
+
+               @Override
+               public CompletableFuture<byte[]> checkpointCoordinator(long 
checkpointId) throws Exception {
+                       // we create the checkpoint future, but before 
returning it, we wait on a
+                       // condition. that way, we simulate a "context switch" 
just at the time when the
+                       // future would be returned and make the other thread 
complete the future and send an
+                       // event before this method returns
+                       final CompletableFuture<byte[]> checkpointFuture = new 
CompletableFuture<>();
+                       lock.lock();
+                       try {
+                               checkpoint = checkpointFuture;
+                               condition.await();
+                               return checkpointFuture;
+                       } finally {
+                               lock.unlock();
+                       }
+               }
+
+               @Override
+               protected void step() throws Exception {
+                       lock.lock();
+                       try {
+                               // if there is a checkpoint to complete, we 
complete it and immediately
+                               // try to send another event, without releasing 
the lock. that way we
+                               // force the situation as if the checkpoint get 
completed and an event gets
+                               // sent while the triggering thread is stalled
+                               if (checkpoint != null) {
+                                       checkpoint.complete(intToBytes(num));
+                                       checkpoint = null;
+                               }
+                               context.sendEvent(new TestOperatorEvent(num++), 
0);
+                               condition.signalAll();
+                       } finally {
+                               lock.unlock();
+                       }
+
+                       Thread.sleep(2);
+               }
+       }
+
+       private static final class FutureCompletedAfterSendingEventsCoordinator 
extends CheckpointEventOrderTestBaseCoordinator {
+
+               @Nullable
+               private CompletableFuture<byte[]> checkpoint;
+
+               private int num;
+
+               FutureCompletedAfterSendingEventsCoordinator(Context context) {
+                       super(context);
+               }
+
+               @Override
+               public CompletableFuture<byte[]> checkpointCoordinator(long 
checkpointId) throws Exception {
+                       final CompletableFuture<byte[]> checkpointFuture = new 
CompletableFuture<>();
+                       checkpoint = checkpointFuture;
+                       return checkpointFuture;
+               }
+
+               @Override
+               protected void step() throws Exception {
+                       Thread.sleep(2);
+
+                       context.sendEvent(new TestOperatorEvent(num++), 0);
+                       context.sendEvent(new TestOperatorEvent(num++), 1);
+                       context.sendEvent(new TestOperatorEvent(num++), 2);
+
+                       if (checkpoint != null) {
+                               checkpoint.complete(intToBytes(num));
+                               checkpoint = null;
+                       }
+               }
+       }
+
+       private abstract static class CheckpointEventOrderTestBaseCoordinator 
implements OperatorCoordinator, Runnable {
+
+               private final Thread coordinatorThread;
+
+               protected final Context context;
+
+               private volatile boolean closed;
+
+               CheckpointEventOrderTestBaseCoordinator(Context context) {
+                       this.context = context;
+                       this.coordinatorThread = new Thread(this);
+               }
+
+               @Override
+               public void start() throws Exception {
+                       coordinatorThread.start();
+               }
+
+               @Override
+               public void close() throws Exception {
+                       closed = true;
+                       coordinatorThread.interrupt();
+                       coordinatorThread.join();
+               }
+
+               @Override
+               public void handleEventFromOperator(int subtask, OperatorEvent 
event){}
+
+               @Override
+               public void subtaskFailed(int subtask, @Nullable Throwable 
reason) {}
+
+               @Override
+               public abstract CompletableFuture<byte[]> 
checkpointCoordinator(long checkpointId) throws Exception;
+
+               @Override
+               public void checkpointComplete(long checkpointId) {}
+
+               @Override
+               public void resetToCheckpoint(byte[] checkpointData) throws 
Exception {}
+
+               @Override
+               public void run() {
+                       try {
+                               while (!closed) {
+                                       step();
+                               }
+                       } catch (Throwable t) {
+                               if (closed) {
+                                       return;
+                               }
+
+                               // this should never happen, but just in case, 
print and crash the test
+                               //noinspection CallToPrintStackTrace
+                               t.printStackTrace();
+                               System.exit(-1);
+                       }
+               }
+
+               protected abstract void step() throws Exception;
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java
index 9b0325d..a380958 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java
@@ -50,11 +50,30 @@ public class OperatorEventValveTest {
                assertTrue(future.isDone());
        }
 
+       @Test(expected = IllegalStateException.class)
+       public void errorShuttingUnmarkedValve() throws Exception {
+               final TestEventSender sender = new TestEventSender();
+               final OperatorEventValve valve = new OperatorEventValve(sender);
+
+               valve.shutValve(123L);
+       }
+
+       @Test(expected = IllegalStateException.class)
+       public void errorShuttingValveForOtherMark() throws Exception {
+               final TestEventSender sender = new TestEventSender();
+               final OperatorEventValve valve = new OperatorEventValve(sender);
+
+               valve.markForCheckpoint(100L);
+               valve.shutValve(123L);
+       }
+
        @Test
        public void eventsBlockedByClosedValve() throws Exception {
                final TestEventSender sender = new TestEventSender();
                final OperatorEventValve valve = new OperatorEventValve(sender);
-               valve.shutValve();
+
+               valve.markForCheckpoint(1L);
+               valve.shutValve(1L);
 
                final CompletableFuture<Acknowledge> future =
                                valve.sendEvent(new SerializedValue<>(new 
TestOperatorEvent()), 1);
@@ -67,14 +86,16 @@ public class OperatorEventValveTest {
        public void eventsReleasedAfterOpeningValve() throws Exception {
                final TestEventSender sender = new TestEventSender();
                final OperatorEventValve valve = new OperatorEventValve(sender);
-               valve.shutValve();
+
+               valve.markForCheckpoint(17L);
+               valve.shutValve(17L);
 
                final OperatorEvent event1 = new TestOperatorEvent();
                final OperatorEvent event2 = new TestOperatorEvent();
                final CompletableFuture<Acknowledge> future1 = 
valve.sendEvent(new SerializedValue<>(event1), 3);
                final CompletableFuture<Acknowledge> future2 = 
valve.sendEvent(new SerializedValue<>(event2), 0);
 
-               valve.openValve();
+               valve.openValveAndUnmarkCheckpoint();
 
                assertThat(sender.events, containsInAnyOrder(
                        new EventWithSubtask(event1, 3),
@@ -88,11 +109,13 @@ public class OperatorEventValveTest {
        public void releasedEventsForwardSendFailures() throws Exception {
                final TestEventSender sender = new TestEventSender(new 
FlinkException("test"));
                final OperatorEventValve valve = new OperatorEventValve(sender);
-               valve.shutValve();
+
+               valve.markForCheckpoint(17L);
+               valve.shutValve(17L);
 
                final CompletableFuture<Acknowledge> future =
                                valve.sendEvent(new SerializedValue<>(new 
TestOperatorEvent()), 10);
-               valve.openValve();
+               valve.openValveAndUnmarkCheckpoint();
 
                assertTrue(future.isCompletedExceptionally());
        }
@@ -101,13 +124,14 @@ public class OperatorEventValveTest {
        public void resetDropsAllEvents() throws Exception {
                final TestEventSender sender = new TestEventSender();
                final OperatorEventValve valve = new OperatorEventValve(sender);
-               valve.shutValve();
+               valve.markForCheckpoint(17L);
+               valve.shutValve(17L);
 
                valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 
0);
                valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 
1);
 
                valve.reset();
-               valve.openValve();
+               valve.openValveAndUnmarkCheckpoint();
 
                assertTrue(sender.events.isEmpty());
        }
@@ -116,7 +140,8 @@ public class OperatorEventValveTest {
        public void resetForTaskDropsSelectiveEvents() throws Exception {
                final TestEventSender sender = new TestEventSender();
                final OperatorEventValve valve = new OperatorEventValve(sender);
-               valve.shutValve();
+               valve.markForCheckpoint(17L);
+               valve.shutValve(17L);
 
                final OperatorEvent event1 = new TestOperatorEvent();
                final OperatorEvent event2 = new TestOperatorEvent();
@@ -124,7 +149,7 @@ public class OperatorEventValveTest {
                final CompletableFuture<Acknowledge> future2 = 
valve.sendEvent(new SerializedValue<>(event2), 1);
 
                valve.resetForTask(1);
-               valve.openValve();
+               valve.openValveAndUnmarkCheckpoint();
 
                assertThat(sender.events, contains(new EventWithSubtask(event1, 
0)));
                assertTrue(future1.isDone());
@@ -136,7 +161,8 @@ public class OperatorEventValveTest {
                final TestEventSender sender = new TestEventSender();
                final OperatorEventValve valve = new OperatorEventValve(sender);
 
-               valve.shutValve();
+               valve.markForCheckpoint(17L);
+               valve.shutValve(17L);
                valve.reset();
 
                assertFalse(valve.isShut());

Reply via email to