SteNicholas commented on code in PR #2716:
URL: https://github.com/apache/celeborn/pull/2716#discussion_r1742527100
##########
client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java:
##########
@@ -193,8 +206,30 @@ public CelebornBufferStream readBufferedPartition(
}
}
+ /**
+ * Update the reduce file groups and obtain the PartitionLocations of the
target
+ * shuffleId#partitionId. It is possible to return an empty array if the
corresponding reduce file
+ * groups are nonexistent, a scenario likely arising when downstream reduce
tasks are start early
+ * than upstream map tasks, e.g. Flink Hybrid Shuffle.
+ */
+ public PartitionLocation[] updateFileGroupAndGetLocations(
+ int shuffleId, int partitionId, boolean isSegmentGranularityVisible)
throws IOException {
+ ReduceFileGroups fileGroups =
+ updateFileGroup(shuffleId, partitionId, isSegmentGranularityVisible);
+ if (fileGroups.partitionGroups == null
+ || fileGroups.partitionGroups.isEmpty()
+ || !fileGroups.partitionGroups.containsKey(partitionId)) {
+ return new PartitionLocation[0];
+ } else {
+ PartitionLocation[] partitionLocations =
+ fileGroups.partitionGroups.get(partitionId).toArray(new
PartitionLocation[0]);
+ return partitionLocations;
Review Comment:
```suggestion
return fileGroups.partitionGroups.get(partitionId).toArray(new
PartitionLocation[0]);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]