SteNicholas commented on code in PR #2716:
URL: https://github.com/apache/celeborn/pull/2716#discussion_r1742522872
##########
client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java:
##########
@@ -164,17 +165,29 @@ public void setupLifecycleManagerRef(RpcEndpointRef
endpointRef) {
}
public CelebornBufferStream readBufferedPartition(
- int shuffleId, int partitionId, int subPartitionIndexStart, int
subPartitionIndexEnd)
+ int shuffleId,
+ int partitionId,
+ int subPartitionIndexStart,
+ int subPartitionIndexEnd,
+ boolean isSegmentGranularityVisible)
throws IOException {
String shuffleKey = Utils.makeShuffleKey(appUniqueId, shuffleId);
- ReduceFileGroups fileGroups = updateFileGroup(shuffleId, partitionId);
- if (fileGroups.partitionGroups.size() == 0
- || !fileGroups.partitionGroups.containsKey(partitionId)) {
- logger.error("Shuffle data is empty for shuffle {} partitionId {}.",
shuffleId, partitionId);
- throw new PartitionUnRetryAbleException(partitionId + " may be lost.");
+ PartitionLocation[] partitionLocations =
+ updateFileGroupAndGetLocations(shuffleId, partitionId,
isSegmentGranularityVisible);
+ if (partitionLocations.length == 0) {
+ logger.error(
+ "Shuffle data is empty for shuffle {} partitionId {}
isSegmentGranularityVisible {}.",
+ shuffleId,
+ partitionId,
+ isSegmentGranularityVisible);
+ if (isSegmentGranularityVisible) {
+ // When the downstream reduce tasks start earlier than the upstream map
tasks, the shuffle
+ // partition locations may be found empty, so retry until the
upstream tasks have started
+ return CelebornBufferStream.empty();
+ } else {
+ throw new PartitionUnRetryAbleException(partitionId + " may be lost.");
Review Comment:
```suggestion
throw new PartitionUnRetryAbleException(String.format("Shuffle data
lost for shuffle %d partition %d.", shuffleId, partitionId));
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org