curcur commented on a change in pull request #13880:
URL: https://github.com/apache/flink/pull/13880#discussion_r519263840
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -288,10 +288,24 @@ private void configureStreamGraph(final StreamGraph graph) {
} else {
graph.setStateBackend(stateBackend);
graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+
graph.setAllVerticesInSameSlotSharingGroupByDefault(true);
graph.setScheduleMode(ScheduleMode.EAGER);
+
+ if (checkpointConfig.isApproximateLocalRecoveryEnabled()) {
+ checkApproximateLocalRecoveryCompatibility();
+ graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED_APPROXIMATE);
+ } else {
+ graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+ }
}
}
+ private void checkApproximateLocalRecoveryCompatibility() {
+ checkState(
+ !checkpointConfig.isUnalignedCheckpointsEnabled(),
+ "Approximate Local Recovery and Unaligned Checkpoint can not be used together yet");
Review comment:
I've tried out different ways to do the check. It is slightly different
from the unaligned checkpoint check, because the PipelinedRegionScheduler is
configured at the JobMaster level, while approximate local recovery is configured
at the job level.
The check should therefore happen in the same place where either
PIPELINED_REGION_SCHEDULING or LEGACY_SCHEDULING is decided, i.e., where the
type of SchedulingStrategyFactory is chosen.
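A minimal sketch of that idea, under a simplified model: `SchedulingStrategySelector` and `Strategy` are hypothetical stand-ins (not Flink classes), and the `"region"`/`"legacy"` strings stand in for the values of `JobManagerOptions.SCHEDULING_STRATEGY`. It only illustrates that the validation sits in the one spot that sees both the JobMaster-level strategy and the job-level flag.

```java
import java.util.Locale;

/** Hypothetical sketch; none of these names are Flink APIs. */
final class SchedulingStrategySelector {

    /** Stand-in for PIPELINED_REGION_SCHEDULING / LEGACY_SCHEDULING. */
    enum Strategy { PIPELINED_REGION, LEGACY }

    private SchedulingStrategySelector() {}

    /**
     * Chooses the strategy from an assumed config value ("region" or "legacy")
     * and rejects region scheduling combined with approximate local recovery.
     */
    static Strategy select(String configuredValue, boolean approximateLocalRecoveryEnabled) {
        Strategy strategy;
        switch (configuredValue.toLowerCase(Locale.ROOT)) {
            case "region":
                strategy = Strategy.PIPELINED_REGION;
                break;
            case "legacy":
                strategy = Strategy.LEGACY;
                break;
            default:
                throw new IllegalArgumentException("Unknown scheduling strategy: " + configuredValue);
        }
        // The check lives here because this is where the cluster-level strategy
        // and the job-level flag meet.
        if (approximateLocalRecoveryEnabled && strategy == Strategy.PIPELINED_REGION) {
            throw new IllegalStateException(
                    "Approximate local recovery cannot be used with region scheduling");
        }
        return strategy;
    }
}
```

Failing fast at strategy selection means a misconfigured job is rejected before any scheduling work starts.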
1. In the JobGraph, the most natural place for the
`isApproximateLocalRecoveryEnabled` config seems to be
`JobCheckpointingSettings#CheckpointCoordinatorConfiguration`, similar to the
unaligned checkpoint config. However, as its name suggests,
`CheckpointCoordinatorConfiguration` is meant for the CheckpointCoordinator and is
serialized and passed to it. The CheckpointCoordinator does not need
`isApproximateLocalRecoveryEnabled` for anything, and putting it there breaks a
lot of tests, so, at this point, it is probably not the best place for it.
2. So I put `isApproximateLocalRecoveryEnabled` in the JobGraph, alongside
`scheduleMode`; it will be removed together with `scheduleMode` later. The flag
is used only to make sure approximate local recovery is not combined with
`JobManagerOptions.SCHEDULING_STRATEGY` set to `region`.
3. If `JobManagerOptions.SCHEDULING_STRATEGY` is set to `legacy`, the EAGER
schedule mode is enforced in `StreamGraphGenerator#configureStreamGraph`:
```java
} else {
    graph.setStateBackend(stateBackend);
    graph.setScheduleMode(ScheduleMode.EAGER);
    if (checkpointConfig.isApproximateLocalRecoveryEnabled()) {
        checkApproximateLocalRecoveryCompatibility();
        graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED_APPROXIMATE);
    } else {
        graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
    }
}
```
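For comparison, a self-contained sketch of the branch above, using hypothetical stand-in types (`ExchangeModeSelector`, `ExchangeMode`) rather than the real `StreamGraph` and `GlobalDataExchangeMode`. As in the diff, the unaligned-checkpoint check only runs when approximate local recovery is enabled, and it fails with an `IllegalStateException`, which is what Flink's `Preconditions.checkState` throws.

```java
/** Hypothetical sketch of the legacy-scheduling branch; not Flink code. */
final class ExchangeModeSelector {

    /** Stand-in for the two GlobalDataExchangeMode values used above. */
    enum ExchangeMode { ALL_EDGES_PIPELINED, ALL_EDGES_PIPELINED_APPROXIMATE }

    private ExchangeModeSelector() {}

    static ExchangeMode select(boolean approximateLocalRecoveryEnabled, boolean unalignedCheckpointsEnabled) {
        if (!approximateLocalRecoveryEnabled) {
            // Default pipelined exchange; unaligned checkpoints are irrelevant here.
            return ExchangeMode.ALL_EDGES_PIPELINED;
        }
        // Mirrors checkApproximateLocalRecoveryCompatibility(): reject both features together.
        if (unalignedCheckpointsEnabled) {
            throw new IllegalStateException(
                    "Approximate Local Recovery and Unaligned Checkpoint cannot be used together yet");
        }
        return ExchangeMode.ALL_EDGES_PIPELINED_APPROXIMATE;
    }
}
```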