Hi Sungwoo,
Thanks for your mail! To answer your questions:
1. No, your implementation does not misuse the Celeborn API, and
speculative execution is supported. Do you call mapperEnd in attempt #2?
I think you can kill attempt #1 after the invocation of mapperEnd in
attempt #2 succeeds.
2. Since speculative execution is allowed, we can safely kill a task
attempt once another attempt succeeds.
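
For concreteness, a minimal sketch of the ordering I have in mind is below.
killAttempt() and the surrounding hook are hypothetical MR3-side pieces, and
the exact ShuffleClient method signatures vary across Celeborn versions, so
please check them against your release.

import java.io.IOException;
import org.apache.celeborn.client.ShuffleClient;

class SpeculativeCommit {
  // Called from the winning attempt (attemptId 1 in your scenario) after
  // it has pushed all of its data.
  static void finishWinningAttempt(
      ShuffleClient client, int shuffleId, int mapId, int attemptId,
      int numMappers) throws IOException {
    // Seal this attempt's output first; Celeborn records the attempt whose
    // mapperEnd succeeds, and readers consume only that attempt's data.
    client.mapperEnd(shuffleId, mapId, attemptId, numMappers);
    // Only now interrupt the other attempt: its unfinished pushes belong
    // to a losing attemptId and are ignored by readers.
    killAttempt(shuffleId, mapId, /* attemptId */ 0); // hypothetical MR3 hook
  }

  static void killAttempt(int shuffleId, int mapId, int attemptId) {
    // Placeholder for MR3's framework-specific kill mechanism.
  }
}
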
Thanks,
Keyong Zhou
<[email protected]> wrote on Sat, Aug 19, 2023 at 22:38:
> Hello Celeborn team,
>
> We are quite close to completing our Celeborn-MR3 client, and I have a
> question on speculative execution in the context of using Celeborn.
>
> MR3 supports speculative execution, which allows several attempts of the
> same task to run concurrently. When a task attempt succeeds, all other
> concurrent task attempts are interrupted and killed, so that only one task
> attempt commits its output.
>
> When using Celeborn-MR3, speculative execution sometimes seems to corrupt
> the data sent to Celeborn. Below I describe a sequence of events that
> produces this error. shuffleId, mapId, and partitionId are all fixed,
> whereas attemptId can be either 0 or 1.
>
> 1. Task attempt #1 (with attemptId 0) starts and calls
> ShuffleClient.pushData() (see the sketch after this list).
>
> 2. Task attempt #1 gets stuck in the call to mapperEnd() because
> ShuffleClient fails to send data to Celeborn for an unknown reason, while
> repeatedly producing INFO messages like:
>
> 2023-08-19 11:52:16,119 [celeborn-retry-sender-21] INFO
> org.apache.celeborn.client.ShuffleClientImpl [] - Revive for push data
> success, new location for shuffle 1005007 map 408 attempt 0 partition 0
> batch 1 is location PartitionLocation[
> id-epoch:0-4
>
> host-rpcPort-pushPort-fetchPort-replicatePort:192.168.10.103-39861-45968-46540-44091
> mode:PRIMARY
> peer:(empty)
> storage hint:StorageInfo{type=MEMORY, mountPoint='UNKNOWN_DISK',
> finalResult=false, filePath=}
> mapIdBitMap:null].
>
> 3. As task attempt #1 does not return for a long time, the speculative
> execution mechanism of MR3 kicks in and launches another task attempt
> #2 (with attemptId 1).
>
> 4. Task attempt #2 calls pushData() and succeeds. That is, task attempt #2
> successfully pushes data to Celeborn.
>
> 5. MR3 interrupts and kills task attempt #1. When this occurs, mapperEnd()
> gets interrupted and prints a message like the following:
>
> 2023-08-19 11:52:16,089 [DAG-1-5-1] WARN RuntimeTask [] -
> LogicalOutput.close() fails on Reducer 12
> org.apache.celeborn.common.exception.CelebornIOException: sleep interrupted
>     at org.apache.celeborn.common.write.InFlightRequestTracker.limitZeroInFlight(InFlightRequestTracker.java:155)
>     at org.apache.celeborn.common.write.PushState.limitZeroInFlight(PushState.java:85)
>     at org.apache.celeborn.client.ShuffleClientImpl.limitZeroInFlight(ShuffleClientImpl.java:611)
>     at org.apache.celeborn.client.ShuffleClientImpl.mapEndInternal(ShuffleClientImpl.java:1494)
>     at org.apache.celeborn.client.ShuffleClientImpl.mapperEnd(ShuffleClientImpl.java:1478)
>
> 6. Now, a consumer task attempt tries to read the data pushed by task
> attempt #2. However, it fails to read the data sent by task attempt #2,
> with the following error:
>
> java.io.IOException: Premature EOF from inputStream
>     at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:212)
>     at org.apache.tez.runtime.library.common.shuffle.RssShuffleUtils.shuffleToMemory(RssShuffleUtils.java:47)
>
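> For reference, the producer side of our client does roughly the following
> (variable names are illustrative, and the exact pushData signature depends
> on the Celeborn version):
>
> // Push one serialized batch for (shuffleId, mapId, attemptId, partitionId);
> // numMappers and numPartitions describe the producer and consumer stages.
> rssShuffleClient.pushData(shuffleId, mapId, attemptId, partitionId,
>     data, 0, data.length, numMappers, numPartitions);
>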
> Our read-side implementation is quite standard:
>
> inputStream = rssShuffleClient.readPartition(...);
> org.apache.hadoop.io.IOUtils.readFully(inputStream, ..., dataLength);
>
> We double-checked the parameter dataLength and found that it was correctly
> set to the size of data pushed by task attempt #2.
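>
> Spelled out with the buffer handling (argument names are illustrative, and
> the exact readPartition overload depends on the Celeborn version):
>
> byte[] buffer = new byte[dataLength];
> java.io.InputStream inputStream =
>     rssShuffleClient.readPartition(shuffleId, partitionId, attemptNumber,
>         startMapIndex, endMapIndex);
> // readFully throws "Premature EOF from inputStream" when fewer than
> // dataLength bytes can be read, which is exactly the failure we observe.
> org.apache.hadoop.io.IOUtils.readFully(inputStream, buffer, 0, dataLength);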
>
> I have two questions:
>
> 1) In the context of using Celeborn, does our implementation misuse the
> Celeborn API? For example, should we prohibit speculative execution because
> Celeborn requires that only one task attempt call pushData() at any point
> in time?
>
> 2) If speculative execution is not allowed, how can we quickly fail a task
> attempt stuck at mapperEnd()? By default, ShuffleClient seems to wait for
> 1200 seconds, rather than the default value of 120 seconds:
>
> 2023-08-19 11:51:32,159 [DAG-1-5-1] ERROR
> org.apache.celeborn.common.write.InFlightRequestTracker [] - After waiting
> for 1200000 ms, there are still 1 batches in flight for hostAndPushPort
> [192.168.10.106:38993], which exceeds the current limit 0.
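>
> In case it helps, this is how we plan to shorten the wait; we are assuming
> that celeborn.push.limit.inFlight.timeout is the key bounding this wait,
> but we have not confirmed this against our Celeborn version:
>
> CelebornConf conf = new CelebornConf();
> // Assumption: this key bounds how long limitZeroInFlight() waits for
> // in-flight batches; the 1200000 ms above suggests it is 1200s here.
> conf.set("celeborn.push.limit.inFlight.timeout", "120s");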
>
> Thanks a lot,
>
> --- Sungwoo