Hi,

  There are a couple of things here, depending on whether the stages are
DETERMINATE or INDETERMINATE.
The exit I added to my example was there to trigger some of these cases, and
you can come up with more involved scenarios where this would apply :-)

At a high level, we have the following:

a) If one or more tasks for a stage (and so its shuffle id) are going to be
recomputed and it is an INDETERMINATE stage, all shuffle output will be
discarded and the stage will be entirely recomputed (see here
<https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1477>
).
b) Conversely, if it is a DETERMINATE stage, only the missing partitions
for that shuffle id will be recomputed.
c) This also determines whether we can use a task's output when it was
produced for an earlier stage attempt: we can use it if the stage is
DETERMINATE, and can't if it is INDETERMINATE (see here
<https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1852>,
here
<https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1907>
).
(We can skip the more involved cases; they should not be relevant for
Celeborn IMO.)
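
To make (a)-(c) concrete, here is a rough sketch in Scala. It is a toy
model only, not Spark's DAGScheduler code: Determinism, ShuffleState and
RecomputePlan are names made up for illustration, and the real scheduler
handles more cases than this.

```scala
// Toy model only: these types are hypothetical, not Spark's DAGScheduler API.
sealed trait Determinism
case object Determinate extends Determinism
case object Indeterminate extends Determinism

final case class ShuffleState(
    shuffleId: Int,
    numPartitions: Int,
    available: Set[Int], // map partitions whose output is still registered
    determinism: Determinism)

object RecomputePlan {
  // Cases (a) and (b): which map partitions must be rerun on a retry.
  def partitionsToRecompute(s: ShuffleState): Set[Int] = {
    val all = (0 until s.numPartitions).toSet
    val missing = all -- s.available
    if (missing.isEmpty) Set.empty[Int]
    else s.determinism match {
      case Indeterminate => all     // (a) discard everything, rerun the whole stage
      case Determinate   => missing // (b) rerun only what was lost
    }
  }

  // Case (c): can output written by a task of `taskStageAttempt` be used
  // while the stage is at `latestStageAttempt`?
  def canUseOutput(d: Determinism, taskStageAttempt: Int, latestStageAttempt: Int): Boolean =
    d match {
      case Determinate   => taskStageAttempt <= latestStageAttempt
      case Indeterminate => taskStageAttempt == latestStageAttempt
    }
}
```

With 4 map partitions and partition 2 lost, this model reruns only
partition 2 for a DETERMINATE stage and all 4 for an INDETERMINATE one.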

For a given partition P of a spark-shuffle-id, the MapOutputTracker state is
updated by DAGScheduler with these cases in mind.
Assuming I did not misunderstand the proposal, if each stage attempt
corresponds to a celeborn-shuffle-id, such that we have spark-shuffle-id ->
List[celeborn-shuffle-id], we will need to take the cases above into
account as well when a "reducer" requests the output of partition P for a
spark-shuffle-id (that is, when deciding which celeborn-shuffle-id it maps
to).
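
As a rough illustration of that lookup, here is a minimal sketch in Scala.
Everything in it is an assumption made for illustration: ShuffleIdResolver
and its methods are hypothetical names, not Celeborn's LifecycleManager API,
and the celeborn ids used are arbitrary.

```scala
import scala.collection.mutable

// Hypothetical bookkeeping; not Celeborn's LifecycleManager API.
final class ShuffleIdResolver(indeterminate: Int => Boolean) {
  // spark-shuffle-id -> celeborn-shuffle-ids, one per stage attempt, oldest first
  private val attempts = mutable.Map.empty[Int, Vector[Int]]
  // (spark-shuffle-id, map partition) -> celeborn-shuffle-id holding its output
  private val committed = mutable.Map.empty[(Int, Int), Int]

  // Called when a new stage attempt starts writing this spark shuffle.
  def newAttempt(sparkId: Int, celebornId: Int): Unit = {
    attempts(sparkId) = attempts.getOrElse(sparkId, Vector.empty[Int]) :+ celebornId
    if (indeterminate(sparkId)) {
      // INDETERMINATE: output of all earlier attempts is discarded.
      committed.keys.filter(_._1 == sparkId).toList.foreach(k => committed.remove(k))
    }
  }

  // Called when map partition `partition` finishes under `celebornId`.
  def mapOutputCommitted(sparkId: Int, partition: Int, celebornId: Int): Unit = {
    val known = attempts.getOrElse(sparkId, Vector.empty[Int])
    val usable =
      if (indeterminate(sparkId)) known.lastOption.contains(celebornId) // latest attempt only
      else known.contains(celebornId)                                   // any attempt is fine
    if (usable) committed((sparkId, partition)) = celebornId
  }

  // What a "reducer" would ask: where is partition P of this spark shuffle?
  def resolve(sparkId: Int, partition: Int): Option[Int] =
    committed.get((sparkId, partition))
}
```

In this sketch a DETERMINATE shuffle can end up with different partitions
resolving to different celeborn-shuffle-ids, while an INDETERMINATE shuffle
only ever resolves to the id of its latest attempt.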


Regards,
Mridul





On Thu, Oct 12, 2023 at 3:14 AM Erik fang <fme...@gmail.com> wrote:

> Hi Mridul,
>
> sorry for the late reply
>
> Per my understanding, the key point about the Spark shuffleId and
> StageId/StageAttemptId is that
> the shuffleId is assigned at ShuffleDependency creation time and bound to
> the RDD/ShuffleDependency, while the StageId/StageAttemptId is assigned and
> changes at job execution time.
> In the RDD example, there are two sets of shuffle data, with ids 0 and 1,
> and they are expected to be accessed (read/write) with the correct
> shuffle id,
> and we can do that in Celeborn with shuffle id mapping.
>
> I made some small modifications to the example to avoid the exit, and
> grabbed some logs from the Spark DAGScheduler/Celeborn LifecycleManager to
> help explain.
>
> import org.apache.spark.TaskContext
>
> val rdd1 = sc.parallelize(0 until 10000, 20).map(v => (v, v)).groupByKey()
> val rdd2 = rdd1.mapPartitions { iter =>
>   val tc = TaskContext.get()
>   println("print stageAttemptNumber " + tc.stageAttemptNumber())
>   iter
> }
>
> rdd2.count()
> rdd2.map(v => (v._1, v._2)).groupByKey().count()
>
> *// DAGScheduler starts job 0, submit ShuffleMapStage 0 for spark
> shuffle-0*
> 23/10/12 12:58:14 INFO DAGScheduler: Registering RDD 1 (map at <console>:24)
> as input to shuffle 0
> 23/10/12 12:58:14 INFO DAGScheduler: Got job 0 (count at <console>:25)
> with 20 output partitions
> 23/10/12 12:58:14 INFO DAGScheduler: Final stage: ResultStage 1 (count at
> <console>:25)
> 23/10/12 12:58:14 INFO DAGScheduler: Parents of final stage:
> List(ShuffleMapStage 0)
> 23/10/12 12:58:14 INFO DAGScheduler: Missing parents: List(ShuffleMapStage
> 0)
> 23/10/12 12:58:14 DEBUG DAGScheduler: submitStage(ResultStage 1
> (name=count at <console>:25;jobs=0))
> 23/10/12 12:58:14 DEBUG DAGScheduler: missing: List(ShuffleMapStage 0)
> 23/10/12 12:58:14 DEBUG DAGScheduler: submitStage(ShuffleMapStage 0
> (name=map at <console>:24;jobs=0))
> 23/10/12 12:58:14 DEBUG DAGScheduler: missing: List()
> 23/10/12 12:58:14 INFO DAGScheduler: Submitting ShuffleMapStage 0
> (MapPartitionsRDD[1] at map at <console>:24), which has no missing parents
> 23/10/12 12:58:14 DEBUG DAGScheduler: submitMissingTasks(ShuffleMapStage 0)
>
> *// LifecycleManager received GetShuffleId request from ShuffleMapStage 0
> with spark_shuffleId 0, stage attemptId 0, and generate celeborn_shuffleId
> 0 for write*
> 23/10/12 12:58:16 DEBUG LifecycleManager: Received GetShuffleId
> request,appShuffleId 0 maxAttemptNum 4 attemptId 0 isShuffleWriter true
> 23/10/12 12:58:16 DEBUG LifecycleManager: Received RegisterShuffle
> request, 0, 20, 20.
> 23/10/12 12:58:16 INFO LifecycleManager: New shuffle request, shuffleId 0,
> partitionType: REDUCE numMappers: 20, numReducers: 20.
>
> *// ShuffleMapStage finish, Submit ResultStage 1*
> 23/10/12 12:58:18 INFO LifecycleManager: Received StageEnd request,
> shuffleId 0.
> 23/10/12 12:58:18 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
> have all completed, from pool
> 23/10/12 12:58:18 INFO DAGScheduler: ShuffleMapStage 0 (map at <console>:24)
> finished in 4.246 s
> 23/10/12 12:58:18 INFO DAGScheduler: looking for newly runnable stages
> 23/10/12 12:58:18 INFO DAGScheduler: running: Set()
> 23/10/12 12:58:18 INFO DAGScheduler: waiting: Set(ResultStage 1)
> 23/10/12 12:58:18 INFO DAGScheduler: failed: Set()
> 23/10/12 12:58:18 DEBUG DAGScheduler: submitStage(ResultStage 1
> (name=count at <console>:25;jobs=0))
> 23/10/12 12:58:18 DEBUG DAGScheduler: missing: List()
> 23/10/12 12:58:18 INFO DAGScheduler: Submitting ResultStage 1
> (MapPartitionsRDD[3] at mapPartitions at <console>:24), which has no
> missing parents
> 23/10/12 12:58:18 DEBUG DAGScheduler: submitMissingTasks(ResultStage 1)
>
> *// LifecycleManager received GetShuffleId request from ResultStage 1 with
> spark_shuffleId 0, stage attemptId 0*
> *// and generate celeborn_shuffleId 0 to return shuffle metadata with
> celeborn's GetShuffleFileGroup for read*
> 23/10/12 12:58:19 DEBUG LifecycleManager: Received GetShuffleId
> request,appShuffleId 0 maxAttemptNum 4 attemptId 0 isShuffleWriter false
> 23/10/12 12:58:19 DEBUG CelebornShuffleReader: read shuffleId 0 for
> appShuffleId 0 attemptNum 0
> 23/10/12 12:58:19 DEBUG LifecycleManager: Received GetShuffleFileGroup
> request,shuffleId 0.
>
> 23/10/12 12:58:20 INFO DAGScheduler: ResultStage 1 (count at <console>:25)
> finished in 1.546 s
> 23/10/12 12:58:20 INFO DAGScheduler: Job 0 is finished. Cancelling
> potential speculative or zombie tasks for this job
>
> *// DAGScheduler starts job 1, submit ShuffleMapStage 3 for spark
> shuffle-1*
> 23/10/12 12:58:22 INFO DAGScheduler: Registering RDD 4 (map at <console>:25)
> as input to shuffle 1
> 23/10/12 12:58:22 INFO DAGScheduler: Got job 1 (count at <console>:25)
> with 20 output partitions
> 23/10/12 12:58:22 INFO DAGScheduler: Final stage: ResultStage 4 (count at
> <console>:25)
> 23/10/12 12:58:22 INFO DAGScheduler: Parents of final stage:
> List(ShuffleMapStage 3)
> 23/10/12 12:58:22 INFO DAGScheduler: Missing parents: List(ShuffleMapStage
> 3)
> 23/10/12 12:58:22 DEBUG DAGScheduler: submitStage(ResultStage 4
> (name=count at <console>:25;jobs=1))
> 23/10/12 12:58:22 DEBUG DAGScheduler: missing: List(ShuffleMapStage 3)
> 23/10/12 12:58:22 DEBUG DAGScheduler: submitStage(ShuffleMapStage 3
> (name=map at <console>:25;jobs=1))
> 23/10/12 12:58:22 DEBUG DAGScheduler: missing: List()
> 23/10/12 12:58:22 INFO DAGScheduler: Submitting ShuffleMapStage 3
> (MapPartitionsRDD[4] at map at <console>:25), which has no missing parents
> 23/10/12 12:58:22 DEBUG DAGScheduler: submitMissingTasks(ShuffleMapStage 3)
>
> *// LifecycleManager Received GetShuffleId request from ShuffleMapStage 3
> with spark_shuffleId 1, stage attemptId 0, generate celeborn_shuffleId 4
> for write*
> 23/10/12 12:58:22 DEBUG LifecycleManager: Received GetShuffleId
> request,appShuffleId 1 maxAttemptNum 4 attemptId 0 isShuffleWriter true
> 23/10/12 12:58:22 DEBUG LifecycleManager: Received GetShuffleId
> request,appShuffleId 1 maxAttemptNum 4 attemptId 0 isShuffleWriter true
>
> *// LifecycleManager Received GetShuffleId request from ShuffleMapStage 3
> with spark_shuffleId 0, stage attemptId 0 to read spark shuffle-0*
> 23/10/12 12:58:22 DEBUG LifecycleManager: Received GetShuffleId
> request,appShuffleId 0 maxAttemptNum 4 attemptId 0 isShuffleWriter false
> 23/10/12 12:58:22 DEBUG LifecycleManager: Received GetShuffleId
> request,appShuffleId 0 maxAttemptNum 4 attemptId 0 isShuffleWriter false
> 23/10/12 12:58:22 DEBUG CelebornShuffleReader: read shuffleId 0 for
> appShuffleId 0 attemptNum 0
>
> 23/10/12 12:58:22 DEBUG LifecycleManager: Received RegisterShuffle
> request, 4, 20, 20.
> 23/10/12 12:58:22 INFO LifecycleManager: New shuffle request, shuffleId 4,
> partitionType: REDUCE numMappers: 20, numReducers: 20.
>
> 23/10/12 12:58:24 INFO DAGScheduler: ShuffleMapStage 3 (map at <console>:25)
> finished in 2.359 s
> 23/10/12 12:58:24 INFO DAGScheduler: looking for newly runnable stages
> 23/10/12 12:58:24 INFO DAGScheduler: running: Set()
> 23/10/12 12:58:24 INFO DAGScheduler: waiting: Set(ResultStage 4)
> 23/10/12 12:58:24 INFO DAGScheduler: failed: Set()
> 23/10/12 12:58:24 DEBUG DAGScheduler: submitStage(ResultStage 4
> (name=count at <console>:25;jobs=1))
> 23/10/12 12:58:24 DEBUG DAGScheduler: missing: List()
> 23/10/12 12:58:24 INFO DAGScheduler: Submitting ResultStage 4 (ShuffledRDD[
> 5] at groupByKey at <console>:25), which has no missing parents
>
> We can see that both jobs access the first shuffle's data with the correct
> shuffle id.
>
> However, your comments remind me of one missing case here:
> If shuffle 0 failed on the first stage attempt and then succeeded on the
> second attempt, job 1 can get the correct result (read with spark shuffle
> id 0, stage attempt id 1, thus celeborn shuffle id 1).
> Job 2, however, gets an incorrect celeborn shuffle id when it reads shuffle
> 0 in its first stage attempt (read with spark shuffle id 0, stage attempt
> id 0, thus celeborn shuffle id 0, but we need celeborn shuffle id 1 here).
> To fix this, LifecycleManager needs to keep the mapping of Spark shuffle
> id -> final successful celeborn shuffle id across stage attempts, and
> return that celeborn shuffle id to the shuffle reader.
>
> Thanks,
> Erik
>
> On Sun, Oct 8, 2023 at 3:12 PM Sungwoo Park <o...@pl.postech.ac.kr> wrote:
>
>> Hello Keyong,
>>
>> We have implemented the previous plan to generate a fresh shuffleId when
>> failing to fetch data from Celeborn workers. Thanks for your comment.
>>
>> While testing Celeborn-MR3 with task re-execution (or stage
>> resubmission), I have noticed that shuffleId does not change its
>> associated set of Celeborn workers. Specifically, pushData() for a
>> specific shuffleId always fails if any of its associated Celeborn workers
>> is killed. Our experiment goes as follows:
>>
>> 1. Execute a long-running query.
>> 2. While the query is running (many mappers have succeeded and some
>> reducers are still running), kill 2 Celeborn workers out of 12.
>> 3. A task fails after three attempts with this stack trace:
>>
>> 2023-10-08 05:23:43,814 [DAG-2-23-2] ERROR
>> org.apache.celeborn.client.ShuffleClientImpl [] - Exception raised while
>> pushing data for shuffle 25 map 113 attempt 3 partition 0 batch 1 location
>> PartitionLocation[
>>    id-epoch:0-0
>>    host-rpcPort-pushPort-fetchPort-replicatePort:192.168.10.113-45109-37140-45752-40163
>>    mode:PRIMARY
>>    peer:(empty)
>>    storage hint:StorageInfo{type=MEMORY, mountPoint='/data3', finalResult=false, filePath=}
>>    mapIdBitMap:null].
>> org.apache.celeborn.common.exception.CelebornIOException: Failed to connect to /192.168.10.113:37140
>>    at org.apache.celeborn.common.network.client.TransportClientFactory.internalCreateClient(TransportClientFactory.java:246) ~[celeborn-client-mr3-1.8-shaded_2.12-0.4.0-SNAPSHOT.jar:?]
>>    at org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:190) ~[celeborn-client-mr3-1.8-shaded_2.12-0.4.0-SNAPSHOT.jar:?]
>>    at org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:116) ~[celeborn-client-mr3-1.8-shaded_2.12-0.4.0-SNAPSHOT.jar:?]
>>    at org.apache.celeborn.client.ShuffleClientImpl.pushOrMergeData(ShuffleClientImpl.java:1079) ~[celeborn-client-mr3-1.8-shaded_2.12-0.4.0-SNAPSHOT.jar:?]
>>    at org.apache.celeborn.client.ShuffleClientImpl.pushData(ShuffleClientImpl.java:1162) ~[celeborn-client-mr3-1.8-shaded_2.12-0.4.0-SNAPSHOT.jar:?]
>>    at org.apache.tez.runtime.library.common.sort.impl.PipelinedSorter.doFinalMerge(PipelinedSorter.java:979) ~[tez-runtime-library-0.9.1.mr3.1.0.jar:0.9.1.mr3.1.0]
>>
>> So, does this mean that if pushData() fails with this exception (after
>> some Celeborn workers are lost):
>>
>>      org.apache.celeborn.common.exception.CelebornIOException: Failed to connect to /192.168.10.113:37140
>>
>> we should stop using the current shuffleId and use a fresh shuffleId
>> instead? (I guess the same phenomenon will occur for stage resubmission
>> in Spark.)
>>
>> Or does generating a fresh shuffleId not help, because it may still use
>> those lost Celeborn workers?
>>
>> Thanks,
>>
>> --- Sungwoo
>>
>> On Sat, 7 Oct 2023, Keyong Zhou wrote:
>>
>> > Hi Sungwoo,
>> >
>> > Sorry for the late reply. Reusing a committed shuffleId does not work in
>> > the current architecture, even after calling unregisterShuffle in
>> > LifecycleManager, because the cleanup of metadata is delayed and not
>> > guaranteed. It will be more complicated when we consider graceful
>> > restart of workers.
>> >
>> > If we want to reuse the shuffleId, we need to redesign the whole
>> > picture.
>> >
>> > Thanks,
>> > Keyong Zhou
>> >
>> > On Mon, Oct 2, 2023 at 13:23, Sungwoo Park <o...@pl.postech.ac.kr> wrote:
>> >
>> >> Hi Keyong,
>> >>
>> >> Instead of picking up a new shuffleId, can we reuse an existing
>> >> shuffleId after unregistering it? If the following plan worked, it
>> >> would further simplify the implementation:
>> >>
>> >> 1. Downstream tasks fail because of read failures.
>> >> 2. All active downstream tasks are killed, so the shuffleId is not
>> >> used.
>> >> 3. An upstream vertex unregisters the shuffleId.
>> >> 4. The upstream vertex is re-executed normally. This re-execution
>> >> automatically registers the same shuffleId.
>> >>
>> >> In summary, we would like to go back in time before the upstream vertex
>> >> started by cleaning up the shuffleId. Could you please give a comment
>> >> on this plan?
>> >>
>> >> Thank you,
>> >>
>> >> --- Sungwoo
>> >>
>> >> On Sat, 30 Sep 2023, Keyong Zhou wrote:
>> >>
>> >>> Hi Sungwoo,
>> >>>
>> >>> I think your approach works with current architecture of Celeborn,
>> >>> and interpreting IOException when reading as read failure makes
>> >>> sense. Currently only when CommitFiles fails will LifecycleManager
>> >>> announce data lost.
>> >>>
>> >>> Thanks,
>> >>> Keyong Zhou
>> >>>
>> >>> On Fri, Sep 29, 2023 at 22:05, Sungwoo Park <o...@pl.postech.ac.kr> wrote:
>> >>>
>> >>>>> Since the partition split has a good chance to contain data from
>> >>>>> almost all upstream mapper tasks, the cost of re-computing all
>> >>>>> upstream tasks may have little difference to re-computing the
>> >>>>> actual mapper tasks in most cases. Of course it's not always true.
>> >>>>>
>> >>>>> To change from 'complete' to 'incomplete' also needs to refactor
>> >>>>> Worker's logic, which currently assumes that the succeeded attempts
>> >>>>> will not be changed after final committing files.
>> >>>>>
>> >>>>>> a subset of succeeded attempts. In Erik's proposal, the whole
>> >>>>>> upstream stage will be rerun when data lost.
>> >>>>
>> >>>> Thank you for your response --- things are now much clearer.
>> >>>>
>> >>>> From your comments shown above, let me assume that:
>> >>>>
>> >>>> 1. The whole upstream stage is rerun in the case of read failure.
>> >>>>
>> >>>> 2. Currently it is not easy to change the state of a shuffleId from
>> >>>> 'complete' to 'incomplete'.
>> >>>>
>> >>>> Then, for Celeborn-MR3, I would like to experiment with the following
>> >>>> approach:
>> >>>>
>> >>>> 1. If read failures occur for shuffleId #1, we pick up a new
>> >>>> shuffleId #2.
>> >>>>
>> >>>> 2. The upstream stage (or Vertex in the case of MR3) re-executes all
>> >>>> tasks again, but writes the output to shuffleId #2.
>> >>>>
>> >>>> 3. Tasks in the downstream stage re-try by reading from shuffleId #2.
>> >>>>
>> >>>> Do you think this approach makes sense under the current
>> >>>> architecture of Celeborn? If this approach is feasible, MR3 only
>> >>>> needs to be notified of read failures due to lost data by Celeborn
>> >>>> ShuffleClient. Or, we could just interpret IOException from Celeborn
>> >>>> ShuffleClient as read failures, in which case we can implement stage
>> >>>> recompute without requiring any extension of Celeborn. (However, it
>> >>>> would be great if Celeborn ShuffleClient could announce lost data
>> >>>> explicitly.)
>> >>>>
>> >>>> An industrial user of Hive-MR3-Celeborn is trying hard to save disk
>> >>>> usage on Celeborn workers (which use SSDs of limited capacity), so
>> >>>> stage recompute would be a great new feature to them.
>> >>>>
>> >>>> Thank you,
>> >>>>
>> >>>> --- Sungwoo
>> >>>>
>> >>>>
>> >>>>
>> >>>
>> >
>
>
