lindong28 commented on code in PR #22931:
URL: https://github.com/apache/flink/pull/22931#discussion_r1270193281
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultOperatorCoordinatorHandler.java:
##########
@@ -74,10 +75,14 @@ private static Map<OperatorID, OperatorCoordinatorHolder> createCoordinatorMap(
@Override
public void initializeOperatorCoordinators(
ComponentMainThreadExecutor mainThreadExecutor,
- JobManagerJobMetricGroup jobManagerJobMetricGroup) {
+ JobManagerJobMetricGroup jobManagerJobMetricGroup,
+ CheckpointCoordinator checkpointCoordinator) {
for (OperatorCoordinatorHolder coordinatorHolder : coordinatorMap.values()) {
coordinatorHolder.lazyInitialize(
- globalFailureHandler, mainThreadExecutor, jobManagerJobMetricGroup);
+ globalFailureHandler,
+ mainThreadExecutor,
+ jobManagerJobMetricGroup,
+ checkpointCoordinator);
Review Comment:
Would it be simpler to call `this.executionGraph.getCheckpointCoordinator()`
here, so that the caller does not need to pass `checkpointCoordinator` as a
parameter?
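
To make the question concrete, here is a minimal sketch of the alternative. All types are simplified stand-ins with hypothetical names (`HandlerStandIn`, `ExecutionGraphStandIn`, and so on), not the real Flink classes, and it assumes the handler already holds a reference to the execution graph, as the `this.executionGraph` expression above implies.

```java
import java.util.Collections;
import java.util.List;

/** Stand-in for the checkpoint coordinator; the real class is far richer. */
class CheckpointCoordinatorStandIn {}

/** Stand-in for the execution graph, reduced to the one getter used here. */
class ExecutionGraphStandIn {
    private final CheckpointCoordinatorStandIn checkpointCoordinator;

    ExecutionGraphStandIn(CheckpointCoordinatorStandIn checkpointCoordinator) {
        this.checkpointCoordinator = checkpointCoordinator;
    }

    // In this sketch the value may be null, e.g. if checkpointing is not set up.
    CheckpointCoordinatorStandIn getCheckpointCoordinator() {
        return checkpointCoordinator;
    }
}

/** Stand-in for the coordinator holder; it only records what it receives. */
class HolderStandIn {
    CheckpointCoordinatorStandIn received;

    void lazyInitialize(CheckpointCoordinatorStandIn checkpointCoordinator) {
        this.received = checkpointCoordinator;
    }
}

/** Stand-in for the handler: no checkpoint coordinator parameter is needed. */
class HandlerStandIn {
    private final ExecutionGraphStandIn executionGraph;
    private final List<HolderStandIn> holders;

    HandlerStandIn(ExecutionGraphStandIn executionGraph, List<HolderStandIn> holders) {
        this.executionGraph = executionGraph;
        this.holders = holders;
    }

    void initializeOperatorCoordinators() {
        // Look the coordinator up locally instead of taking it from the caller.
        CheckpointCoordinatorStandIn cc = executionGraph.getCheckpointCoordinator();
        for (HolderStandIn holder : holders) {
            holder.lazyInitialize(cc);
        }
    }
}

class HandlerLookupDemo {
    public static void main(String[] args) {
        CheckpointCoordinatorStandIn cc = new CheckpointCoordinatorStandIn();
        HolderStandIn holder = new HolderStandIn();
        new HandlerStandIn(new ExecutionGraphStandIn(cc), Collections.singletonList(holder))
                .initializeOperatorCoordinators();
        System.out.println("holder received coordinator: " + (holder.received == cc));
    }
}
```

With this shape the method signature stays as it was before the change. Whether the coordinator is already available on the execution graph at this point is something the PR author would need to confirm.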
##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -164,6 +167,13 @@ public void lazyInitialize(
context.lazyInitialize(
globalFailureHandler, mainThreadExecutor, operatorCoordinatorMetricGroup);
+ if (coordinator instanceof RecreateOnResetOperatorCoordinator) {
Review Comment:
Instead of passing the checkpoint coordinator like this, would it be simpler
to do the following?
- `OperatorCoordinatorHolder#lazyInitialize` passes the checkpoint coordinator
to `context.lazyInitialize`.
- Add a method `getCheckpointCoordinator` to `OperatorCoordinator#Context`
that returns the checkpoint coordinator obtained in the step above.
- `SourceCoordinatorContext#setIsProcessingBacklog` uses the checkpoint
coordinator obtained from `operatorCoordinatorContext`.
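
The three steps above could be wired roughly as in the sketch below. Every type is a simplified stand-in with a hypothetical name, not the real Flink class, and `reportBacklog` is an invented placeholder for whatever the source context would actually call on the checkpoint coordinator.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in for the checkpoint coordinator; records calls so the wiring is observable. */
class CheckpointCoordinatorStub {
    final List<Boolean> backlogUpdates = new ArrayList<>();

    // Hypothetical method: a placeholder for the real call made by the source context.
    void reportBacklog(boolean isProcessingBacklog) {
        backlogUpdates.add(isProcessingBacklog);
    }
}

/** Stand-in for OperatorCoordinator.Context with the proposed getter (step 2). */
interface ContextSketch {
    CheckpointCoordinatorStub getCheckpointCoordinator();
}

/** Stand-in for the lazily initialized context owned by the coordinator holder. */
class LazyContextSketch implements ContextSketch {
    private CheckpointCoordinatorStub checkpointCoordinator;

    // Step 1: the holder forwards the checkpoint coordinator during lazy initialization.
    void lazyInitialize(CheckpointCoordinatorStub checkpointCoordinator) {
        this.checkpointCoordinator = checkpointCoordinator;
    }

    @Override
    public CheckpointCoordinatorStub getCheckpointCoordinator() {
        if (checkpointCoordinator == null) {
            throw new IllegalStateException("Context has not been initialized yet");
        }
        return checkpointCoordinator;
    }
}

/** Stand-in for the source coordinator context, which wraps the operator context. */
class SourceContextSketch {
    private final ContextSketch operatorCoordinatorContext;

    SourceContextSketch(ContextSketch operatorCoordinatorContext) {
        this.operatorCoordinatorContext = operatorCoordinatorContext;
    }

    // Step 3: fetch the coordinator through the context instead of a special-cased setter.
    void setIsProcessingBacklog(boolean isProcessingBacklog) {
        operatorCoordinatorContext.getCheckpointCoordinator().reportBacklog(isProcessingBacklog);
    }
}

class ContextWiringDemo {
    public static void main(String[] args) {
        CheckpointCoordinatorStub cc = new CheckpointCoordinatorStub();
        LazyContextSketch context = new LazyContextSketch();
        context.lazyInitialize(cc);
        new SourceContextSketch(context).setIsProcessingBacklog(true);
        System.out.println("updates seen by coordinator: " + cc.backlogUpdates);
    }
}
```

The point of this shape is that the holder no longer needs an `instanceof` check on the coordinator type: any coordinator that needs the checkpoint coordinator asks its context for it.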