SteNicholas commented on code in PR #2716:
URL: https://github.com/apache/celeborn/pull/2716#discussion_r1742522872


##########
client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java:
##########
@@ -164,17 +165,29 @@ public void setupLifecycleManagerRef(RpcEndpointRef endpointRef) {
   }
 
   public CelebornBufferStream readBufferedPartition(
-      int shuffleId, int partitionId, int subPartitionIndexStart, int subPartitionIndexEnd)
+      int shuffleId,
+      int partitionId,
+      int subPartitionIndexStart,
+      int subPartitionIndexEnd,
+      boolean isSegmentGranularityVisible)
       throws IOException {
     String shuffleKey = Utils.makeShuffleKey(appUniqueId, shuffleId);
-    ReduceFileGroups fileGroups = updateFileGroup(shuffleId, partitionId);
-    if (fileGroups.partitionGroups.size() == 0
-        || !fileGroups.partitionGroups.containsKey(partitionId)) {
-      logger.error("Shuffle data is empty for shuffle {} partitionId {}.", shuffleId, partitionId);
-      throw new PartitionUnRetryAbleException(partitionId + " may be lost.");
+    PartitionLocation[] partitionLocations =
+        updateFileGroupAndGetLocations(shuffleId, partitionId, isSegmentGranularityVisible);
+    if (partitionLocations.length == 0) {
+      logger.error(
+          "Shuffle data is empty for shuffle {} partitionId {} isSegmentGranularityVisible {}.",
+          shuffleId,
+          partitionId,
+          isSegmentGranularityVisible);
+      if (isSegmentGranularityVisible) {
+        // When the downstream reduce tasks start early than upstream map tasks, the shuffle
+        // partition locations may be found empty, should retry until the upstream task started
+        return CelebornBufferStream.empty();
+      } else {
+        throw new PartitionUnRetryAbleException(partitionId + " may be lost.");

Review Comment:
   ```suggestion
           throw new PartitionUnRetryAbleException(String.format("Shuffle data lost for shuffle %d partition %d.", shuffleId, partitionId));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
