RexXiong commented on code in PR #2609:
URL: https://github.com/apache/celeborn/pull/2609#discussion_r1697421800
##########
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java:
##########
@@ -136,6 +136,11 @@ public static int celebornShuffleId(
}
}
+  public static int getMapAttemptNumber(TaskContext context) {
+    assert (context.stageAttemptNumber() < (1 << 15) && context.attemptNumber() < (1 << 16));
+    return (context.stageAttemptNumber() << 16) | context.attemptNumber();
+  }
+
+
Review Comment:
Agree with @waitinfuture . In my opinion, not only barrier stages but
also deterministic stages can lead to data loss if Celeborn doesn't ensure
that a retried task pushes exactly the same data batches.
For a deterministic stage, if the current stage throws a fetch failure
exception and the parent stage regenerates its data, this stage will resubmit
the failed task with a different stage attempt number but with the same mapId
and attemptId. Due to the logic in the LifecycleManager, the resubmitted task
will reuse the same Celeborn shuffleId and push data to the same Celeborn
partition location, which can leave incorrect data stored in Celeborn.
> val shuffleId: Integer = if (determinate && candidateShuffle.isDefined) {
>   val id = candidateShuffle.get._1
>   logInfo(s"reuse existing shuffleId $id for appShuffleId $appShuffleId appShuffleIdentifier $appShuffleIdentifier")
>   id
> } else {
>   val newShuffleId = shuffleIdGenerator.getAndIncrement()
>   logInfo(s"generate new shuffleId $newShuffleId for appShuffleId $appShuffleId appShuffleIdentifier $appShuffleIdentifier")
>   shuffleIds.put(appShuffleIdentifier, (newShuffleId, true))
>   newShuffleId
> }
For example:
- At Stage 1, stage attempt 0: use shuffleId 0 to generate data (a1, b1, c1, d1).
- At Stage 2, stage attempt 0: use shuffleId 1, read partial data from shuffle 0, and push batch0(a1), batch1(b1) to p1; then a fetch failure is thrown and shuffle 0 is disabled.
- At Stage 1, stage attempt 1: use new shuffleId 2 to regenerate data (a1, b1, c1, d1).
- At Stage 2, stage attempt 1: reuse shuffleId 1, and push new batch0(a1, b1), batch1(c1, d1) to the same p1. Currently Celeborn may push the same data in different batches in two situations:
  - When the upstream has multiple partition splits, Celeborn reads those splits in random order, so the two retries can produce different batches.
  - If Celeborn supports spill in the future, the batches will also differ between the two retries.

This results in p1 containing map0, attempt0, [batch0(a1), batch1(b1), batch0(a1, b1), batch1(c1, d1)].
At Stage 3, reading shuffle 1's data and deduplicating by map0, attempt0 + batchId yields only batch0(a1), batch1(b1), causing data loss of (c1, d1).
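To make the failure mode above concrete, here is a toy sketch (plain Java, all names hypothetical, not Celeborn's actual reader code) of a partition that deduplicates pushed batches by (mapId, attemptId, batchId) only. Because both stage attempts reuse the same keys, the re-pushed batches from stage attempt 1 are dropped as duplicates and (c1, d1) never reaches the reader:

```java
import java.util.*;

public class DedupSketch {
    // Hypothetical dedup key: note it does NOT include the stage attempt.
    record BatchKey(int mapId, int attemptId, int batchId) {}

    static List<String> readPartitionP1() {
        // Stage attempt 0 pushed batch0=(a1), batch1=(b1) before the fetch failure;
        // stage attempt 1 re-pushed batch0=(a1, b1), batch1=(c1, d1) under the SAME keys.
        Map<BatchKey, List<String>> p1 = new LinkedHashMap<>();
        p1.putIfAbsent(new BatchKey(0, 0, 0), List.of("a1"));
        p1.putIfAbsent(new BatchKey(0, 0, 1), List.of("b1"));
        p1.putIfAbsent(new BatchKey(0, 0, 0), List.of("a1", "b1")); // dropped: duplicate key
        p1.putIfAbsent(new BatchKey(0, 0, 1), List.of("c1", "d1")); // dropped: duplicate key

        List<String> result = new ArrayList<>();
        p1.values().forEach(result::addAll);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(readPartitionP1()); // [a1, b1] -- (c1, d1) are lost
    }
}
```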
I reproduced the problem using the following case. I made some modifications
to the Celeborn code to throw an exception when data is fetched, and
implemented random pushing in the SortBasedShuffleWriter:
> val df = sparkSession.createDataFrame(nums, schema)
> val value = df.groupBy("column1").agg(sum("column2").as("sum")).select("column1", "sum")
> Assert.assertEquals(value.count(), 300000)
The root cause of the issue is that Celeborn currently uses the combination
(mapId, attemptId, batchId) to deduplicate data. However, when a fetch failure
occurs, Spark may resubmit the failed tasks with the same stageId but a
different stage attempt number, while retaining the same (mapId,
attemptNumber). Celeborn therefore cannot distinguish the results of the two
map attempts, and if it does not guarantee the shuffle data order for the
retried map tasks, the data might become incorrect.
To address this, we need a mechanism to differentiate the two map task
attempts, even though Spark reports the same mapId and attempt number. By
encoding the stage attempt number together with the map attempt number, as
@jiang13021 proposed, we can create a new, unique attempt number for each
task. This will allow Celeborn to correctly identify and read only the new
successful attempts, ensuring the accuracy of the results.
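A standalone sketch of the packing scheme from the diff above (class name `MapAttemptCodec` is mine; the bit layout mirrors `getMapAttemptNumber`): the stage attempt goes into the high bits and the task attempt into the low 16 bits, with `stageAttempt < 2^15` keeping the combined int non-negative, so the two retries get distinct attempt numbers even when Spark reports the same `attemptNumber()`:

```java
public class MapAttemptCodec {
    // Pack stage attempt (high 15 bits) and task attempt (low 16 bits) into one int.
    static int encode(int stageAttempt, int taskAttempt) {
        assert stageAttempt < (1 << 15) && taskAttempt < (1 << 16);
        return (stageAttempt << 16) | taskAttempt;
    }

    // Unpack for inspection: the reverse of encode().
    static int stageAttempt(int encoded) { return encoded >>> 16; }
    static int taskAttempt(int encoded)  { return encoded & 0xFFFF; }

    public static void main(String[] args) {
        // Same Spark task attemptNumber() (0) on both retries, but distinct
        // encoded attempt numbers once the stage attempt is mixed in.
        System.out.println(encode(0, 0)); // 0     (stage attempt 0)
        System.out.println(encode(1, 0)); // 65536 (stage attempt 1)
        System.out.println(stageAttempt(encode(1, 0))); // 1
        System.out.println(taskAttempt(encode(1, 0)));  // 0
    }
}
```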
If needed, I will organize the code and unit tests I have modified, and we
can test them then.
@jiang13021 @waitinfuture @mridulm
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]