This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4595ec755308ba0608910e67561ba9f7e6e00c43
Author: Stephan Ewen <[email protected]>
AuthorDate: Sat Apr 10 21:34:44 2021 +0200

    [FLINK-18071][coordination] (part 1) All event sending and checkpoint 
actions for OperatorCoordinator happen in Scheduler Thread
    
    Having event sending in the scheduler thread means that at the time when 
the event is dispatched to the subtask, we have
    a consistent view of the execution state of a task execution and are not 
subject to any race conditions.
    
    But when event sending is in the scheduler thread, the checkpoint 
completion must be in the scheduler thread as well,
    so that we keep strict ordering of events and checkpoints.
---
 .../coordination/OperatorCoordinatorHolder.java    |  76 ++++----
 .../operators/coordination/OperatorEventValve.java | 211 +++++++++++----------
 .../OperatorCoordinatorSchedulerTest.java          |  15 ++
 .../coordination/OperatorEventValveTest.java       |  34 ++--
 4 files changed, 174 insertions(+), 162 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index d09a4a9..21974c3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.operators.coordination;
 import org.apache.flink.annotation.VisibleForTesting;
 import 
org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
-import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -69,9 +68,9 @@ import static org.apache.flink.util.Preconditions.checkState;
  * <ul>
  *   <li>Events pass through a special channel, the {@link 
OperatorEventValve}. If we are not
  *       currently triggering a checkpoint, then events simply pass through.
- *   <li>Atomically, with the completion of the checkpoint future for the 
coordinator, this operator
- *       operator event valve is closed. Events coming after that are held 
back (buffered), because
- *       they belong to the epoch after the checkpoint.
+ *   <li>With the completion of the checkpoint future for the coordinator, 
this operator event valve
+ *       is closed. Events coming after that are held back (buffered), because 
they belong to the
+ *       epoch after the checkpoint.
  *   <li>Once all coordinators in the job have completed the checkpoint, the 
