gaoyunhaii commented on code in PR #20862:
URL: https://github.com/apache/flink/pull/20862#discussion_r981900118
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -287,35 +305,49 @@ public static TaskDeploymentDescriptorFactory fromExecution(Execution execution)
                 clusterPartitionShuffleDescriptors);
     }
-    private static Map<IntermediateDataSetID, ShuffleDescriptor[]>
+    private static Map<IntermediateDataSetID, CompletableFuture<ShuffleDescriptor[]>>
             getClusterPartitionShuffleDescriptors(ExecutionVertex executionVertex) {
         final InternalExecutionGraphAccessor internalExecutionGraphAccessor =
                 executionVertex.getExecutionGraphAccessor();
         final List<IntermediateDataSetID> consumedClusterDataSetIds =
                 executionVertex.getJobVertex().getJobVertex().getIntermediateDataSetIdsToConsume();
-        Map<IntermediateDataSetID, ShuffleDescriptor[]> clusterPartitionShuffleDescriptors =
-                new HashMap<>();
+        Map<IntermediateDataSetID, CompletableFuture<ShuffleDescriptor[]>>
+                clusterPartitionShuffleDescriptors = new HashMap<>();
         for (IntermediateDataSetID consumedClusterDataSetId : consumedClusterDataSetIds) {
-            List<? extends ShuffleDescriptor> shuffleDescriptors =
+            CompletableFuture<List<ShuffleDescriptor>> shuffleDescriptorsFuture =
                     internalExecutionGraphAccessor.getClusterPartitionShuffleDescriptors(
                             consumedClusterDataSetId);
-            // For FLIP-205, the job graph generating side makes sure that the producer and consumer
-            // of the cluster partition have the same parallelism and each consumer Task consumes
-            // one output partition of the producer.
-            Preconditions.checkState(
-                    executionVertex.getTotalNumberOfParallelSubtasks() == shuffleDescriptors.size(),
-                    "The parallelism (%s) of the cache consuming job vertex is "
-                            + "different from the number of shuffle descriptors (%s) of the intermediate data set",
-                    executionVertex.getTotalNumberOfParallelSubtasks(),
-                    shuffleDescriptors.size());
-
-            clusterPartitionShuffleDescriptors.put(
-                    consumedClusterDataSetId,
-                    new ShuffleDescriptor[] {
-                        shuffleDescriptors.get(executionVertex.getParallelSubtaskIndex())
-                    });
+            final CompletableFuture<ShuffleDescriptor[]> completableFuture =
+                    shuffleDescriptorsFuture.thenApply(
+                            shuffleDescriptors -> {
+                                try {
+                                    // For FLIP-205, the job graph generating side makes sure that
+                                    // the
Review Comment:
nit: adjust the line breaks of the comments.
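For illustration only, one possible reflow of that comment block (a sketch of the nit, keeping the wording of the removed lines; the actual break points are up to the author/formatter):

```java
// For FLIP-205, the job graph generating side makes sure that the producer
// and consumer of the cluster partition have the same parallelism and each
// consumer Task consumes one output partition of the producer.
```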
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -287,35 +305,49 @@ public static TaskDeploymentDescriptorFactory fromExecution(Execution execution)
                 clusterPartitionShuffleDescriptors);
     }
-    private static Map<IntermediateDataSetID, ShuffleDescriptor[]>
+    private static Map<IntermediateDataSetID, CompletableFuture<ShuffleDescriptor[]>>
             getClusterPartitionShuffleDescriptors(ExecutionVertex executionVertex) {
         final InternalExecutionGraphAccessor internalExecutionGraphAccessor =
                 executionVertex.getExecutionGraphAccessor();
         final List<IntermediateDataSetID> consumedClusterDataSetIds =
                 executionVertex.getJobVertex().getJobVertex().getIntermediateDataSetIdsToConsume();
-        Map<IntermediateDataSetID, ShuffleDescriptor[]> clusterPartitionShuffleDescriptors =
-                new HashMap<>();
+        Map<IntermediateDataSetID, CompletableFuture<ShuffleDescriptor[]>>
+                clusterPartitionShuffleDescriptors = new HashMap<>();
         for (IntermediateDataSetID consumedClusterDataSetId : consumedClusterDataSetIds) {
-            List<? extends ShuffleDescriptor> shuffleDescriptors =
+            CompletableFuture<List<ShuffleDescriptor>> shuffleDescriptorsFuture =
                     internalExecutionGraphAccessor.getClusterPartitionShuffleDescriptors(
                             consumedClusterDataSetId);
-            // For FLIP-205, the job graph generating side makes sure that the producer and consumer
-            // of the cluster partition have the same parallelism and each consumer Task consumes
-            // one output partition of the producer.
-            Preconditions.checkState(
-                    executionVertex.getTotalNumberOfParallelSubtasks() == shuffleDescriptors.size(),
-                    "The parallelism (%s) of the cache consuming job vertex is "
-                            + "different from the number of shuffle descriptors (%s) of the intermediate data set",
-                    executionVertex.getTotalNumberOfParallelSubtasks(),
-                    shuffleDescriptors.size());
-
-            clusterPartitionShuffleDescriptors.put(
-                    consumedClusterDataSetId,
-                    new ShuffleDescriptor[] {
-                        shuffleDescriptors.get(executionVertex.getParallelSubtaskIndex())
-                    });
+            final CompletableFuture<ShuffleDescriptor[]> completableFuture =
+                    shuffleDescriptorsFuture.thenApply(
Review Comment:
This could be a bit problematic:
Since `shuffleDescriptorsFuture` is the result of a call to an external RPC gateway, it is not guaranteed to be completed back on the main thread executor. There might be cases where the JobMaster actor is scheduled while the future is completed by another thread, which causes unexpected contention.
Here the first handler needs to be submitted explicitly to
`execution.getVertex().getExecutionGraphAccessor().getJobMasterMainThreadExecutor()`.
Alternatively, `JobMasterPartitionTrackerImpl` could submit the future explicitly back to the main thread executor itself, so that this requirement does not leak out of `JobMasterPartitionTrackerImpl`.
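As a rough sketch of the first option (not the PR's actual change; it reuses `executionVertex` and `shuffleDescriptorsFuture` from the hunk above, keeps only the subtask-index lookup from the removed code, and assumes the returned main thread executor can be used as a `java.util.concurrent.Executor`), the handler could be scheduled via `thenApplyAsync` instead of plain `thenApply`:

```java
// Sketch: run the shuffle-descriptor handler on the JobMaster main thread
// executor instead of on whichever thread happens to complete the RPC future.
final Executor mainThreadExecutor =
        executionVertex.getExecutionGraphAccessor().getJobMasterMainThreadExecutor();

final CompletableFuture<ShuffleDescriptor[]> completableFuture =
        shuffleDescriptorsFuture.thenApplyAsync(
                shuffleDescriptors -> {
                    // Runs in the JobMaster main thread, so it does not contend
                    // with other work of the JobMaster actor.
                    return new ShuffleDescriptor[] {
                        shuffleDescriptors.get(executionVertex.getParallelSubtaskIndex())
                    };
                },
                mainThreadExecutor);
```

The second option would move the equivalent hand-off into `JobMasterPartitionTrackerImpl`, so callers never observe futures completed on RPC threads.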