mridulm commented on code in PR #2609:
URL: https://github.com/apache/celeborn/pull/2609#discussion_r1677177186


##########
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java:
##########
@@ -136,6 +136,11 @@ public static int celebornShuffleId(
     }
   }
 
+  public static int getMapAttemptNumber(TaskContext context) {
+    assert (context.stageAttemptNumber() < (1 << 15) && 
context.attemptNumber() < (1 << 16));
+    return (context.stageAttemptNumber() << 16) | context.attemptNumber();
+  }
+

Review Comment:
   (Initially I responded to the un-edited comment from github notification - 
but reformulating my response once I read the full comment @waitinfuture  :-) )
   
   A few notes about the test specifically.
   * Note that the failing task does not push any data in the first attempt - 
only the succeeding tasks do. But given it is a barrier stage, all tasks are 
reexecuted when even one of them fails.
     * Note, this is a quirk of the test - we could have had data being pushed 
as well ! Just detailing the scenario
   * Even though spark is inferring the stage as determinate, and Celeborn is 
relying on the stage being determinate - it is not.
     * It is order sensitive (see the `reverseIterator` on reexecution).  An 
order sensitive stage, which is reading shuffle data will get inferred as 
indeterminate by spark - if the RDD had exposed the right deterministic level.
     * I mention this only to illustrate that nuance here - we should support 
barrier stages properly even if that was not the case.
   * In general, barrier stages are 'different' from regular spark stages due 
to their all-or-nothing approach. We should treat them as though entire shuffle 
output (for the barrier stage) has been lost - since it will be entirely 
regenerated.
   
   Given this, let me respond to the queries above:
   
   > For these two batches, the failed task and the re-executed task have the 
same (mapId, attemptId, batchId), Celeborn client will treat them identical and 
will filter out either, for example it can filter out re-executed tasks' Batch0 
and Batch1, and consume the failed tasks' Batch0 and Batch1.
   
   The stage attempt-id is actually different - but given it is a determinate 
stage, we reuse the existing shuffle id associated with the previous stage 
attempt.
   Marking the RDD, for the barrier stage, as `UNORDERED` should also 'fix' the 
test case when `throwsFetchFailure` is enabled IMO - though I did not test this 
!
   
   > so the correct behavior should be that the failed task's Batch0 and Batch1 
be filtered out.
   
   A small nit - the failed task did not generate any shuffle output here, 
while the successful tasks did - for first attempt; the semantics of barrier 
scheduling results in reexecution of the successful tasks.
   But leaving that aside - I agree with the expectation: when `data is being 
entirely re-generated for a given partition due to task or stage failure, the 
'previous' output for that partition should not be used`.
   This covers partial output from failed tasks in 'normal' stage, 
indeterminate 'normal' stage, as well as barrier stages.
   
   > Currently Celeborn client uses context.attemptNumber() as attemptId to 
distinguish different task attempts. When throwsFetchFailure is disabled, this 
works fine because the re-executed task will not have the same attemptNumber
   
   There are too many issues when `throwsFetchFailure` is disabled.
   IMO we should move away from that flag being disabled in subsequent releases 
and retire that code path entirely afterwards.
   
   >This PR encodes stageAttemptId and attemptNumber together, making attemptId 
unique across rerun stages, so IMO it's necessary. WDYT? 
   
   As I showed in the prototype PR, this is not only necessary - I am not 
completely sure if it might be breaking `throwsFetchFailure` case for 
determinate stage as well.
   This is a case of barrier stage semantics not being handled by Celeborn 
properly; and once we support it, the issue should get fixed.
   IMO we should revisit change to attemptId after we support barrier stage 
correctly - and evaluate if it is still required.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to