StephanEwen commented on a change in pull request #12234:
URL: https://github.com/apache/flink/pull/12234#discussion_r432841344



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
##########
@@ -99,29 +187,110 @@ public void close() throws Exception {
 
        @Override
        public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception {
+               mainThreadExecutor.assertRunningInMainThread();
                coordinator.handleEventFromOperator(subtask, event);
        }
 
        @Override
        public void subtaskFailed(int subtask, @Nullable Throwable reason) {
+               mainThreadExecutor.assertRunningInMainThread();
                coordinator.subtaskFailed(subtask, reason);
+               eventValve.resetForTask(subtask);
        }
 
        @Override
-       public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
-               return coordinator.checkpointCoordinator(checkpointId);
+       public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
+               // unfortunately, this method does not run in the scheduler executor, but in the
+               // checkpoint coordinator time thread.
+               // we can remove the delegation once the checkpoint coordinator runs fully in the scheduler's
+               // main thread executor
+               mainThreadExecutor.execute(() -> checkpointCoordinatorInternal(checkpointId, result));
        }
 
        @Override
        public void checkpointComplete(long checkpointId) {
-               coordinator.checkpointComplete(checkpointId);
+               // unfortunately, this method does not run in the scheduler executor, but in the
+               // checkpoint coordinator time thread.
+               // we can remove the delegation once the checkpoint coordinator runs fully in the scheduler's
+               // main thread executor
+               mainThreadExecutor.execute(() -> checkpointCompleteInternal(checkpointId));
        }
 
        @Override
        public void resetToCheckpoint(byte[] checkpointData) throws Exception {
+               // ideally we would like to check this here, however this method is called early during
+               // execution graph construction, before the main thread executor is set
+
+               eventValve.reset();

Review comment:
   That is true. The events here yield a different exception in their
result future.
   
   I would like to simplify that at some point: get rid of the result future
from `sendEvent()` and handle all of this via `taskFailed()` notifications.
That requires more complex versioning, though, meaning integration with
`checkpointId` and `ExecutionAttemptID`.
   
   That would be a good future simplification.
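
For readers following the hunk above, the delegation added to `checkpointCoordinator(...)` can be sketched in isolation. This is only a minimal sketch, not Flink's code: the class name `DelegatingHolderSketch`, the single-thread `ExecutorService` standing in for the scheduler's main thread executor, and the string payload used as the snapshot are all assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the holder in the diff; not Flink's actual class.
public class DelegatingHolderSketch {

    // Assumption: a single thread plays the role of the scheduler's main thread executor.
    private final ExecutorService mainThreadExecutor =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread-sketch"));

    // May be called from any thread (e.g. a timer thread). It only enqueues the
    // work and returns; the caller observes the outcome through the passed-in future.
    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
        mainThreadExecutor.execute(() -> checkpointCoordinatorInternal(checkpointId, result));
    }

    // Always runs on the single "main" thread, so coordinator state touched here
    // needs no additional synchronization.
    private void checkpointCoordinatorInternal(long checkpointId, CompletableFuture<byte[]> result) {
        try {
            // Placeholder snapshot: records the checkpoint id and the executing thread.
            String state = "state-for-" + checkpointId + "@" + Thread.currentThread().getName();
            result.complete(state.getBytes(StandardCharsets.UTF_8));
        } catch (Throwable t) {
            // Failures are reported through the future instead of being thrown to the caller.
            result.completeExceptionally(t);
        }
    }

    public void shutdown() {
        mainThreadExecutor.shutdown();
    }

    public static void main(String[] args) throws Exception {
        DelegatingHolderSketch holder = new DelegatingHolderSketch();
        CompletableFuture<byte[]> result = new CompletableFuture<>();
        holder.checkpointCoordinator(42L, result); // called from the JVM's main thread
        byte[] bytes = result.get(5, TimeUnit.SECONDS);
        System.out.println(new String(bytes, StandardCharsets.UTF_8)); // state-for-42@main-thread-sketch
        holder.shutdown();
    }
}
```

Passing the future in, rather than returning one, lets the calling thread create it up front while the executor thread completes it later. That is why the signature in the hunk changes from returning `CompletableFuture<byte[]>` to accepting it as a parameter.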



