StephanEwen commented on a change in pull request #15557:
URL: https://github.com/apache/flink/pull/15557#discussion_r613177901



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
##########
@@ -96,42 +94,44 @@ public boolean isShut() {
      * (because it gets dropped through a call to {@link #reset()} or {@link #resetForTask(int)},
      * then the returned future till be completed exceptionally.
      */
-    public CompletableFuture<Acknowledge> sendEvent(
-            SerializedValue<OperatorEvent> event, int subtask) {
-        synchronized (lock) {
-            if (!shut) {
-                return eventSender.apply(event, subtask);
-            }
-
-            final List<BlockedEvent> eventsForTask =
-                    blockedEvents.computeIfAbsent(subtask, (key) -> new ArrayList<>());
-            final CompletableFuture<Acknowledge> future = new CompletableFuture<>();
-            eventsForTask.add(new BlockedEvent(event, subtask, future));
-            return future;
+    public void sendEvent(
+            SerializedValue<OperatorEvent> event,
+            int subtask,
+            CompletableFuture<Acknowledge> result) {
+        checkRunsInMainThread();
+
+        if (!shut) {
+            final CompletableFuture<Acknowledge> ack = eventSender.apply(event, subtask);

Review comment:
       I think we don't need the guard in `Execution.sendOperatorEvent`, because 
at that point it doesn't matter: the RPC gateways are thread-safe. We only need 
to make sure we don't reach the wrong target, and for that we only need to 
reference the right `Execution` object (that is done in the next PR).
   
   Now that I think about it a bit more, we also don't really need the 
scheduler main-thread executor for sending. We only need to know that 
all event sending and checkpoint completion actions are done in the same thread, 
so that order is preserved. The fact that this thread is the scheduler 
main thread is an implementation detail that was chosen to make other parts 
simpler.
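
   To illustrate the ordering argument, here is a minimal sketch in plain Java. 
It is not Flink code: the class `SingleThreadOrdering`, the method `run`, and 
the action names are hypothetical, and a JDK single-thread executor stands in 
for "the same thread". It only shows that actions handed to one thread run in 
submission order, whichever thread that happens to be.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration; none of these names are Flink APIs. */
final class SingleThreadOrdering {

    /** Runs "send event" and "complete checkpoint" actions on one thread and returns the observed order. */
    static List<String> run() {
        // Synchronized so the caller can safely read the list after the executor finishes.
        final List<String> log = Collections.synchronizedList(new ArrayList<>());

        // Any single thread works; it does not have to be a scheduler main thread.
        final ExecutorService sameThread = Executors.newSingleThreadExecutor();
        try {
            // Stand-ins for event sending and checkpoint completion actions.
            sameThread.execute(() -> log.add("event-1"));
            sameThread.execute(() -> log.add("checkpoint-1-complete"));
            sameThread.execute(() -> log.add("event-2"));
        } finally {
            // Stop accepting new tasks; already submitted tasks still run.
            sameThread.shutdown();
        }

        try {
            if (!sameThread.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("actions did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        // Prints the actions in the order they were submitted.
        System.out.println(run());
    }
}
```

   The executor's FIFO queue and single worker are what preserve the order 
here; swapping in a multi-threaded pool would lose that guarantee.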





