curcur commented on a change in pull request #13880:
URL: https://github.com/apache/flink/pull/13880#discussion_r519263840



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -288,10 +288,24 @@ private void configureStreamGraph(final StreamGraph graph) {
                } else {
                        graph.setStateBackend(stateBackend);
                        graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+                       graph.setAllVerticesInSameSlotSharingGroupByDefault(true);
                        graph.setScheduleMode(ScheduleMode.EAGER);
+
+                       if (checkpointConfig.isApproximateLocalRecoveryEnabled()) {
+                               checkApproximateLocalRecoveryCompatibility();
+                               graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED_APPROXIMATE);
+                       } else {
+                               graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+                       }
                }
        }
 
+       private void checkApproximateLocalRecoveryCompatibility() {
+               checkState(
+                       !checkpointConfig.isUnalignedCheckpointsEnabled(),
+                       "Approximate Local Recovery and Unaligned Checkpoint can not be used together yet");

Review comment:
   I've tried out different ways to do the check. It is slightly different from the unaligned checkpoint check, because the PipelinedRegionScheduler is configured at the JobMaster level (hmm, I can kind of guess why it is a JobMaster-level config, but I'm still wondering why...), while approximate local recovery is configured at the job level.
   
   The check should happen in the same place where either PIPELINED_REGION_SCHEDULING or LEGACY_SCHEDULING is chosen, that is, where the type of SchedulingStrategyFactory is decided.
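   To make "check where the scheduling strategy is picked" more concrete, here is a minimal, self-contained sketch. It is not Flink code: `SchedulingStrategyType`, `SchedulingStrategySelector`, and `select(...)` are hypothetical stand-ins, assuming the JobMaster-level option takes the values `legacy` and `region` and the job-level flag reaches the JobMaster as a plain boolean.

```java
import java.util.Locale;

/** Hypothetical stand-in for the two scheduler families (not a Flink type). */
enum SchedulingStrategyType {
    LEGACY,
    PIPELINED_REGION
}

/** Hypothetical sketch: picks the strategy and rejects incompatible job settings in one place. */
final class SchedulingStrategySelector {

    private SchedulingStrategySelector() {}

    /**
     * @param configuredStrategy JobMaster-level value, assumed to be "legacy" or "region"
     * @param approximateLocalRecoveryEnabled job-level flag carried by the job graph
     */
    static SchedulingStrategyType select(
            String configuredStrategy, boolean approximateLocalRecoveryEnabled) {
        final SchedulingStrategyType type;
        switch (configuredStrategy.toLowerCase(Locale.ROOT)) {
            case "legacy":
                type = SchedulingStrategyType.LEGACY;
                break;
            case "region":
                type = SchedulingStrategyType.PIPELINED_REGION;
                break;
            default:
                throw new IllegalArgumentException(
                        "Unknown scheduling strategy: " + configuredStrategy);
        }
        // The job-level flag is checked right where the JobMaster-level choice is made,
        // because this is the first place where both pieces of information are available.
        if (approximateLocalRecoveryEnabled && type == SchedulingStrategyType.PIPELINED_REGION) {
            throw new IllegalStateException(
                    "Approximate Local Recovery cannot be used with region scheduling yet");
        }
        return type;
    }
}
```

   With this shape, `select("region", true)` is rejected, while the same flag combined with `legacy` passes.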
   
   1. In JobGraph, the most reasonable place to put the config `isApproximateLocalRecoveryEnabled` seems to be `JobCheckpointingSettings#CheckpointCoordinatorConfiguration`, similar to the unaligned checkpoint config. However, as its name suggests, `CheckpointCoordinatorConfiguration` is meant for the CheckpointCoordinator and is serialized to it. The CheckpointCoordinator does not actually need `isApproximateLocalRecoveryEnabled` for anything, and putting it there breaks a lot of tests, so at this point it is probably not a good place for it.
   
   2. So I put `isApproximateLocalRecoveryEnabled` in JobGraph, in a similar place to `scheduleMode`. It will be removed together with `scheduleMode` when `scheduleMode` is removed later. This flag is only used to make sure ApproximateLocalRecovery is not used together with `JobManagerOptions.SCHEDULING_STRATEGY` set to `region`.
   
   3. If `JobManagerOptions.SCHEDULING_STRATEGY` is set to `legacy`, EAGER scheduling is enforced in `StreamGraphGenerator#configureStreamGraph`:
   
   ```java
   } else {
        graph.setStateBackend(stateBackend);
        // EAGER scheduling is always enforced on this path
        graph.setScheduleMode(ScheduleMode.EAGER);
   
        if (checkpointConfig.isApproximateLocalRecoveryEnabled()) {
                // Fails via checkState if unaligned checkpoints are also enabled
                checkApproximateLocalRecoveryCompatibility();
                graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED_APPROXIMATE);
        } else {
                graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
        }
   }
   ```
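   A self-contained sketch of items 2 and 3 together, using stand-in types rather than the real Flink classes: `JobGraphStub`, `ScheduleModeStub`, `ExchangeModeStub`, and `LegacyBranchSketch.configure(...)` are hypothetical, and StreamGraph and JobGraph are collapsed into one stub for brevity. It mirrors the legacy branch quoted above: EAGER is always set, the flag is recorded next to the schedule mode, and enabling approximate local recovery together with unaligned checkpoints fails with an `IllegalStateException`, which is what Flink's `checkState` throws.

```java
/** Hypothetical stand-in for Flink's ScheduleMode (only the value used here). */
enum ScheduleModeStub {
    EAGER
}

/** Hypothetical stand-in for Flink's GlobalDataExchangeMode (only the values used here). */
enum ExchangeModeStub {
    ALL_EDGES_PIPELINED,
    ALL_EDGES_PIPELINED_APPROXIMATE
}

/** Hypothetical holder: the flag sits next to scheduleMode, as described in item 2. */
final class JobGraphStub {
    ScheduleModeStub scheduleMode;
    ExchangeModeStub exchangeMode;
    // Only read when the scheduler is chosen; meant to be removed together with scheduleMode.
    boolean approximateLocalRecoveryEnabled;
}

final class LegacyBranchSketch {

    private LegacyBranchSketch() {}

    /** Mirrors the legacy branch of configureStreamGraph using stand-in types. */
    static JobGraphStub configure(boolean approximateLocalRecovery, boolean unalignedCheckpoints) {
        JobGraphStub graph = new JobGraphStub();
        graph.scheduleMode = ScheduleModeStub.EAGER; // EAGER is enforced on this path
        graph.approximateLocalRecoveryEnabled = approximateLocalRecovery;
        if (approximateLocalRecovery) {
            // Same condition as checkApproximateLocalRecoveryCompatibility() in the diff.
            if (unalignedCheckpoints) {
                throw new IllegalStateException(
                        "Approximate Local Recovery and Unaligned Checkpoint cannot be used together yet");
            }
            graph.exchangeMode = ExchangeModeStub.ALL_EDGES_PIPELINED_APPROXIMATE;
        } else {
            graph.exchangeMode = ExchangeModeStub.ALL_EDGES_PIPELINED;
        }
        return graph;
    }
}
```

   Keeping the flag as a plain field next to `scheduleMode`, rather than inside the checkpoint coordinator settings, keeps it out of what is serialized for the CheckpointCoordinator, which is the concern raised in item 1.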
   
   
   
    




----------------------------------------------------------------
This is an automated message from the Apache Git Service.