gaoyunhaii commented on code in PR #20862:
URL: https://github.com/apache/flink/pull/20862#discussion_r981900118


##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -287,35 +305,49 @@ public static TaskDeploymentDescriptorFactory fromExecution(Execution execution)
                 clusterPartitionShuffleDescriptors);
     }
 
-    private static Map<IntermediateDataSetID, ShuffleDescriptor[]>
+    private static Map<IntermediateDataSetID, CompletableFuture<ShuffleDescriptor[]>>
             getClusterPartitionShuffleDescriptors(ExecutionVertex executionVertex) {
         final InternalExecutionGraphAccessor internalExecutionGraphAccessor =
                 executionVertex.getExecutionGraphAccessor();
         final List<IntermediateDataSetID> consumedClusterDataSetIds =
                 executionVertex.getJobVertex().getJobVertex().getIntermediateDataSetIdsToConsume();
-        Map<IntermediateDataSetID, ShuffleDescriptor[]> clusterPartitionShuffleDescriptors =
-                new HashMap<>();
+        Map<IntermediateDataSetID, CompletableFuture<ShuffleDescriptor[]>>
+                clusterPartitionShuffleDescriptors = new HashMap<>();
 
         for (IntermediateDataSetID consumedClusterDataSetId : consumedClusterDataSetIds) {
-            List<? extends ShuffleDescriptor> shuffleDescriptors =
+            CompletableFuture<List<ShuffleDescriptor>> shuffleDescriptorsFuture =
                     internalExecutionGraphAccessor.getClusterPartitionShuffleDescriptors(
                             consumedClusterDataSetId);
 
-            // For FLIP-205, the job graph generating side makes sure that the producer and consumer
-            // of the cluster partition have the same parallelism and each consumer Task consumes
-            // one output partition of the producer.
-            Preconditions.checkState(
-                    executionVertex.getTotalNumberOfParallelSubtasks() == shuffleDescriptors.size(),
-                    "The parallelism (%s) of the cache consuming job vertex is "
-                            + "different from the number of shuffle descriptors (%s) of the intermediate data set",
-                    executionVertex.getTotalNumberOfParallelSubtasks(),
-                    shuffleDescriptors.size());
-
-            clusterPartitionShuffleDescriptors.put(
-                    consumedClusterDataSetId,
-                    new ShuffleDescriptor[] {
-                        shuffleDescriptors.get(executionVertex.getParallelSubtaskIndex())
-                    });
+            final CompletableFuture<ShuffleDescriptor[]> completableFuture =
+                    shuffleDescriptorsFuture.thenApply(
+                            shuffleDescriptors -> {
+                                try {
+                                    // For FLIP-205, the job graph generating side makes sure that
+                                    // the

Review Comment:
   nit: adjust the line breaks in this comment.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -287,35 +305,49 @@ public static TaskDeploymentDescriptorFactory fromExecution(Execution execution)
                 clusterPartitionShuffleDescriptors);
     }
 
-    private static Map<IntermediateDataSetID, ShuffleDescriptor[]>
+    private static Map<IntermediateDataSetID, CompletableFuture<ShuffleDescriptor[]>>
             getClusterPartitionShuffleDescriptors(ExecutionVertex executionVertex) {
         final InternalExecutionGraphAccessor internalExecutionGraphAccessor =
                 executionVertex.getExecutionGraphAccessor();
         final List<IntermediateDataSetID> consumedClusterDataSetIds =
                 executionVertex.getJobVertex().getJobVertex().getIntermediateDataSetIdsToConsume();
-        Map<IntermediateDataSetID, ShuffleDescriptor[]> clusterPartitionShuffleDescriptors =
-                new HashMap<>();
+        Map<IntermediateDataSetID, CompletableFuture<ShuffleDescriptor[]>>
+                clusterPartitionShuffleDescriptors = new HashMap<>();
 
         for (IntermediateDataSetID consumedClusterDataSetId : consumedClusterDataSetIds) {
-            List<? extends ShuffleDescriptor> shuffleDescriptors =
+            CompletableFuture<List<ShuffleDescriptor>> shuffleDescriptorsFuture =
                     internalExecutionGraphAccessor.getClusterPartitionShuffleDescriptors(
                             consumedClusterDataSetId);
 
-            // For FLIP-205, the job graph generating side makes sure that the producer and consumer
-            // of the cluster partition have the same parallelism and each consumer Task consumes
-            // one output partition of the producer.
-            Preconditions.checkState(
-                    executionVertex.getTotalNumberOfParallelSubtasks() == shuffleDescriptors.size(),
-                    "The parallelism (%s) of the cache consuming job vertex is "
-                            + "different from the number of shuffle descriptors (%s) of the intermediate data set",
-                    executionVertex.getTotalNumberOfParallelSubtasks(),
-                    shuffleDescriptors.size());
-
-            clusterPartitionShuffleDescriptors.put(
-                    consumedClusterDataSetId,
-                    new ShuffleDescriptor[] {
-                        shuffleDescriptors.get(executionVertex.getParallelSubtaskIndex())
-                    });
+            final CompletableFuture<ShuffleDescriptor[]> completableFuture =
+                    shuffleDescriptorsFuture.thenApply(

Review Comment:
   This looks a bit problematic:
   
   Since `shuffleDescriptorsFuture` is the result of a call to an external RPC
   gateway, it is not guaranteed to complete in the main thread executor. That
   is, the future may be completed by another thread while the JobMaster actor
   is scheduled, which can cause unexpected contention.
   
   The first handler needs to be submitted explicitly to
   `execution.getVertex().getExecutionGraphAccessor().getJobMasterMainThreadExecutor()`.
   
   Alternatively, `JobMasterPartitionTrackerImpl` could submit the future back
   to the main thread executor explicitly, so that this requirement does not
   leak out to callers.
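
To illustrate the first option, here is a minimal, self-contained sketch using only `java.util.concurrent`. It is not the PR's code: the class name, the `selectOnMainThread` helper, the `String` stand-ins for shuffle descriptors, and the two named single-thread executors are all hypothetical. It also assumes the JobMaster main thread executor can be passed wherever a `java.util.concurrent.Executor` is expected. The point is only that `thenApplyAsync(fn, executor)` pins the handler to the given executor, whereas plain `thenApply` may run it on whichever thread completes the future.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MainThreadHandoffSketch {

    // Hypothetical helper: picks the entry for one subtask. The handler is
    // scheduled explicitly on mainThreadExecutor instead of running on the
    // thread that completes descriptorsFuture. The thread name is prepended
    // to the result only to make the executing thread visible.
    public static CompletableFuture<String> selectOnMainThread(
            CompletableFuture<List<String>> descriptorsFuture,
            int subtaskIndex,
            Executor mainThreadExecutor) {
        return descriptorsFuture.thenApplyAsync(
                descriptors ->
                        Thread.currentThread().getName()
                                + ":"
                                + descriptors.get(subtaskIndex),
                mainThreadExecutor);
    }

    public static void main(String[] args) throws Exception {
        // Stand-ins (assumptions) for the JobMaster main thread and an RPC thread.
        ExecutorService mainThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread-stand-in"));
        ExecutorService rpcThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "rpc-stand-in"));
        try {
            // The future is completed on the RPC stand-in thread.
            CompletableFuture<List<String>> rpcFuture =
                    CompletableFuture.supplyAsync(() -> List.of("d0", "d1"), rpcThread);

            // The handler still runs on the main-thread stand-in.
            String result = selectOnMainThread(rpcFuture, 1, mainThread).get();
            System.out.println(result); // prints "main-thread-stand-in:d1"
        } finally {
            mainThread.shutdown();
            rpcThread.shutdown();
        }
    }
}
```

The second option would apply the same hand-off once, inside the component that issues the RPC call, so that every caller receives a future whose handlers already run on the main thread.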



