lindong28 commented on code in PR #22931:
URL: https://github.com/apache/flink/pull/22931#discussion_r1270207209


##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -164,6 +167,13 @@ public void lazyInitialize(
         context.lazyInitialize(
                 globalFailureHandler, mainThreadExecutor, 
operatorCoordinatorMetricGroup);
 
+        if (coordinator instanceof RecreateOnResetOperatorCoordinator) {

Review Comment:
   Instead of passing `checkpointCoordinator` to `SourceCoordinatorContext` 
like this, would it be simpler to do the following:
   
   - `OperatorCoordinatorHolder#lazyInitialize` can pass checkpoint coordinator 
to `context.lazyInitialize`.
   - Add method `getCheckpointCoordinator` to `OperatorCoordinator#Context`, 
which can return the checkpoint coordinator obtained in the above step.
   - `SourceCoordinatorContext#setIsProcessingBacklog` can use the checkpoint 
coordinator obtained from `operatorCoordinatorContext`.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to