Hi Sungwoo,

Are there any other exceptions when 'Premature EOF from inputStream' occurs?
Could you send the log file of the reduce task?

Thanks,
Keyong Zhou

On Mon, Aug 21, 2023 at 12:32 PM Sungwoo Park <[email protected]> wrote:

> Hi Keyong,
>
> Thanks for your reply. We call mapperEnd() in attempt #2 (followed by a
> call to ShuffleClient.cleanup()). Also, attempt #1 is killed only after
> attempt #2 has finished.
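>
> To be concrete, the producer side looks roughly like the sketch below
> (the Celeborn method names are the ones we actually call; the argument
> lists and surrounding control flow are illustrative placeholders):
>
>    // attempt #2 (attemptId 1), after its last pushData() call has returned:
>    shuffleClient.mapperEnd(shuffleId, mapId, attemptId, numMappers);
>    shuffleClient.cleanup(shuffleId, mapId, attemptId);
>    // only after mapperEnd()/cleanup() in attempt #2 complete does MR3
>    // interrupt and kill attempt #1 (attemptId 0)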
>
> It looks like the 'Premature EOF from inputStream' error occurs after a
> task attempt is interrupted while it keeps printing messages like:
>
> 2023-08-19 11:52:16,119 [celeborn-retry-sender-21] INFO
> org.apache.celeborn.client.ShuffleClientImpl [] - Revive for push data
> success, new location for shuffle 1005007 map 408 attempt 0 partition 0
> batch 1 is location PartitionLocation[ ... ].
>
> Do you have any comments on this? We call the Celeborn API in a standard
> way (using only pushData(), mapperEnd(), cleanup(), etc.).
>
> Thanks,
>
> --- Sungwoo
>
>
> On Mon, Aug 21, 2023 at 12:18 PM Keyong Zhou <[email protected]> wrote:
>
>> Hi Sungwoo,
>>
>> Thanks for your mail! For your questions:
>>
>> 1. No, your implementation does not violate the usage of the Celeborn API,
>>    and speculative execution is supported. Do you call mapperEnd() in
>>    attempt #2? I think you can kill attempt #1 after the invocation of
>>    mapperEnd() in attempt #2 succeeds.
>>
>> 2. Since speculative execution is allowed, we can safely kill a task
>>    attempt when another attempt succeeds.
>>
>> Thanks,
>> Keyong Zhou
>>
>> On Sat, Aug 19, 2023 at 10:38 PM <[email protected]> wrote:
>>
>>> Hello Celeborn team,
>>>
>>> We are quite close to completing our Celeborn-MR3 client, and I have a
>>> question on speculative execution in the context of using Celeborn.
>>>
>>> MR3 supports speculative execution, which allows several task attempts to
>>> run concurrently. When a task attempt succeeds, all other concurrent task
>>> attempts are interrupted and killed, so that only one task attempt commits
>>> its output.
>>>
>>> When using Celeborn-MR3, speculative execution sometimes seems to corrupt
>>> the data sent to Celeborn. Below I describe a sequence of events that
>>> produces this error. In the following, shuffleId, mapId, and partitionId
>>> are all fixed, whereas attemptId can be either 0 or 1.
>>>
>>> 1. Task attempt #1 (with attemptId 0) starts, and calls
>>> ShuffleClient.pushData().
>>>
>>> 2. Task attempt #1 gets stuck in the call to mapperEnd() because
>>> ShuffleClient fails to send data to Celeborn for an unknown reason, while
>>> repeatedly producing INFO messages like:
>>>
>>> 2023-08-19 11:52:16,119 [celeborn-retry-sender-21] INFO
>>> org.apache.celeborn.client.ShuffleClientImpl [] - Revive for push data
>>> success, new location for shuffle 1005007 map 408 attempt 0 partition 0
>>> batch 1 is location PartitionLocation[
>>>    id-epoch:0-4
>>>
>>> host-rpcPort-pushPort-fetchPort-replicatePort:192.168.10.103-39861-45968-46540-44091
>>>    mode:PRIMARY
>>>    peer:(empty)
>>>    storage hint:StorageInfo{type=MEMORY, mountPoint='UNKNOWN_DISK',
>>> finalResult=false, filePath=}
>>>    mapIdBitMap:null].
>>>
>>> 3. As task attempt #1 does not return for a long time, the speculative
>>> execution mechanism of MR3 kicks in and launches another task attempt
>>> #2 (with attemptId 1).
>>>
>>> 4. Task attempt #2 calls pushData() and succeeds. That is, task attempt #2
>>> successfully pushes data to Celeborn.
>>>
>>> 5. MR3 interrupts and kills task attempt #1. When this occurs,
>>> mapperEnd() gets interrupted and prints a message like the following:
>>>
>>> 2023-08-19 11:52:16,089 [DAG-1-5-1] WARN  RuntimeTask [] -
>>> LogicalOutput.close() fails on Reducer 12
>>> org.apache.celeborn.common.exception.CelebornIOException: sleep
>>> interrupted
>>>    at
>>>
>>> org.apache.celeborn.common.write.InFlightRequestTracker.limitZeroInFlight(InFlightRequestTracker.java:155)
>>>    at
>>>
>>> org.apache.celeborn.common.write.PushState.limitZeroInFlight(PushState.java:85)
>>>    at
>>>
>>> org.apache.celeborn.client.ShuffleClientImpl.limitZeroInFlight(ShuffleClientImpl.java:611)
>>>    at
>>>
>>> org.apache.celeborn.client.ShuffleClientImpl.mapEndInternal(ShuffleClientImpl.java:1494)
>>>    at
>>>
>>> org.apache.celeborn.client.ShuffleClientImpl.mapperEnd(ShuffleClientImpl.java:1478)
>>>
>>> 6. Now, a consumer task attempt tries to read the data pushed by task
>>> attempt #2. However, it fails with the following error:
>>>
>>> java.io.IOException: Premature EOF from inputStream
>>>    at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:212)
>>>    at
>>> org.apache.tez.runtime.library.common.shuffle.RssShuffleUtils.shuffleToMemory(RssShuffleUtils.java:47)
>>>
>>> Our implementation is quite standard:
>>>
>>>    inputStream = rssShuffleClient.readPartition(...);
>>>    org.apache.hadoop.io.IOUtils.readFully(inputStream, ..., dataLength);
>>>
>>> We double-checked the parameter dataLength and found that it was correctly
>>> set to the size of the data pushed by task attempt #2.
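>>>
>>> To make the failure mode concrete, the read path boils down to the
>>> following minimal sketch (variable names are illustrative; in stands for
>>> the stream returned by readPartition() above):
>>>
>>>    byte[] buffer = new byte[dataLength];
>>>    // IOUtils.readFully() loops until exactly dataLength bytes have been
>>>    // read and throws "Premature EOF from inputStream" if the stream ends
>>>    // earlier, so the error means the stream delivered fewer bytes than
>>>    // dataLength.
>>>    org.apache.hadoop.io.IOUtils.readFully(in, buffer, 0, dataLength);
>>>    in.close();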
>>>
>>> I have two questions:
>>>
>>> 1) In the context of using Celeborn, does our implementation violate the
>>> usage of the Celeborn API? For example, should we prohibit speculative
>>> execution because Celeborn requires that only one task attempt call
>>> pushData() at any point in time?
>>>
>>> 2) If speculative execution is not allowed, how can we quickly fail
>>> a task attempt stuck in mapperEnd()? By default, it seems that
>>> ShuffleClient waits for 1200 seconds, not the default value of 120 seconds:
>>>
>>> 2023-08-19 11:51:32,159 [DAG-1-5-1] ERROR
>>> org.apache.celeborn.common.write.InFlightRequestTracker [] - After
>>> waiting
>>> for 1200000 ms, there are still 1 batches in flight for hostAndPushPort
>>> [192.168.10.106:38993], which exceeds the current limit 0.
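>>>
>>> For reference, one hypothetical way to bound the wait on our side would
>>> be to run the mapperEnd() call on a separate thread and interrupt it on
>>> timeout, roughly as below (just a sketch, not what we do today; the
>>> mapperEnd() argument list is an assumption on our part):
>>>
>>>    // requires java.util.concurrent.* and java.io.IOException
>>>    ExecutorService pool = Executors.newSingleThreadExecutor();
>>>    Future<?> f = pool.submit(() -> {
>>>        // argument list assumed, for illustration only
>>>        shuffleClient.mapperEnd(shuffleId, mapId, attemptId, numMappers);
>>>        return null;
>>>    });
>>>    try {
>>>        f.get(120, TimeUnit.SECONDS);  // give up after 120 s, not 1200 s
>>>    } catch (TimeoutException e) {
>>>        f.cancel(true);  // interrupts the sleep inside limitZeroInFlight()
>>>        throw new IOException("mapperEnd() timed out", e);
>>>    } catch (InterruptedException | ExecutionException e) {
>>>        throw new IOException(e);
>>>    } finally {
>>>        pool.shutdownNow();
>>>    }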
>>>
>>> Thanks a lot,
>>>
>>> --- Sungwoo
>>>
>>>