barriers to the sources
  *       are injected. After that (see {@link 
#afterSourceBarrierInjection(long)}) the valves are
  *       opened again and the events are sent.
@@ -106,10 +105,13 @@ import static 
org.apache.flink.util.Preconditions.checkState;
  *
  * <h3>Concurrency and Threading Model</h3>
  *
- * <p>This component runs mainly in a main-thread-executor, like RPC 
endpoints. However, some
- * actions need to be triggered synchronously by other threads. Most notably, 
when the checkpoint
- * future is completed by the {@code OperatorCoordinator} implementation, we 
need to synchronously
- * suspend event-sending.
+ * <p>This component runs strictly in the Scheduler's main-thread-executor. 
All calls "from the
+ * outside" are either already in the main-thread-executor (when coming from 
Scheduler) or put into
+ * the main-thread-executor (when coming from the CheckpointCoordinator). We 
rely on the executor to
+ * preserve strict order of the calls.
+ *
+ * <p>Actions from the coordinator to the "outside world" (like completing a 
checkpoint and sending
+ * an event) are also enqueued back into the scheduler main-thread executor, 
strictly in order.
  */
 public class OperatorCoordinatorHolder
         implements OperatorCoordinator, OperatorCoordinatorCheckpointContext {
@@ -146,6 +148,7 @@ public class OperatorCoordinatorHolder
             ComponentMainThreadExecutor mainThreadExecutor) {
         this.globalFailureHandler = globalFailureHandler;
         this.mainThreadExecutor = mainThreadExecutor;
+        eventValve.setMainThreadExecutorForValidation(mainThreadExecutor);
         context.lazyInitialize(globalFailureHandler, mainThreadExecutor);
     }
 
@@ -238,8 +241,11 @@ public class OperatorCoordinatorHolder
     @Override
     public void resetToCheckpoint(long checkpointId, @Nullable byte[] 
checkpointData)
             throws Exception {
-        // ideally we would like to check this here, however this method is 
called early during
-        // execution graph construction, before the main thread executor is set
+        // the first time this method is called is early during execution 
graph construction,
+        // before the main thread executor is set. hence this conditional 
check.
+        if (mainThreadExecutor != null) {
+            mainThreadExecutor.assertRunningInMainThread();
+        }
 
         eventValve.reset();
         if (context != null) {
@@ -252,8 +258,9 @@ public class OperatorCoordinatorHolder
             final long checkpointId, final CompletableFuture<byte[]> result) {
         mainThreadExecutor.assertRunningInMainThread();
 
-        // synchronously!!!, with the completion, we need to shut the event 
valve
-        result.whenComplete(
+        final CompletableFuture<byte[]> coordinatorCheckpoint = new 
CompletableFuture<>();
+
+        coordinatorCheckpoint.whenCompleteAsync(
                 (success, failure) -> {
                     if (failure != null) {
                         result.completeExceptionally(failure);
@@ -265,11 +272,12 @@ public class OperatorCoordinatorHolder
                             result.completeExceptionally(e);
                         }
                     }
-                });
+                },
+                mainThreadExecutor);
 
         try {
             eventValve.markForCheckpoint(checkpointId);
-            coordinator.checkpointCoordinator(checkpointId, result);
+            coordinator.checkpointCoordinator(checkpointId, 
coordinatorCheckpoint);
         } catch (Throwable t) {
             ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
             result.completeExceptionally(t);
@@ -283,34 +291,20 @@ public class OperatorCoordinatorHolder
 
     @Override
     public void afterSourceBarrierInjection(long checkpointId) {
-        // this method is commonly called by the CheckpointCoordinator's 
executor thread (timer
-        // thread).
-
-        // we ideally want the scheduler main-thread to be the one that sends 
the blocked events
-        // however, we need to react synchronously here, to maintain 
consistency and not allow
-        // another checkpoint injection in-between (unlikely, but possible).
-        // fortunately, the event-sending goes pretty much directly to the RPC 
gateways, which are
-        // thread safe.
-
-        // this will automatically be fixed once the checkpoint coordinator 
runs in the
+        // unfortunately, this method does not run in the scheduler executor, 
but in the
+        // checkpoint coordinator timer thread.
+        // we can remove the delegation once the checkpoint coordinator runs 
fully in the
         // scheduler's main thread executor
-        eventValve.openValveAndUnmarkCheckpoint();
+        mainThreadExecutor.execute(() -> 
eventValve.openValveAndUnmarkCheckpoint(checkpointId));
     }
 
     @Override
     public void abortCurrentTriggering() {
-        // this method is commonly called by the CheckpointCoordinator's 
executor thread (timer
-        // thread).
-
-        // we ideally want the scheduler main-thread to be the one that sends 
the blocked events
-        // however, we need to react synchronously here, to maintain 
consistency and not allow
-        // another checkpoint injection in-between (unlikely, but possible).
-        // fortunately, the event-sending goes pretty much directly to the RPC 
gateways, which are
-        // thread safe.
-
-        // this will automatically be fixed once the checkpoint coordinator 
runs in the
+        // unfortunately, this method does not run in the scheduler executor, 
but in the
+        // checkpoint coordinator timer thread.
+        // we can remove the delegation once the checkpoint coordinator runs 
fully in the
         // scheduler's main thread executor
-        eventValve.openValveAndUnmarkCheckpoint();
+        mainThreadExecutor.execute(eventValve::openValveAndUnmarkCheckpoint);
     }
 
     // ------------------------------------------------------------------------
@@ -465,12 +459,10 @@ public class OperatorCoordinatorHolder
                 throw new FlinkRuntimeException("Cannot serialize operator 
event", e);
             }
 
-            try {
-                return eventValve.sendEvent(serializedEvent, targetSubtask);
-            } catch (Throwable t) {
-                ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
-                return FutureUtils.completedExceptionally(t);
-            }
+            final CompletableFuture<Acknowledge> result = new 
CompletableFuture<>();
+            schedulerExecutor.execute(
+                    () -> eventValve.sendEvent(serializedEvent, targetSubtask, 
result));
+            return result;
         }
 
         @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
index 5a97df1..99eb7aa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.runtime.operators.coordination;
 
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedValue;
 
 import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
 
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
@@ -40,31 +40,28 @@ import java.util.function.BiFunction;
  *
  * <p>The valve can also drop buffered events for all or selected targets.
  *
- * <p>This class is fully thread safe, under the assumption that the event 
sender is thread-safe.
+ * <p>This class is NOT thread safe, but assumed to be used in a single 
threaded context. To guard
+ * that, one can register a "main thread executor" (as used by the mailbox 
components like RPC
+ * components) via {@link 
#setMainThreadExecutorForValidation(ComponentMainThreadExecutor)}.
  */
 final class OperatorEventValve {
 
     private static final long NO_CHECKPOINT = Long.MIN_VALUE;
 
-    private final Object lock = new Object();
-
-    @GuardedBy("lock")
     private final BiFunction<
                     SerializedValue<OperatorEvent>, Integer, 
CompletableFuture<Acknowledge>>
             eventSender;
 
-    @GuardedBy("lock")
     private final Map<Integer, List<BlockedEvent>> blockedEvents = new 
LinkedHashMap<>();
 
-    @GuardedBy("lock")
     private long currentCheckpointId;
 
-    @GuardedBy("lock")
     private long lastCheckpointId;
 
-    @GuardedBy("lock")
     private boolean shut;
 
+    @Nullable private ComponentMainThreadExecutor mainThreadExecutor;
+
     /**
      * Constructs a new OperatorEventValve, passing the events to the given 
function when the valve
      * is open or opened again. The second parameter of the BiFunction is the 
target operator
@@ -78,13 +75,16 @@ final class OperatorEventValve {
         this.lastCheckpointId = Long.MIN_VALUE;
     }
 
+    public void setMainThreadExecutorForValidation(ComponentMainThreadExecutor 
mainThreadExecutor) {
+        this.mainThreadExecutor = mainThreadExecutor;
+    }
+
     // ------------------------------------------------------------------------
 
     public boolean isShut() {
-        // synchronized block for visibility
-        synchronized (lock) {
-            return shut;
-        }
+        checkRunsInMainThread();
+
+        return shut;
     }
 
     /**
@@ -95,43 +95,54 @@ final class OperatorEventValve {
      * is completed with the original result once the valve is opened. If the 
event is never sent
      * (because it gets dropped through a call to {@link #reset()} or {@link 
#resetForTask(int)},
      * then the returned future till be completed exceptionally.
+     *
+     * <p>This method makes no assumptions and gives no guarantees from which 
thread the result
+     * future gets completed.
      */
-    public CompletableFuture<Acknowledge> sendEvent(
-            SerializedValue<OperatorEvent> event, int subtask) {
-        synchronized (lock) {
-            if (!shut) {
-                return eventSender.apply(event, subtask);
-            }
-
-            final List<BlockedEvent> eventsForTask =
-                    blockedEvents.computeIfAbsent(subtask, (key) -> new 
ArrayList<>());
-            final CompletableFuture<Acknowledge> future = new 
CompletableFuture<>();
-            eventsForTask.add(new BlockedEvent(event, subtask, future));
-            return future;
+    public void sendEvent(
+            SerializedValue<OperatorEvent> event,
+            int subtask,
+            CompletableFuture<Acknowledge> result) {
+        checkRunsInMainThread();
+
+        if (!shut) {
+            final CompletableFuture<Acknowledge> ack = 
eventSender.apply(event, subtask);
+            FutureUtils.forward(ack, result);
+            return;
         }
+
+        final List<BlockedEvent> eventsForTask =
+                blockedEvents.computeIfAbsent(subtask, (key) -> new 
ArrayList<>());
+        eventsForTask.add(new BlockedEvent(event, subtask, result));
     }
 
     /**
-     * Shuts the value. All events sent through this valve are blocked until 
the valve is re-opened.
-     * If the valve is already shut, this does nothing.
+     * Marks the valve for the next checkpoint. This remembers the checkpoint 
ID and will only allow
+     * shutting the valve for this specific checkpoint.
+     *
+     * <p>This is the valve's mechanism to detect situations where multiple 
coordinator checkpoints
+     * would be attempted overlapping, which is currently not supported (the 
valve doesn't keep a
+     * list of events blocked per checkpoint). It also helps to identify 
situations where the
+     * checkpoint was aborted even before the valve was shut (by finding out 
that the {@code
+     * currentCheckpointId} was already reset to {@code NO_CHECKPOINT}).
      */
     public void markForCheckpoint(long checkpointId) {
-        synchronized (lock) {
-            if (currentCheckpointId != NO_CHECKPOINT && currentCheckpointId != 
checkpointId) {
-                throw new IllegalStateException(
-                        String.format(
-                                "Cannot mark for checkpoint %d, already marked 
for checkpoint %d",
-                                checkpointId, currentCheckpointId));
-            }
-            if (checkpointId > lastCheckpointId) {
-                currentCheckpointId = checkpointId;
-                lastCheckpointId = checkpointId;
-            } else {
-                throw new IllegalStateException(
-                        String.format(
-                                "Regressing checkpoint IDs. Previous 
checkpointId = %d, new checkpointId = %d",
-                                lastCheckpointId, checkpointId));
-            }
+        checkRunsInMainThread();
+
+        if (currentCheckpointId != NO_CHECKPOINT && currentCheckpointId != 
checkpointId) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Cannot mark for checkpoint %d, already marked for 
checkpoint %d",
+                            checkpointId, currentCheckpointId));
+        }
+        if (checkpointId > lastCheckpointId) {
+            currentCheckpointId = checkpointId;
+            lastCheckpointId = checkpointId;
+        } else {
+            throw new IllegalStateException(
+                    String.format(
+                            "Regressing checkpoint IDs. Previous checkpointId 
= %d, new checkpointId = %d",
+                            lastCheckpointId, checkpointId));
         }
     }
 
@@ -140,76 +151,75 @@ final class OperatorEventValve {
      * If the valve is already shut, this does nothing.
      */
     public void shutValve(long checkpointId) {
-        synchronized (lock) {
-            if (checkpointId == currentCheckpointId) {
-                shut = true;
-            } else {
-                throw new IllegalStateException(
-                        String.format(
-                                "Cannot shut valve for non-prepared 
checkpoint. "
-                                        + "Prepared checkpoint = %s, 
attempting-to-close checkpoint = %d",
-                                (currentCheckpointId == NO_CHECKPOINT
-                                        ? "(none)"
-                                        : String.valueOf(currentCheckpointId)),
-                                checkpointId));
-            }
+        checkRunsInMainThread();
+
+        if (checkpointId == currentCheckpointId) {
+            shut = true;
+        } else {
+            throw new IllegalStateException(
+                    String.format(
+                            "Cannot shut valve for non-prepared checkpoint. "
+                                    + "Prepared checkpoint = %s, 
attempting-to-close checkpoint = %d",
+                            (currentCheckpointId == NO_CHECKPOINT
+                                    ? "(none)"
+                                    : String.valueOf(currentCheckpointId)),
+                            checkpointId));
         }
     }
 
-    /** Opens the value, releasing all buffered events. */
-    public void openValveAndUnmarkCheckpoint() {
-        final ArrayList<FuturePair> futures;
-
-        // send all events under lock, so that no new event can sneak between
-        synchronized (lock) {
-            currentCheckpointId = NO_CHECKPOINT;
+    public void openValveAndUnmarkCheckpoint(long expectedCheckpointId) {
+        checkRunsInMainThread();
 
-            if (!shut) {
-                return;
-            }
+        if (expectedCheckpointId != currentCheckpointId) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Valve closed for different checkpoint: closed for 
= %d, expected = %d",
+                            currentCheckpointId, expectedCheckpointId));
+        }
+        openValveAndUnmarkCheckpoint();
+    }
 
-            futures = new ArrayList<>(blockedEvents.size());
+    /** Opens the valve, releasing all buffered events. */
+    public void openValveAndUnmarkCheckpoint() {
+        checkRunsInMainThread();
 
-            for (List<BlockedEvent> eventsForTask : blockedEvents.values()) {
-                for (BlockedEvent blockedEvent : eventsForTask) {
-                    final CompletableFuture<Acknowledge> ackFuture =
-                            eventSender.apply(blockedEvent.event, 
blockedEvent.subtask);
-                    futures.add(new FuturePair(blockedEvent.future, 
ackFuture));
-                }
-            }
-            blockedEvents.clear();
-            shut = false;
+        currentCheckpointId = NO_CHECKPOINT;
+        if (!shut) {
+            return;
         }
 
-        // apply the logic on the future outside the lock, to be safe
-        for (FuturePair pair : futures) {
-            FutureUtils.forward(pair.ackFuture, pair.originalFuture);
+        for (List<BlockedEvent> eventsForTask : blockedEvents.values()) {
+            for (BlockedEvent blockedEvent : eventsForTask) {
+                final CompletableFuture<Acknowledge> ackFuture =
+                        eventSender.apply(blockedEvent.event, 
blockedEvent.subtask);
+                FutureUtils.forward(ackFuture, blockedEvent.future);
+            }
         }
+        blockedEvents.clear();
+        shut = false;
     }
 
     /** Drops all blocked events for a specific subtask. */
     public void resetForTask(int subtask) {
-        final List<BlockedEvent> events;
-        synchronized (lock) {
-            events = blockedEvents.remove(subtask);
-        }
+        checkRunsInMainThread();
 
+        final List<BlockedEvent> events = blockedEvents.remove(subtask);
         failAllFutures(events);
     }
 
     /** Resets the valve, dropping all blocked events and opening the valve. */
     public void reset() {
+        checkRunsInMainThread();
+
         final List<BlockedEvent> events = new ArrayList<>();
-        synchronized (lock) {
-            for (List<BlockedEvent> taskEvents : blockedEvents.values()) {
-                if (taskEvents != null) {
-                    events.addAll(taskEvents);
-                }
+        for (List<BlockedEvent> taskEvents : blockedEvents.values()) {
+            if (taskEvents != null) {
+                events.addAll(taskEvents);
             }
-            blockedEvents.clear();
-            shut = false;
-            currentCheckpointId = NO_CHECKPOINT;
         }
+        blockedEvents.clear();
+        shut = false;
+        currentCheckpointId = NO_CHECKPOINT;
 
         failAllFutures(events);
     }
@@ -226,6 +236,12 @@ final class OperatorEventValve {
         }
     }
 
+    private void checkRunsInMainThread() {
+        if (mainThreadExecutor != null) {
+            mainThreadExecutor.assertRunningInMainThread();
+        }
+    }
+
     // ------------------------------------------------------------------------
 
     private static final class BlockedEvent {
@@ -243,17 +259,4 @@ final class OperatorEventValve {
             this.subtask = subtask;
         }
     }
-
-    private static final class FuturePair {
-
-        final CompletableFuture<Acknowledge> originalFuture;
-        final CompletableFuture<Acknowledge> ackFuture;
-
-        FuturePair(
-                CompletableFuture<Acknowledge> originalFuture,
-                CompletableFuture<Acknowledge> ackFuture) {
-            this.originalFuture = originalFuture;
-            this.ackFuture = ackFuture;
-        }
-    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index b256185..8a1d331 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -320,6 +320,7 @@ public class OperatorCoordinatorSchedulerTest extends 
TestLogger {
                 coordinator.getLastTriggeredCheckpoint();
 
         coordinatorStateFuture.completeExceptionally(new TestException());
+        waitForCompletionToPropagate(checkpointFuture);
 
         assertThat(checkpointFuture, futureWillCompleteWithTestException());
     }
@@ -810,6 +811,20 @@ public class OperatorCoordinatorSchedulerTest extends 
TestLogger {
         return future;
     }
 
+    private void waitForCompletionToPropagate(CompletableFuture<?> 
checkpointFuture) {
+        // this part is necessary because the user/application-code-driven 
coordinator
+        // forwards the checkpoint to the scheduler thread, which in turn 
needs to finish
+        // work
+        while (!checkpointFuture.isDone()) {
+            executor.triggerAll();
+            try {
+                Thread.sleep(1);
+            } catch (InterruptedException e) {
+                throw new Error(e);
+            }
+        }
+    }
+
     private void acknowledgeCurrentCheckpoint(DefaultScheduler scheduler) {
         executor.triggerAll();
         SchedulerTestingUtils.acknowledgeCurrentCheckpoint(scheduler);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java
index f942b0b..53ee685 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java
@@ -42,8 +42,8 @@ public class OperatorEventValveTest {
         final OperatorEventValve valve = new OperatorEventValve(sender);
 
         final OperatorEvent event = new TestOperatorEvent();
-        final CompletableFuture<Acknowledge> future =
-                valve.sendEvent(new SerializedValue<>(event), 11);
+        final CompletableFuture<Acknowledge> future = new 
CompletableFuture<>();
+        valve.sendEvent(new SerializedValue<>(event), 11, future);
 
         assertThat(sender.events, contains(new EventWithSubtask(event, 11)));
         assertTrue(future.isDone());
@@ -74,8 +74,8 @@ public class OperatorEventValveTest {
         valve.markForCheckpoint(1L);
         valve.shutValve(1L);
 
-        final CompletableFuture<Acknowledge> future =
-                valve.sendEvent(new SerializedValue<>(new 
TestOperatorEvent()), 1);
+        final CompletableFuture<Acknowledge> future = new 
CompletableFuture<>();
+        valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 1, 
future);
 
         assertTrue(sender.events.isEmpty());
         assertFalse(future.isDone());
@@ -91,10 +91,10 @@ public class OperatorEventValveTest {
 
         final OperatorEvent event1 = new TestOperatorEvent();
         final OperatorEvent event2 = new TestOperatorEvent();
-        final CompletableFuture<Acknowledge> future1 =
-                valve.sendEvent(new SerializedValue<>(event1), 3);
-        final CompletableFuture<Acknowledge> future2 =
-                valve.sendEvent(new SerializedValue<>(event2), 0);
+        final CompletableFuture<Acknowledge> future1 = new 
CompletableFuture<>();
+        valve.sendEvent(new SerializedValue<>(event1), 3, future1);
+        final CompletableFuture<Acknowledge> future2 = new 
CompletableFuture<>();
+        valve.sendEvent(new SerializedValue<>(event2), 0, future2);
 
         valve.openValveAndUnmarkCheckpoint();
 
@@ -114,8 +114,8 @@ public class OperatorEventValveTest {
         valve.markForCheckpoint(17L);
         valve.shutValve(17L);
 
-        final CompletableFuture<Acknowledge> future =
-                valve.sendEvent(new SerializedValue<>(new 
TestOperatorEvent()), 10);
+        final CompletableFuture<Acknowledge> future = new 
CompletableFuture<>();
+        valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 10, 
future);
         valve.openValveAndUnmarkCheckpoint();
 
         assertTrue(future.isCompletedExceptionally());
@@ -128,8 +128,10 @@ public class OperatorEventValveTest {
         valve.markForCheckpoint(17L);
         valve.shutValve(17L);
 
-        valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 0);
-        valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 1);
+        valve.sendEvent(
+                new SerializedValue<>(new TestOperatorEvent()), 0, new 
CompletableFuture<>());
+        valve.sendEvent(
+                new SerializedValue<>(new TestOperatorEvent()), 1, new 
CompletableFuture<>());
 
         valve.reset();
         valve.openValveAndUnmarkCheckpoint();
@@ -146,10 +148,10 @@ public class OperatorEventValveTest {
 
         final OperatorEvent event1 = new TestOperatorEvent();
         final OperatorEvent event2 = new TestOperatorEvent();
-        final CompletableFuture<Acknowledge> future1 =
-                valve.sendEvent(new SerializedValue<>(event1), 0);
-        final CompletableFuture<Acknowledge> future2 =
-                valve.sendEvent(new SerializedValue<>(event2), 1);
+        final CompletableFuture<Acknowledge> future1 = new 
CompletableFuture<>();
+        valve.sendEvent(new SerializedValue<>(event1), 0, future1);
+        final CompletableFuture<Acknowledge> future2 = new 
CompletableFuture<>();
+        valve.sendEvent(new SerializedValue<>(event2), 1, future2);
 
         valve.resetForTask(1);
         valve.openValveAndUnmarkCheckpoint();

Reply via email to