mridulm commented on code in PR #2609:
URL: https://github.com/apache/celeborn/pull/2609#discussion_r1677177186
##########
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java:
##########
@@ -136,6 +136,11 @@ public static int celebornShuffleId(
}
}
+  public static int getMapAttemptNumber(TaskContext context) {
+    assert (context.stageAttemptNumber() < (1 << 15) && context.attemptNumber() < (1 << 16));
+    return (context.stageAttemptNumber() << 16) | context.attemptNumber();
+  }
+
Review Comment:
(Initially I responded to the un-edited comment from the GitHub notification -
but I am reformulating my response now that I have read the full comment
@waitinfuture :-) )
A few notes about the test specifically.
* Note that the failing task does not push any data in its first attempt -
only the successful tasks do. But because it is a barrier stage, all tasks are
re-executed when even one of them fails.
* Even though Spark infers the stage as determinate, and Celeborn relies on
the stage being determinate, it actually is not.
* It is order sensitive (see the `reverseIterator` on re-execution). An
order-sensitive stage that reads shuffle data would be inferred as
indeterminate by Spark - if the RDD had exposed the right deterministic level.
* Having said that, we should fix this issue even in that case anyway IMO.
* In general, barrier stages are 'different' from regular Spark stages due
to their all-or-nothing approach. We should treat them as though the entire
shuffle output (for the barrier stage) has been lost - since it will be
entirely regenerated.
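The all-or-nothing point above can be sketched as follows. This is an illustrative standalone simulation, not Celeborn code - the class, methods, and key format are all hypothetical - but it shows why, once any barrier task fails, everything the stage attempt wrote has to be discarded rather than just the failed task's output:

```java
import java.util.*;

public class BarrierStageSketch {
  // shuffle outputs keyed by "mapId@stageAttempt" (hypothetical key format)
  static Map<String, String> shuffleOutput = new HashMap<>();

  static void write(int mapId, int stageAttempt, String data) {
    shuffleOutput.put(mapId + "@" + stageAttempt, data);
  }

  // All-or-nothing: one failure invalidates everything the attempt wrote,
  // including output from tasks that themselves succeeded.
  static void onBarrierTaskFailure(int failedStageAttempt) {
    shuffleOutput.keySet().removeIf(k -> k.endsWith("@" + failedStageAttempt));
  }

  public static void main(String[] args) {
    // stage attempt 0: tasks 0 and 1 succeed and push data, task 2 fails
    write(0, 0, "t0-data");
    write(1, 0, "t1-data");
    onBarrierTaskFailure(0); // drop all of attempt 0's output

    // stage attempt 1: all three tasks re-execute
    write(0, 1, "t0-data");
    write(1, 1, "t1-data");
    write(2, 1, "t2-data");

    System.out.println(shuffleOutput.size()); // 3 - only attempt-1 output remains
  }
}
```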
Given this, let me respond to the queries above:
> For these two batches, the failed task and the re-executed task have the
same (mapId, attemptId, batchId), Celeborn client will treat them identical and
will filter out either, for example it can filter out re-executed tasks' Batch0
and Batch1, and consume the failed tasks' Batch0 and Batch1.
The stage attempt-id is actually different - but given it is a determinate
stage, we reuse the existing shuffle id associated with the previous stage
attempt.
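To make the collision concrete, here is a minimal sketch (not Celeborn's actual reader - the key format and `dedup` helper are invented for illustration) of first-wins deduplication when the shuffle id is reused across stage attempts: batches from the old and new stage attempt share the same `(mapId, attemptId, batchId)` key, so the reader can keep the stale one:

```java
import java.util.*;

public class BatchDedupSketch {
  // Keep the first batch seen for each "mapId:attemptId:batchId" key.
  static List<String> dedup(String[] arriving) {
    Set<String> seen = new HashSet<>();
    List<String> consumed = new ArrayList<>();
    for (String b : arriving) {
      String[] kv = b.split("=");
      if (seen.add(kv[0])) consumed.add(kv[1]);
    }
    return consumed;
  }

  public static void main(String[] args) {
    String[] arriving = {
      "0:0:0=stale",  // pushed by stage attempt 0 before the failure
      "0:0:0=fresh",  // re-executed in stage attempt 1, same key
      "0:0:1=fresh",
    };
    System.out.println(dedup(arriving)); // [stale, fresh] - the stale batch won
  }
}
```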
Marking the RDD, for the barrier stage, as `UNORDERED` should also 'fix' the
issue when `throwsFetchFailure` is enabled IMO - though I did not test this !
> so the correct behavior should be that the failed task's Batch0 and Batch1
be filtered out.
A small nit - the failed task did not generate any shuffle output here,
while the successful tasks did for the first attempt; the semantics of barrier
scheduling result in re-execution of the successful tasks.
But leaving that aside - I agree with the expectation: when `data is being
entirely re-generated for a given partition due to task or stage failure, the
'previous' output for that partition should not be used`.
This covers partial output from failed tasks in 'normal' stage,
indeterminate 'normal' stage, as well as barrier stages.
> Currently Celeborn client uses context.attemptNumber() as attemptId to
distinguish different task attempts. When throwsFetchFailure is disabled, this
works fine because the re-executed task will not have the same attemptNumber
There are too many issues when `throwsFetchFailure` is disabled.
IMO we should move away from that flag being disabled in subsequent releases
and retire that code path entirely afterwards.
>This PR encodes stageAttemptId and attemptNumber together, making attemptId
unique across rerun stages, so IMO it's necessary. WDYT?
As I showed in the prototype PR, this is not only unnecessary, but also
breaks `throwsFetchFailure` case.
IMO this is a case of barrier stage semantics not being handled by Celeborn
properly; and once we support it, the issue goes away.
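For reference, the encoding from the diff above can be illustrated standalone (the `encode`/decode helpers here are hypothetical names, not the Celeborn client code path): packing the stage attempt into the high bits makes the combined id distinct across stage re-runs even when the task attempt number repeats.

```java
public class AttemptEncoding {
  // Pack stageAttempt into bits 16..30 and taskAttempt into bits 0..15,
  // mirroring (stageAttemptNumber << 16) | attemptNumber from the diff.
  static int encode(int stageAttempt, int taskAttempt) {
    assert stageAttempt < (1 << 15) && taskAttempt < (1 << 16);
    return (stageAttempt << 16) | taskAttempt;
  }

  static int stageAttempt(int encoded) { return encoded >>> 16; }
  static int taskAttempt(int encoded)  { return encoded & 0xFFFF; }

  public static void main(String[] args) {
    // same task attempt number 0, but different stage attempts -> distinct ids
    int first  = encode(0, 0);
    int second = encode(1, 0);
    System.out.println(first + " " + second);                             // 0 65536
    System.out.println(stageAttempt(second) + " " + taskAttempt(second)); // 1 0
  }
}
```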
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]