StephanEwen commented on a change in pull request #15557:
URL: https://github.com/apache/flink/pull/15557#discussion_r612353974



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
##########
@@ -140,76 +140,75 @@ public void markForCheckpoint(long checkpointId) {
      * If the valve is already shut, this does nothing.
      */
     public void shutValve(long checkpointId) {
-        synchronized (lock) {
-            if (checkpointId == currentCheckpointId) {
-                shut = true;
-            } else {
-                throw new IllegalStateException(
-                        String.format(
-                                "Cannot shut valve for non-prepared checkpoint. "
-                                        + "Prepared checkpoint = %s, attempting-to-close checkpoint = %d",
-                                (currentCheckpointId == NO_CHECKPOINT
-                                        ? "(none)"
-                                        : String.valueOf(currentCheckpointId)),
-                                checkpointId));
-            }
+        checkRunsInMainThread();
+
+        if (checkpointId == currentCheckpointId) {
+            shut = true;
+        } else {
+            throw new IllegalStateException(
+                    String.format(
+                            "Cannot shut valve for non-prepared checkpoint. "
+                                    + "Prepared checkpoint = %s, attempting-to-close checkpoint = %d",
+                            (currentCheckpointId == NO_CHECKPOINT
+                                    ? "(none)"
+                                    : String.valueOf(currentCheckpointId)),
+                            checkpointId));
         }
     }
 
-    /** Opens the value, releasing all buffered events. */
-    public void openValveAndUnmarkCheckpoint() {
-        final ArrayList<FuturePair> futures;
-
-        // send all events under lock, so that no new event can sneak between
-        synchronized (lock) {
-            currentCheckpointId = NO_CHECKPOINT;
+    public void openValveAndUnmarkCheckpoint(long expectedCheckpointId) {
+        checkRunsInMainThread();
 
-            if (!shut) {
-                return;
-            }
+        if (expectedCheckpointId != currentCheckpointId) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Valve closed for different checkpoint: closed for = %d, expected = %d",
+                            currentCheckpointId, expectedCheckpointId));
+        }
+        openValveAndUnmarkCheckpoint();
+    }
 
-            futures = new ArrayList<>(blockedEvents.size());
+    /** Opens the value, releasing all buffered events. */
+    public void openValveAndUnmarkCheckpoint() {
+        checkRunsInMainThread();
 
-            for (List<BlockedEvent> eventsForTask : blockedEvents.values()) {
-                for (BlockedEvent blockedEvent : eventsForTask) {
-                    final CompletableFuture<Acknowledge> ackFuture =
-                            eventSender.apply(blockedEvent.event, blockedEvent.subtask);
-                    futures.add(new FuturePair(blockedEvent.future, ackFuture));
-                }
-            }
-            blockedEvents.clear();
-            shut = false;
+        currentCheckpointId = NO_CHECKPOINT;
+        if (!shut) {
+            return;
         }
 
-        // apply the logic on the future outside the lock, to be safe
-        for (FuturePair pair : futures) {
-            FutureUtils.forward(pair.ackFuture, pair.originalFuture);
+        for (List<BlockedEvent> eventsForTask : blockedEvents.values()) {
+            for (BlockedEvent blockedEvent : eventsForTask) {
+                final CompletableFuture<Acknowledge> ackFuture =
+                        eventSender.apply(blockedEvent.event, blockedEvent.subtask);
+                FutureUtils.forward(ackFuture, blockedEvent.future);
+            }
         }
+        blockedEvents.clear();
+        shut = false;
     }
 
     /** Drops all blocked events for a specific subtask. */
     public void resetForTask(int subtask) {
-        final List<BlockedEvent> events;
-        synchronized (lock) {
-            events = blockedEvents.remove(subtask);
-        }
+        checkRunsInMainThread();
 
+        final List<BlockedEvent> events = blockedEvents.remove(subtask);
         failAllFutures(events);
     }
 
     /** Resets the valve, dropping all blocked events and opening the valve. */
     public void reset() {
+        checkRunsInMainThread();
+
         final List<BlockedEvent> events = new ArrayList<>();
-        synchronized (lock) {
-            for (List<BlockedEvent> taskEvents : blockedEvents.values()) {
-                if (taskEvents != null) {
-                    events.addAll(taskEvents);
-                }
+        for (List<BlockedEvent> taskEvents : blockedEvents.values()) {
+            if (taskEvents != null) {

Review comment:
       Not really, I think. I left it as it was before. It is not wrong, just
unnecessary. This part gets reworked in the next PR anyway, so I'd prefer to
skip this to avoid extra merge conflicts.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to