zhuzhurk commented on a change in pull request #16314:
URL: https://github.com/apache/flink/pull/16314#discussion_r675440564
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
##########
@@ -123,42 +134,48 @@ public TaskDeploymentDescriptor
createDeploymentDescriptor(
resultId,
partitionType,
queueToRequest,
-
getConsumedPartitionShuffleDescriptors(partitions)));
+ getConsumedPartitionShuffleDescriptors(
+ consumedIntermediateResult,
consumedPartitionGroup)));
}
return inputGates;
}
- private ShuffleDescriptor[] getConsumedPartitionShuffleDescriptors(
- List<IntermediateResultPartition> partitions) {
+ private SerializedValue<ShuffleDescriptor[]>
getConsumedPartitionShuffleDescriptors(
+ IntermediateResult intermediateResult, ConsumedPartitionGroup
consumedPartitionGroup)
+ throws IOException {
+ SerializedValue<ShuffleDescriptor[]> serializedShuffleDescriptors =
+
intermediateResult.getCachedShuffleDescriptors(consumedPartitionGroup);
+ if (serializedShuffleDescriptors == null) {
+ serializedShuffleDescriptors =
+
computeConsumedPartitionShuffleDescriptors(consumedPartitionGroup);
+ intermediateResult.cacheShuffleDescriptors(
+ consumedPartitionGroup, serializedShuffleDescriptors);
+ }
+ return serializedShuffleDescriptors;
+ }
- ShuffleDescriptor[] shuffleDescriptors = new
ShuffleDescriptor[partitions.size()];
+ private SerializedValue<ShuffleDescriptor[]>
computeConsumedPartitionShuffleDescriptors(
+ ConsumedPartitionGroup consumedPartitionGroup) throws IOException {
+
+ ShuffleDescriptor[] shuffleDescriptors =
+ new ShuffleDescriptor[consumedPartitionGroup.size()];
// Each edge is connected to a different result partition
- for (int i = 0; i < partitions.size(); i++) {
- shuffleDescriptors[i] =
+ int i = 0;
+ for (IntermediateResultPartitionID partitionId :
consumedPartitionGroup) {
+ shuffleDescriptors[i++] =
getConsumedPartitionShuffleDescriptor(
Review comment:
Let's add some documents for the `UNKNOWN` `ShuffleDescriptor` code path
to explain why we can cache it and do not need to update it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]