[ 
https://issues.apache.org/jira/browse/TEZ-4189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

László Bodor updated TEZ-4189:
------------------------------
    Description: 
Attached an example:
Mapper task attempt 0 fails while spilling spill3:  
[^syslog_attempt_1589892856650_0073_1_07_000100_0] 
{code}
2020-05-25 13:08:10,702 [INFO] [UnorderedOutSpiller {Reducer_14} #0] 
|writers.UnorderedPartitionedKVWriter|: Writing spill 3 to 
/grid/2/yarn/nm/usercache/root/appcache/application_1589892856650_0073/output/attempt_1589892856650_0073_1_07_000100_0_10003_3/file.out
2020-05-25 13:08:10,736 [INFO] [TaskHeartbeatThread] |task.TaskReporter|: 
Received should die response from AM
{code}

Mapper task attempt 1 passes, successfully spills out 7 spills:  
[^syslog_attempt_1589892856650_0073_1_07_000100_1] 
{code}
2020-05-25 13:09:47,722 [INFO] [TezChild] 
|writers.UnorderedPartitionedKVWriter|: Reducer_14: Adding spill event for 
spill (final update=true), spillId=7
{code}

Reducer tasks get killed because they cannot recover from the mapper task 
attempt failure, because fault tolerance wasn't implemented for pipelined 
shuffle:  [^syslog_attempt_1589892856650_0073_1_08_000945_0] 
{code}
2020-05-25 13:13:14,617 [ERROR] [ShuffleRunner {Map_16}] |impl.ShuffleManager|: 
Killing self as previous attempt data could have been consumed
java.io.IOException: Previous event already got scheduled for 
InputAttemptIdentifier [inputIdentifier=100, attemptNumber=1, 
pathComponent=attempt_1589892856650_0073_1_07_000100_1_10017_0, spillType=1, 
spillId=0]. Previous attempt's data could have been already merged to 
memory/disk outputs.  Killing (self) this task early. currentAttemptNum=0, 
eventsProcessed={0, 1}, scheduledForDownload=true, newAttemptNum=1
        at 
org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager.validateInputAttemptForPipelinedShuffle(ShuffleManager.java:503)
        at 
org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager.constructFetcherForHost(ShuffleManager.java:551)
        at 
org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager$RunShuffleCallable.callInternal(ShuffleManager.java:455)
        at 
org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager$RunShuffleCallable.callInternal(ShuffleManager.java:398)
        at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
        at 
com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
        at 
com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69)
        at 
com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{code}

The interesting part is that the above failing reducer task attempt runs 4 
minutes later than the successful, second mapper task attempt, so there this is 
not case of unfortunate scheduling I think, rather the pipelined shuffle is not 
prepared for fetching in case of killed and reattempted inputs. For example, in 
the reducer task, it tries to fetch input from 
attempt_1589892856650_0073_1_07_000100_0
3_0, which is the map of the first, failing attempt, but this task start 4 
minutes after there is already a successful attempt: 
attempt_1589892856650_0073_1_07_000100_1.
{code}
2020-05-25 13:13:14,556 [INFO] [Fetcher_B {Map_16} #3] |HttpConnection.url|: 
for 
url=http://ctr-e141-1563959304486-69251-01-000009.hwx.site:13562/mapOutput?job=job_1589892856650_0073&dag=1&reduce=94
5&map=attempt_1589892856650_0073_1_07_000028_0_10003_0,attempt_1589892856650_0073_1_07_000084_0_10003_0,attempt_1589892856650_0073_1_07_000052_0_10002_0,attempt_1589892856650_0073_1_07_000100_0_1000
3_0,attempt_1589892856650_0073_1_07_000108_0_10002_0,attempt_1589892856650_0073_1_07_000036_0_10003_0,attempt_1589892856650_0073_1_07_000092_0_10002_0,attempt_1589892856650_0073_1_07_000076_0_10003_
0,attempt_1589892856650_0073_1_07_000012_0_10002_0,attempt_1589892856650_0073_1_07_000044_0_10003_0,attempt_1589892856650_0073_1_07_000004_0_10003_0,attempt_1589892856650_0073_1_07_000068_0_10003_0,
attempt_1589892856650_0073_1_07_000060_0_10002_0,attempt_1589892856650_0073_1_07_000020_0_10003_0,attempt_1589892856650_0073_1_07_000084_0_10003_1,attempt_1589892856650_0073_1_07_000108_0_10002_1,at
tempt_1589892856650_0073_1_07_000028_0_10003_1,attempt_1589892856650_0073_1_07_000100_0_10003_1,attempt_1589892856650_0073_1_07_000076_0_10003_1,attempt_1589892856650_0073_1_07_000052_0_10002_1,atte
mpt_1589892856650_0073_1_07_000066_1_10024_0,attempt_1589892856650_0073_1_07_000066_1_10024_1&keepAlive=true
 sent hash and receievd reply 0 ms
{code}

Fetches an input spill from the first failing task attempt successfully:
{code}
2020-05-25 13:13:14,562 [INFO] [Fetcher_B {Map_16} #3] |ShuffleManager.fetch|: 
Completed fetch for attempt: {100, 0, 
attempt_1589892856650_0073_1_07_000100_0_10003_0, 1, 0} to MEMORY, csize=18631, 
d
{code}

And then the failure mentioned above because of the [fail-fast 
check|https://github.com/apache/tez/blob/master/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java#L495]
 of different attempt numbers:
{code}
java.io.IOException: Previous event already got scheduled for 
InputAttemptIdentifier [inputIdentifier=100, attemptNumber=1, 
pathComponent=attempt_1589892856650_0073_1_07_000100_1_10017_0, spillType=1, 
spillId=0]. Previous attempt's data could have been already merged to 
memory/disk outputs.  Killing (self) this task early. currentAttemptNum=0, 
eventsProcessed={0, 1}, scheduledForDownload=true, newAttemptNum=1
{code}

> Support fault tolerance in pipelined data transfer for unordered output
> -----------------------------------------------------------------------
>
>                 Key: TEZ-4189
>                 URL: https://issues.apache.org/jira/browse/TEZ-4189
>             Project: Apache Tez
>          Issue Type: Bug
>            Reporter: László Bodor
>            Assignee: László Bodor
>            Priority: Major
>         Attachments: syslog_attempt_1589892856650_0073_1_07_000100_0, 
> syslog_attempt_1589892856650_0073_1_07_000100_1, 
> syslog_attempt_1589892856650_0073_1_08_000945_0
>
>
> Attached an example:
> Mapper task attempt 0 fails while spilling spill3:  
> [^syslog_attempt_1589892856650_0073_1_07_000100_0] 
> {code}
> 2020-05-25 13:08:10,702 [INFO] [UnorderedOutSpiller {Reducer_14} #0] 
> |writers.UnorderedPartitionedKVWriter|: Writing spill 3 to 
> /grid/2/yarn/nm/usercache/root/appcache/application_1589892856650_0073/output/attempt_1589892856650_0073_1_07_000100_0_10003_3/file.out
> 2020-05-25 13:08:10,736 [INFO] [TaskHeartbeatThread] |task.TaskReporter|: 
> Received should die response from AM
> {code}
> Mapper task attempt 1 passes, successfully spills out 7 spills:  
> [^syslog_attempt_1589892856650_0073_1_07_000100_1] 
> {code}
> 2020-05-25 13:09:47,722 [INFO] [TezChild] 
> |writers.UnorderedPartitionedKVWriter|: Reducer_14: Adding spill event for 
> spill (final update=true), spillId=7
> {code}
> Reducer tasks get killed because they cannot recover from the mapper task 
> attempt failure, because fault tolerance wasn't implemented for pipelined 
> shuffle:  [^syslog_attempt_1589892856650_0073_1_08_000945_0] 
> {code}
> 2020-05-25 13:13:14,617 [ERROR] [ShuffleRunner {Map_16}] 
> |impl.ShuffleManager|: Killing self as previous attempt data could have been 
> consumed
> java.io.IOException: Previous event already got scheduled for 
> InputAttemptIdentifier [inputIdentifier=100, attemptNumber=1, 
> pathComponent=attempt_1589892856650_0073_1_07_000100_1_10017_0, spillType=1, 
> spillId=0]. Previous attempt's data could have been already merged to 
> memory/disk outputs.  Killing (self) this task early. currentAttemptNum=0, 
> eventsProcessed={0, 1}, scheduledForDownload=true, newAttemptNum=1
>         at 
> org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager.validateInputAttemptForPipelinedShuffle(ShuffleManager.java:503)
>         at 
> org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager.constructFetcherForHost(ShuffleManager.java:551)
>         at 
> org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager$RunShuffleCallable.callInternal(ShuffleManager.java:455)
>         at 
> org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager$RunShuffleCallable.callInternal(ShuffleManager.java:398)
>         at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
>         at 
> com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>         at 
> com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69)
>         at 
> com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> {code}
> The interesting part is that the above failing reducer task attempt runs 4 
> minutes later than the successful, second mapper task attempt, so there this 
> is not case of unfortunate scheduling I think, rather the pipelined shuffle 
> is not prepared for fetching in case of killed and reattempted inputs. For 
> example, in the reducer task, it tries to fetch input from 
> attempt_1589892856650_0073_1_07_000100_0
> 3_0, which is the map of the first, failing attempt, but this task start 4 
> minutes after there is already a successful attempt: 
> attempt_1589892856650_0073_1_07_000100_1.
> {code}
> 2020-05-25 13:13:14,556 [INFO] [Fetcher_B {Map_16} #3] |HttpConnection.url|: 
> for 
> url=http://ctr-e141-1563959304486-69251-01-000009.hwx.site:13562/mapOutput?job=job_1589892856650_0073&dag=1&reduce=94
> 5&map=attempt_1589892856650_0073_1_07_000028_0_10003_0,attempt_1589892856650_0073_1_07_000084_0_10003_0,attempt_1589892856650_0073_1_07_000052_0_10002_0,attempt_1589892856650_0073_1_07_000100_0_1000
> 3_0,attempt_1589892856650_0073_1_07_000108_0_10002_0,attempt_1589892856650_0073_1_07_000036_0_10003_0,attempt_1589892856650_0073_1_07_000092_0_10002_0,attempt_1589892856650_0073_1_07_000076_0_10003_
> 0,attempt_1589892856650_0073_1_07_000012_0_10002_0,attempt_1589892856650_0073_1_07_000044_0_10003_0,attempt_1589892856650_0073_1_07_000004_0_10003_0,attempt_1589892856650_0073_1_07_000068_0_10003_0,
> attempt_1589892856650_0073_1_07_000060_0_10002_0,attempt_1589892856650_0073_1_07_000020_0_10003_0,attempt_1589892856650_0073_1_07_000084_0_10003_1,attempt_1589892856650_0073_1_07_000108_0_10002_1,at
> tempt_1589892856650_0073_1_07_000028_0_10003_1,attempt_1589892856650_0073_1_07_000100_0_10003_1,attempt_1589892856650_0073_1_07_000076_0_10003_1,attempt_1589892856650_0073_1_07_000052_0_10002_1,atte
> mpt_1589892856650_0073_1_07_000066_1_10024_0,attempt_1589892856650_0073_1_07_000066_1_10024_1&keepAlive=true
>  sent hash and receievd reply 0 ms
> {code}
> Fetches an input spill from the first failing task attempt successfully:
> {code}
> 2020-05-25 13:13:14,562 [INFO] [Fetcher_B {Map_16} #3] 
> |ShuffleManager.fetch|: Completed fetch for attempt: {100, 0, 
> attempt_1589892856650_0073_1_07_000100_0_10003_0, 1, 0} to MEMORY, 
> csize=18631, d
> {code}
> And then the failure mentioned above because of the [fail-fast 
> check|https://github.com/apache/tez/blob/master/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java#L495]
>  of different attempt numbers:
> {code}
> java.io.IOException: Previous event already got scheduled for 
> InputAttemptIdentifier [inputIdentifier=100, attemptNumber=1, 
> pathComponent=attempt_1589892856650_0073_1_07_000100_1_10017_0, spillType=1, 
> spillId=0]. Previous attempt's data could have been already merged to 
> memory/disk outputs.  Killing (self) this task early. currentAttemptNum=0, 
> eventsProcessed={0, 1}, scheduledForDownload=true, newAttemptNum=1
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to