RexXiong commented on code in PR #2609:
URL: https://github.com/apache/celeborn/pull/2609#discussion_r1697421800


##########
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java:
##########
@@ -136,6 +136,11 @@ public static int celebornShuffleId(
     }
   }
 
+  public static int getMapAttemptNumber(TaskContext context) {
+    assert (context.stageAttemptNumber() < (1 << 15) && context.attemptNumber() < (1 << 16));
+    return (context.stageAttemptNumber() << 16) | context.attemptNumber();
+  }
+

Review Comment:
   Agree with @waitinfuture. In my opinion, not only a barrier stage but also a determinate stage can lead to data loss if Celeborn doesn't ensure that every task attempt pushes exactly the same data batches.
   
   For a determinate stage, if the current stage throws a fetch failure exception and the parent stage regenerates the data, this stage will resubmit the failed task with a different stage attempt number but the same mapId and attemptId. Due to the logic in the LifecycleManager, it will reuse the same Celeborn shuffleId and push data to the same Celeborn partition location. This can cause incorrect data in Celeborn.
   
   >  val shuffleId: Integer = if (determinate && candidateShuffle.isDefined) {
   >    val id = candidateShuffle.get._1
   >    logInfo(s"reuse existing shuffleId $id for appShuffleId $appShuffleId appShuffleIdentifier $appShuffleIdentifier")
   >    id
   >  } else {
   >    val newShuffleId = shuffleIdGenerator.getAndIncrement()
   >    logInfo(s"generate new shuffleId $newShuffleId for appShuffleId $appShuffleId appShuffleIdentifier $appShuffleIdentifier")
   >    shuffleIds.put(appShuffleIdentifier, (newShuffleId, true))
   >    newShuffleId
   >  }
   
   For example:
   
   - Stage 1, stage attempt 0: uses shuffleId 0 and generates data (a1, b1, c1, d1).
   - Stage 2, stage attempt 0: uses shuffleId 1, reads partial data from shuffle 0, pushes batch0(a1) and batch1(b1) to p1, then throws a fetch failure and disables shuffle 0.
   - Stage 1, stage attempt 1: uses a new shuffleId 2 and regenerates data (a1, b1, c1, d1).
   - Stage 2, stage attempt 1: reuses shuffle 1 and pushes new batch0(a1, b1) and batch1(c1, d1) to the same p1. Celeborn may group the data into different batches across the two attempts in two situations:
     - When the upstream has multiple partition splits, Celeborn reads those splits in random order, so the two retries can produce different batches.
     - If Celeborn supports spill in the future, the batches will also differ between the two retries.
   
   This results in p1 containing map0, attempt0, [batch0(a1), batch1(b1), batch0(a1, b1), batch1(c1, d1)].
   At Stage 3, reading shuffle 1's data and deduplicating by map0, attempt0 + batchId keeps only batch0(a1) and batch1(b1), so (c1, d1) are lost.
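   The data-loss step above can be illustrated with a toy deduplication model. This is my own sketch; the class, record, and field names here are illustrative and are not Celeborn's actual implementation:
   
   ```java
   import java.util.*;
   
   // Toy model of reader-side deduplication by (mapId, attemptId, batchId).
   public class DedupDemo {
     // A pushed batch; nested records are implicitly static.
     record Batch(int mapId, int attemptId, int batchId, List<String> records) {}
   
     public static void main(String[] args) {
       // Partition p1 after both stage attempts pushed with the same
       // (mapId=0, attemptId=0): four physical batches, only two batchIds.
       List<Batch> p1 = List.of(
           new Batch(0, 0, 0, List.of("a1")),        // stage attempt 0
           new Batch(0, 0, 1, List.of("b1")),        // stage attempt 0
           new Batch(0, 0, 0, List.of("a1", "b1")),  // stage attempt 1
           new Batch(0, 0, 1, List.of("c1", "d1"))); // stage attempt 1
   
       // Dedup by (mapId, attemptId, batchId): only the first batch
       // seen for each key survives.
       Map<List<Integer>, Batch> seen = new LinkedHashMap<>();
       for (Batch b : p1) {
         seen.putIfAbsent(List.of(b.mapId(), b.attemptId(), b.batchId()), b);
       }
   
       List<String> result = new ArrayList<>();
       seen.values().forEach(b -> result.addAll(b.records()));
       System.out.println(result); // [a1, b1] -- c1 and d1 are lost
     }
   }
   ```
   
   The second attempt's batches collide with the first attempt's batchIds, so the reader silently drops (c1, d1).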
   
   I reproduced the problem using the following case. I modified the Celeborn code to throw an exception when data is fetched, and additionally implemented random pushing in the SortBasedShuffleWriter:
   
   > val df = sparkSession.createDataFrame(nums, schema)
   > val value = df.groupBy("column1").agg(sum("column2").as("sum")).select("column1", "sum")
   > Assert.assertEquals(value.count(), 300000)
   
   The root cause of the issue is that Celeborn currently uses the combination of (mapId, attemptId, batchId) to deduplicate data. However, when a fetch failure occurs, Spark may resubmit the failed tasks with the same stageId but a different stage attempt number, while retaining the same (mapId, attemptNumber). This leads to a scenario where Celeborn cannot distinguish between the results of the two map attempts. If Celeborn does not guarantee that retried map tasks push the shuffle data in the same batch layout, the data can become incorrect.
   
   To address this, we need a mechanism to differentiate between the two map task attempts, even though Spark uses the same mapId and attempt number. By encoding the stage attempt number together with the task attempt number, as @jiang13021 proposed, we can create a new, unique attempt number for each task. This allows Celeborn to correctly identify and read only the new successful attempts, ensuring the accuracy of the results.
   
   If needed, I will organize the code and unit tests I have modified, and we 
can test them then. 
   
   @jiang13021 @waitinfuture @mridulm 
   

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
