curcur commented on a change in pull request #13880:
URL: https://github.com/apache/flink/pull/13880#discussion_r519263840
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -288,10 +288,24 @@ private void configureStreamGraph(final StreamGraph graph) {
} else {
graph.setStateBackend(stateBackend);
graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+ graph.setAllVerticesInSameSlotSharingGroupByDefault(true);
graph.setScheduleMode(ScheduleMode.EAGER);
+
+ if (checkpointConfig.isApproximateLocalRecoveryEnabled()) {
+ checkApproximateLocalRecoveryCompatibility();
+ graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED_APPROXIMATE);
+ } else {
+ graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+ }
}
}
+ private void checkApproximateLocalRecoveryCompatibility() {
+ checkState(
+ !checkpointConfig.isUnalignedCheckpointsEnabled(),
+ "Approximate Local Recovery and Unaligned Checkpoint can not be used together yet");
Review comment:
I've tried several ways to do this check. It differs slightly from the
unaligned checkpoint check, because the PipelinedRegionScheduler is configured
at the JobMaster level, whereas approximate local recovery is configured at the
job level. (BTW, I can roughly guess why it is a JobMaster-level config, but
I'm still curious, since I will probably have the same question later when I
build a separate failover strategy. We can discuss it then.)
The check should happen where either PIPELINED_REGION_SCHEDULING or
LEGACY_SCHEDULING is chosen, i.e., where the type of SchedulingStrategyFactory
is decided.
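To make the idea concrete, here is a minimal, self-contained sketch of a check placed at the point where the scheduling strategy is chosen. All names in it (`SchedulingStrategySelection`, `SchedulingStrategyKind`, `JobInfo`, `chooseStrategy`) are hypothetical stand-ins, not Flink classes; the only assumption carried over from this comment is that the JobGraph exposes an `isApproximateLocalRecoveryEnabled` flag.

```java
import java.util.Objects;

/** Hypothetical sketch: validate approximate local recovery where the scheduling strategy is chosen. */
public final class SchedulingStrategySelection {

    /** Hypothetical stand-in for the values of JobManagerOptions.SCHEDULING_STRATEGY. */
    enum SchedulingStrategyKind { LEGACY, REGION }

    /** Hypothetical stand-in for the scheduling-relevant part of a JobGraph. */
    static final class JobInfo {
        final String name;
        final boolean approximateLocalRecoveryEnabled;

        JobInfo(String name, boolean approximateLocalRecoveryEnabled) {
            this.name = Objects.requireNonNull(name, "name");
            this.approximateLocalRecoveryEnabled = approximateLocalRecoveryEnabled;
        }
    }

    private SchedulingStrategySelection() {}

    /** Returns the configured strategy, or throws if it cannot be combined with the job's settings. */
    static SchedulingStrategyKind chooseStrategy(SchedulingStrategyKind configured, JobInfo job) {
        Objects.requireNonNull(configured, "configured");
        Objects.requireNonNull(job, "job");
        if (configured == SchedulingStrategyKind.REGION && job.approximateLocalRecoveryEnabled) {
            throw new IllegalStateException(
                    "Approximate local recovery cannot be used with region scheduling (job: " + job.name + ")");
        }
        // No check for LEGACY: streaming jobs are assumed to be forced to EAGER scheduling upstream.
        return configured;
    }

    public static void main(String[] args) {
        JobInfo approxJob = new JobInfo("approx-job", true);
        System.out.println(chooseStrategy(SchedulingStrategyKind.LEGACY, approxJob)); // prints LEGACY
        try {
            chooseStrategy(SchedulingStrategyKind.REGION, approxJob);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast with an `IllegalStateException` at selection time keeps the incompatible combination from ever reaching the scheduler, which is the same style as the `checkState` used in the diff.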
1. In JobGraph, the most natural place for the
`isApproximateLocalRecoveryEnabled` config seems to be
`JobCheckpointingSettings#CheckpointCoordinatorConfiguration`, next to the
unaligned checkpoint config. However, as its name suggests,
`CheckpointCoordinatorConfiguration` is meant for the CheckpointCoordinator and
is serialized and handed to it. The CheckpointCoordinator does not need
`isApproximateLocalRecoveryEnabled` for anything, and adding it there breaks a
lot of tests, so for now it is probably not the right place.
2. So I put `isApproximateLocalRecoveryEnabled` in JobGraph, alongside
`scheduleMode`. It will be removed together with `scheduleMode` when that is
removed later. The flag is used only to ensure that approximate local recovery
is not combined with `JobManagerOptions.SCHEDULING_STRATEGY` set to `region`.
3. If `JobManagerOptions.SCHEDULING_STRATEGY` is set to `legacy`, no extra
check is needed, because the EAGER schedule mode is already enforced in
`StreamGraphGenerator#configureStreamGraph`:
```java
} else {
    graph.setStateBackend(stateBackend);
    // EAGER scheduling is enforced for this branch.
    graph.setScheduleMode(ScheduleMode.EAGER);
    if (checkpointConfig.isApproximateLocalRecoveryEnabled()) {
        // Fails fast if unaligned checkpoints are enabled as well.
        checkApproximateLocalRecoveryCompatibility();
        graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED_APPROXIMATE);
    } else {
        graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
    }
}
```
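For illustration, the streaming branch above can be reduced to a small, self-contained model. The enums and the `GraphSettings` holder are hypothetical stand-ins for Flink's `GlobalDataExchangeMode`, `ScheduleMode`, and the stream graph; the local `checkState` helper only mimics Flink's `Preconditions.checkState`, which throws an `IllegalStateException` when the condition is false.

```java
/** Hypothetical model of the streaming branch of StreamGraphGenerator#configureStreamGraph. */
public final class StreamingBranchSketch {

    /** Stand-in for the GlobalDataExchangeMode values relevant here. */
    enum ExchangeMode { ALL_EDGES_PIPELINED, ALL_EDGES_PIPELINED_APPROXIMATE }

    /** Stand-in for ScheduleMode; only EAGER is used in this branch. */
    enum Schedule { EAGER }

    /** Result of configuring the modelled stream graph. */
    static final class GraphSettings {
        final Schedule schedule;
        final ExchangeMode exchangeMode;

        GraphSettings(Schedule schedule, ExchangeMode exchangeMode) {
            this.schedule = schedule;
            this.exchangeMode = exchangeMode;
        }
    }

    private StreamingBranchSketch() {}

    /** Mimics Preconditions.checkState: throws IllegalStateException when the condition is false. */
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    static GraphSettings configureStreaming(boolean approximateLocalRecovery, boolean unalignedCheckpoints) {
        ExchangeMode mode;
        if (approximateLocalRecovery) {
            // Same compatibility rule as checkApproximateLocalRecoveryCompatibility() in the diff.
            checkState(!unalignedCheckpoints,
                    "Approximate Local Recovery and Unaligned Checkpoint can not be used together yet");
            mode = ExchangeMode.ALL_EDGES_PIPELINED_APPROXIMATE;
        } else {
            mode = ExchangeMode.ALL_EDGES_PIPELINED;
        }
        // EAGER is set unconditionally, which is why the legacy scheduler needs no extra check.
        return new GraphSettings(Schedule.EAGER, mode);
    }

    public static void main(String[] args) {
        System.out.println(configureStreaming(true, false).exchangeMode); // ALL_EDGES_PIPELINED_APPROXIMATE
        System.out.println(configureStreaming(false, true).exchangeMode); // ALL_EDGES_PIPELINED
        try {
            configureStreaming(true, true);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Note that unaligned checkpoints alone (second call) are still accepted; only the combination with approximate local recovery is rejected.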
----------------------------------------------------------------
This is an automated message from the Apache Git Service.