StephanEwen commented on a change in pull request #15557:
URL: https://github.com/apache/flink/pull/15557#discussion_r613177901
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
##########
@@ -96,42 +94,44 @@ public boolean isShut() {
 * (because it gets dropped through a call to {@link #reset()} or {@link #resetForTask(int)}),
 * then the returned future will be completed exceptionally.
 */
-    public CompletableFuture<Acknowledge> sendEvent(
-            SerializedValue<OperatorEvent> event, int subtask) {
-        synchronized (lock) {
-            if (!shut) {
-                return eventSender.apply(event, subtask);
-            }
-
-            final List<BlockedEvent> eventsForTask =
-                    blockedEvents.computeIfAbsent(subtask, (key) -> new ArrayList<>());
-            final CompletableFuture<Acknowledge> future = new CompletableFuture<>();
-            eventsForTask.add(new BlockedEvent(event, subtask, future));
-            return future;
+    public void sendEvent(
+            SerializedValue<OperatorEvent> event,
+            int subtask,
+            CompletableFuture<Acknowledge> result) {
+        checkRunsInMainThread();
+
+        if (!shut) {
+            final CompletableFuture<Acknowledge> ack = eventSender.apply(event, subtask);
Review comment:
I think we don't need the guard in `Execution.sendOperatorEvent`, because
at that point it doesn't matter. The RPC gateways are thread safe. We only need
to make sure we don't reach the wrong target, and for that we only need to
reference the right `Execution` object (that is done in the next PR).
Now that I think about it a bit more, we also don't really need the
scheduler main-thread executor for sending. We really only need to know that
all event-sending and checkpoint-completion actions run in the same thread,
so that their order is preserved. The fact that this thread is the scheduler
main thread is an implementation detail that was chosen to make other parts
simpler.
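
To make the ordering argument concrete, here is a minimal, self-contained sketch (the class name `OrderingSketch` and the printed actions are hypothetical, not the PR's actual code): as long as event sending and checkpoint completion are funneled through the same single-threaded executor, their relative order is preserved, regardless of whether that thread happens to be the scheduler main thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: the only property the ordering argument relies on is
// that both kinds of actions run on one and the same thread.
public class OrderingSketch {

    // Any single thread gives the ordering guarantee; it does not have to be
    // the scheduler main thread.
    private static final ExecutorService coordinatorThread =
            Executors.newSingleThreadExecutor();

    public static void main(String[] args) {
        // Both actions are queued on the same thread, so the event is
        // guaranteed to be sent before the checkpoint is marked complete.
        coordinatorThread.execute(() -> System.out.println("send operator event to subtask 0"));
        coordinatorThread.execute(() -> System.out.println("complete checkpoint 17"));
        coordinatorThread.shutdown();
    }
}
```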