Hi Sungwoo,

Are there any other exceptions when 'Premature EOF from inputStream' occurs? Could you send the log file of the reduce task?

Thanks,
Keyong Zhou

Sungwoo Park <[email protected]> wrote on Mon, Aug 21, 2023 at 12:32:

> Hi Keyong.
>
> Thanks for your reply. We call mapperEnd() in attempt #2 (which is
> followed by a call to ShuffleClient.cleanup()). Also, attempt #1 is
> killed after attempt #2 is finished.
>
> It looks like the 'Premature EOF from inputStream' error occurs after a
> task attempt is interrupted while it keeps printing error messages like:
>
> 2023-08-19 11:52:16,119 [celeborn-retry-sender-21] INFO
> org.apache.celeborn.client.ShuffleClientImpl [] - Revive for push data
> success, new location for shuffle 1005007 map 408 attempt 0 partition 0
> batch 1 is location PartitionLocation[ ... ].
>
> Do you have any comments about this? We call the Celeborn API in a
> standard way (using only pushData(), mapperEnd(), cleanup(), etc.).
>
> Thanks,
>
> --- Sungwoo
>
>
> On Mon, Aug 21, 2023 at 12:18 PM Keyong Zhou <[email protected]> wrote:
>
>> Hi Sungwoo,
>>
>> Thanks for your mail! For your questions:
>>
>> 1. No, your implementation does not violate the usage of the Celeborn
>> API, and speculative execution is supported. Do you call mapperEnd in
>> attempt #2? I think you can kill attempt #1 after the invocation of
>> mapperEnd in attempt #2 succeeds.
>>
>> 2. Since speculative execution is allowed, we can safely kill a task
>> attempt when another attempt succeeds.
>>
>> Thanks,
>> Keyong Zhou
>>
>> <[email protected]> wrote on Sat, Aug 19, 2023 at 22:38:
>>
>>> Hello Celeborn team,
>>>
>>> We are quite close to completing our Celeborn-MR3 client, and I have a
>>> question on speculative execution in the context of using Celeborn.
>>>
>>> MR3 supports speculative execution, which allows several task attempts
>>> to run concurrently. When a task attempt succeeds, all other concurrent
>>> task attempts are interrupted and killed, so that only one task attempt
>>> commits its output.
>>>
>>> When using Celeborn-MR3, speculative execution sometimes seems to
>>> corrupt data sent over to Celeborn. Below I describe a sequence of
>>> events that produces this error. shuffleId, mapId, and partitionId are
>>> all fixed, whereas attemptId can be either 0 or 1.
>>>
>>> 1. Task attempt #1 (with attemptId 0) starts, and calls
>>> ShuffleClient.pushData().
>>>
>>> 2. Task attempt #1 gets stuck at the call of mapperEnd() because
>>> ShuffleClient fails to send data to Celeborn for an unknown reason,
>>> while repeatedly producing INFO messages like:
>>>
>>> 2023-08-19 11:52:16,119 [celeborn-retry-sender-21] INFO
>>> org.apache.celeborn.client.ShuffleClientImpl [] - Revive for push data
>>> success, new location for shuffle 1005007 map 408 attempt 0 partition 0
>>> batch 1 is location PartitionLocation[
>>>   id-epoch:0-4
>>>   host-rpcPort-pushPort-fetchPort-replicatePort:192.168.10.103-39861-45968-46540-44091
>>>   mode:PRIMARY
>>>   peer:(empty)
>>>   storage hint:StorageInfo{type=MEMORY, mountPoint='UNKNOWN_DISK',
>>>   finalResult=false, filePath=}
>>>   mapIdBitMap:null].
>>>
>>> 3. As task attempt #1 does not return for a long time, the speculative
>>> execution mechanism of MR3 kicks in and launches another task attempt
>>> #2 (with attemptId 1).
>>>
>>> 4. Task attempt #2 calls pushData() and succeeds. That is, task attempt
>>> #2 successfully pushes data to Celeborn.
>>>
>>> 5. MR3 interrupts and kills task attempt #1.
>>> When this occurs, mapperEnd() gets interrupted and prints a message
>>> like the following:
>>>
>>> 2023-08-19 11:52:16,089 [DAG-1-5-1] WARN RuntimeTask [] -
>>> LogicalOutput.close() fails on Reducer 12
>>> org.apache.celeborn.common.exception.CelebornIOException: sleep interrupted
>>>   at org.apache.celeborn.common.write.InFlightRequestTracker.limitZeroInFlight(InFlightRequestTracker.java:155)
>>>   at org.apache.celeborn.common.write.PushState.limitZeroInFlight(PushState.java:85)
>>>   at org.apache.celeborn.client.ShuffleClientImpl.limitZeroInFlight(ShuffleClientImpl.java:611)
>>>   at org.apache.celeborn.client.ShuffleClientImpl.mapEndInternal(ShuffleClientImpl.java:1494)
>>>   at org.apache.celeborn.client.ShuffleClientImpl.mapperEnd(ShuffleClientImpl.java:1478)
>>>
>>> 6. Now, a consumer task attempt tries to read the data pushed by task
>>> attempt #2. However, it fails to read the data sent by task attempt #2,
>>> with the following error:
>>>
>>> java.io.IOException: Premature EOF from inputStream
>>>   at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:212)
>>>   at org.apache.tez.runtime.library.common.shuffle.RssShuffleUtils.shuffleToMemory(RssShuffleUtils.java:47)
>>>
>>> Our implementation is quite standard:
>>>
>>>   inputStream = rssShuffleClient.readPartition(...);
>>>   org.apache.hadoop.io.IOUtils.readFully(inputStream, ..., dataLength);
>>>
>>> We double-checked the parameter dataLength and found that it was
>>> correctly set to the size of the data pushed by task attempt #2.
>>>
>>> I have two questions:
>>>
>>> 1) In the context of using Celeborn, does our implementation violate
>>> the usage of the Celeborn API? For example, should we prohibit
>>> speculative execution because Celeborn requires only one task attempt
>>> to call pushData() at any point in time?
>>>
>>> 2) If speculative execution is not allowed, how can we quickly fail
>>> a task attempt stuck at mapperEnd()? By default, it seems like
>>> ShuffleClient waits for 1200 seconds, not the default value of
>>> 120 seconds:
>>>
>>> 2023-08-19 11:51:32,159 [DAG-1-5-1] ERROR
>>> org.apache.celeborn.common.write.InFlightRequestTracker [] - After
>>> waiting for 1200000 ms, there are still 1 batches in flight for
>>> hostAndPushPort [192.168.10.106:38993], which exceeds the current
>>> limit 0.
>>>
>>> Thanks a lot,
>>>
>>> --- Sungwoo
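
For readers of this thread, below is a minimal sketch of the map-side call order under discussion: pushData() for every serialized chunk, then mapperEnd() for the winning attempt, and cleanup() for the attempt that gets killed. The ShuffleClient parameter lists shown here are assumptions that may differ across Celeborn versions, and the class and method names (MapSideSketch, writeAttempt, abortAttempt) and the chunks variable are placeholders, not part of Celeborn or MR3.

    import java.io.IOException;
    import java.util.List;

    import org.apache.celeborn.client.ShuffleClient;

    // Illustrative sketch only; ShuffleClient parameter lists are assumptions
    // and may differ across Celeborn versions.
    final class MapSideSketch {

      // One speculative attempt pushes all of its serialized chunks and then
      // calls mapperEnd(), which blocks until every in-flight batch of this
      // (shuffleId, mapId, attemptId) has been acknowledged.
      static void writeAttempt(ShuffleClient client, int shuffleId, int mapId,
                               int attemptId, int partitionId, int numMappers,
                               int numPartitions, List<byte[]> chunks)
          throws IOException {
        for (byte[] chunk : chunks) {
          client.pushData(shuffleId, mapId, attemptId, partitionId,
              chunk, 0, chunk.length, numMappers, numPartitions);
        }
        client.mapperEnd(shuffleId, mapId, attemptId, numMappers);
      }

      // For the losing attempt: drop its client-side push state after it has
      // been interrupted and killed.
      static void abortAttempt(ShuffleClient client, int shuffleId, int mapId,
                               int attemptId) throws IOException {
        client.cleanup(shuffleId, mapId, attemptId);
      }
    }

Following the advice earlier in the thread, abortAttempt() for attempt #1 would only run after writeAttempt() for attempt #2 has returned, so that exactly one attempt's output is committed.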

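A similarly hedged sketch of the reduce-side read path quoted above, i.e. readPartition() followed by IOUtils.readFully(). Again, the readPartition() parameter list is an assumption, and ReduceSideSketch and readFullPartition are placeholder names.

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.celeborn.client.ShuffleClient;
    import org.apache.hadoop.io.IOUtils;

    // Illustrative sketch only; the readPartition() parameter list is an
    // assumption and may differ across Celeborn versions.
    final class ReduceSideSketch {

      // Reads an entire partition into memory. dataLength must equal the number
      // of bytes committed by the single winning map attempt; if the stream ends
      // earlier, readFully() fails with "Premature EOF from inputStream".
      static byte[] readFullPartition(ShuffleClient client, int shuffleId,
                                      int partitionId, int attemptNumber,
                                      int startMapIndex, int endMapIndex,
                                      int dataLength) throws IOException {
        byte[] buffer = new byte[dataLength];
        try (InputStream in = client.readPartition(
            shuffleId, partitionId, attemptNumber, startMapIndex, endMapIndex)) {
          IOUtils.readFully(in, buffer, 0, dataLength);
        }
        return buffer;
      }
    }

Hadoop's IOUtils.readFully() raises the 'Premature EOF from inputStream' IOException seen in step 6 whenever the stream is exhausted before dataLength bytes have been read.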