[ 
https://issues.apache.org/jira/browse/TEZ-4189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17117498#comment-17117498
 ] 

László Bodor commented on TEZ-4189:
-----------------------------------

thanks [~gopalv] for clarifying this, this totally invalidates my original idea 
(to fetch and handle spills from different attempts and ignore which attempt 
they come from), but we can still collect spills for every seen map task 
attempt (maintain spill infos for multiple attempts in a ShuffleEventInfo) and 
considering that one complete, for which we had all the events processed 
first...is it viable or against something that I don't see at the moment?

considering this reducer log:  
[^syslog_attempt_1589892856650_0073_1_08_000945_0] 
I can see that it's about fetching the following inputs (there are 8 spills in 
this mapper task: 0-7):
attempt_1589892856650_0073_1_07_000100_1_10017_0
attempt_1589892856650_0073_1_07_000100_1_10017_1
attempt_1589892856650_0073_1_07_000100_1_10017_2
attempt_1589892856650_0073_1_07_000100_0_10003_2
attempt_1589892856650_0073_1_07_000100_1_10017_3
attempt_1589892856650_0073_1_07_000100_1_10017_4
attempt_1589892856650_0073_1_07_000100_1_10017_5
attempt_1589892856650_0073_1_07_000100_1_10017_6
attempt_1589892856650_0073_1_07_000100_1_10017_7

so it's aware of all the spills of the second, successful attempt

{code}
2020-05-25 13:13:14,641 [INFO] [Fetcher_B {Map_16} #3] |HttpConnection.url|: 
for 
url=http://ctr-e141-1563959304486-69251-01-000009.hwx.site:13562/mapOutput?job=job_1589892856650_0073&dag=1&reduce=94
5&map=attempt_1589892856650_0073_1_07_000092_0_10002_1,attempt_1589892856650_0073_1_07_000036_0_10003_1,attempt_1589892856650_0073_1_07_000068_0_10003_1,attempt_1589892856650_0073_1_07_000004_0_1000
3_1,attempt_1589892856650_0073_1_07_000012_0_10002_1,attempt_1589892856650_0073_1_07_000044_0_10003_1,attempt_1589892856650_0073_1_07_000060_0_10002_1,attempt_1589892856650_0073_1_07_000084_0_10003_
2,attempt_1589892856650_0073_1_07_000020_0_10003_1,attempt_1589892856650_0073_1_07_000028_0_10003_2,attempt_1589892856650_0073_1_07_000052_0_10002_2,attempt_1589892856650_0073_1_07_000036_0_10003_2,
attempt_1589892856650_0073_1_07_000100_0_10003_2,attempt_1589892856650_0073_1_07_000004_0_10003_2,attempt_1589892856650_0073_1_07_000092_0_10002_2,attempt_1589892856650_0073_1_07_000076_0_10003_2,at
tempt_1589892856650_0073_1_07_000068_0_10003_2,attempt_1589892856650_0073_1_07_000084_0_10003_3,attempt_1589892856650_0073_1_07_000012_0_10002_2,attempt_1589892856650_0073_1_07_000028_0_10003_3,atte
mpt_1589892856650_0073_1_07_000108_1_10024_0,attempt_1589892856650_0073_1_07_000100_1_10017_0,attempt_1589892856650_0073_1_07_000092_1_10027_0,attempt_1589892856650_0073_1_07_000100_1_10017_1,attemp
t_1589892856650_0073_1_07_000084_1_10026_0,attempt_1589892856650_0073_1_07_000108_1_10024_1,attempt_1589892856650_0073_1_07_000076_1_10020_0,attempt_1589892856650_0073_1_07_000068_1_10021_0,attempt_
1589892856650_0073_1_07_000060_1_10029_0,attempt_1589892856650_0073_1_07_000092_1_10027_1,attempt_1589892856650_0073_1_07_000084_1_10026_1,attempt_1589892856650_0073_1_07_000100_1_10017_2,attempt_15
89892856650_0073_1_07_000108_1_10024_2,attempt_1589892856650_0073_1_07_000076_1_10020_1,attempt_1589892856650_0073_1_07_000060_1_10029_1,attempt_1589892856650_0073_1_07_000068_1_10021_1,attempt_1589
892856650_0073_1_07_000092_1_10027_2,attempt_1589892856650_0073_1_07_000084_1_10026_2,attempt_1589892856650_0073_1_07_000100_1_10017_3,attempt_1589892856650_0073_1_07_000108_1_10024_3,attempt_158989
2856650_0073_1_07_000076_1_10020_2,attempt_1589892856650_0073_1_07_000068_1_10021_2,attempt_1589892856650_0073_1_07_000060_1_10029_2,attempt_1589892856650_0073_1_07_000092_1_10027_3,attempt_15898928
56650_0073_1_07_000100_1_10017_4,attempt_1589892856650_0073_1_07_000084_1_10026_3,attempt_1589892856650_0073_1_07_000076_1_10020_3,attempt_1589892856650_0073_1_07_000108_1_10024_4,attempt_1589892856
650_0073_1_07_000060_1_10029_3,attempt_1589892856650_0073_1_07_000068_1_10021_3,attempt_1589892856650_0073_1_07_000092_1_10027_4,attempt_1589892856650_0073_1_07_000100_1_10017_5,attempt_158989285665
0_0073_1_07_000084_1_10026_4,attempt_1589892856650_0073_1_07_000108_1_10024_5,attempt_1589892856650_0073_1_07_000076_1_10020_4,attempt_1589892856650_0073_1_07_000092_1_10027_5,attempt_1589892856650_
0073_1_07_000068_1_10021_4,attempt_1589892856650_0073_1_07_000060_1_10029_4,attempt_1589892856650_0073_1_07_000084_1_10026_5,attempt_1589892856650_0073_1_07_000100_1_10017_6,attempt_1589892856650_00
73_1_07_000108_1_10024_6,attempt_1589892856650_0073_1_07_000076_1_10020_5,attempt_1589892856650_0073_1_07_000100_1_10017_7,attempt_1589892856650_0073_1_07_000092_1_10027_6,attempt_1589892856650_0073
_1_07_000060_1_10029_5,attempt_1589892856650_0073_1_07_000068_1_10021_5,attempt_1589892856650_0073_1_07_000108_1_10024_7,attempt_1589892856650_0073_1_07_000092_1_10027_7&keepAlive=true
 sent hash and
 receievd reply 0 ms
{code}

but later it fails because of altering attempt numbers, however, it could move 
on with all the spills from the second attempt, that's what made me think of 
collecting all the spills regardless of attempt number
{code}
2020-05-25 13:13:14,670 [ERROR] [Fetcher_B {Map_16} #3] |impl.ShuffleManager|: 
Killing self as previous attempt data could have been consumed
java.io.IOException: Previous event already got scheduled for 
InputAttemptIdentifier [inputIdentifier=100, attemptNumber=1, 
pathComponent=attempt_1589892856650_0073_1_07_000100_1_10017_0, spillType=1, 
spillId=0]. Previous attempt's data could have been already merged to 
memory/disk outputs.  Killing (self) this task early. currentAttemptNum=0, 
eventsProcessed={0, 1, 2}, scheduledForDownload=true, newAttemptNum=1
        at 
org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager.validateInputAttemptForPipelinedShuffle(ShuffleManager.java:503)
        at 
org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager.registerCompletedInputForPipelinedShuffle(ShuffleManager.java:877)
        at 
org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager.fetchSucceeded(ShuffleManager.java:812)
        at 
org.apache.tez.runtime.library.common.shuffle.Fetcher.fetchInputs(Fetcher.java:978)
        at 
org.apache.tez.runtime.library.common.shuffle.Fetcher.doHttpFetch(Fetcher.java:601)
        at 
org.apache.tez.runtime.library.common.shuffle.Fetcher.doHttpFetch(Fetcher.java:487)
        at 
org.apache.tez.runtime.library.common.shuffle.Fetcher.callInternal(Fetcher.java:285)
        at 
org.apache.tez.runtime.library.common.shuffle.Fetcher.callInternal(Fetcher.java:76)
        at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
        at 
com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
        at 
com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69)
        at 
com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{code}

> Support fault tolerance in pipelined data transfer for unordered output
> -----------------------------------------------------------------------
>
>                 Key: TEZ-4189
>                 URL: https://issues.apache.org/jira/browse/TEZ-4189
>             Project: Apache Tez
>          Issue Type: Bug
>            Reporter: László Bodor
>            Assignee: László Bodor
>            Priority: Major
>         Attachments: syslog_attempt_1589892856650_0073_1_07_000100_0, 
> syslog_attempt_1589892856650_0073_1_07_000100_1, 
> syslog_attempt_1589892856650_0073_1_08_000945_0
>
>
> Attached an example:
> Mapper task attempt 0 fails while spilling spill3:  
> [^syslog_attempt_1589892856650_0073_1_07_000100_0] 
> {code}
> 2020-05-25 13:08:10,702 [INFO] [UnorderedOutSpiller {Reducer_14} #0] 
> |writers.UnorderedPartitionedKVWriter|: Writing spill 3 to 
> /grid/2/yarn/nm/usercache/root/appcache/application_1589892856650_0073/output/attempt_1589892856650_0073_1_07_000100_0_10003_3/file.out
> 2020-05-25 13:08:10,736 [INFO] [TaskHeartbeatThread] |task.TaskReporter|: 
> Received should die response from AM
> {code}
> Mapper task attempt 1 passes, successfully spills out 7 spills:  
> [^syslog_attempt_1589892856650_0073_1_07_000100_1] 
> {code}
> 2020-05-25 13:09:47,722 [INFO] [TezChild] 
> |writers.UnorderedPartitionedKVWriter|: Reducer_14: Adding spill event for 
> spill (final update=true), spillId=7
> {code}
> Reducer tasks get killed because they cannot recover from the mapper task 
> attempt failure, because fault tolerance wasn't implemented for pipelined 
> shuffle I guess:  [^syslog_attempt_1589892856650_0073_1_08_000945_0] 
> {code}
> 2020-05-25 13:13:14,617 [ERROR] [ShuffleRunner {Map_16}] 
> |impl.ShuffleManager|: Killing self as previous attempt data could have been 
> consumed
> java.io.IOException: Previous event already got scheduled for 
> InputAttemptIdentifier [inputIdentifier=100, attemptNumber=1, 
> pathComponent=attempt_1589892856650_0073_1_07_000100_1_10017_0, spillType=1, 
> spillId=0]. Previous attempt's data could have been already merged to 
> memory/disk outputs.  Killing (self) this task early. currentAttemptNum=0, 
> eventsProcessed={0, 1}, scheduledForDownload=true, newAttemptNum=1
>         at 
> org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager.validateInputAttemptForPipelinedShuffle(ShuffleManager.java:503)
>         at 
> org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager.constructFetcherForHost(ShuffleManager.java:551)
>         at 
> org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager$RunShuffleCallable.callInternal(ShuffleManager.java:455)
>         at 
> org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager$RunShuffleCallable.callInternal(ShuffleManager.java:398)
>         at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
>         at 
> com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>         at 
> com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69)
>         at 
> com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> {code}
> The interesting part is that the above failing reducer task attempt runs 4 
> minutes later than the successful, second mapper task attempt, so there is 
> not case of unfortunate scheduling I think, rather the pipelined shuffle is 
> not prepared for fetching in case of killed and reattempted inputs. For 
> example, in the reducer task, it tries to fetch input from 
> attempt_1589892856650_0073_1_07_000100_0
> 3_0, which is the map of the first, failing attempt, but this task start 4 
> minutes after there is already a successful attempt: 
> attempt_1589892856650_0073_1_07_000100_1.
> {code}
> 2020-05-25 13:13:14,556 [INFO] [Fetcher_B {Map_16} #3] |HttpConnection.url|: 
> for 
> url=http://ctr-e141-1563959304486-69251-01-000009.hwx.site:13562/mapOutput?job=job_1589892856650_0073&dag=1&reduce=94
> 5&map=attempt_1589892856650_0073_1_07_000028_0_10003_0,attempt_1589892856650_0073_1_07_000084_0_10003_0,attempt_1589892856650_0073_1_07_000052_0_10002_0,attempt_1589892856650_0073_1_07_000100_0_1000
> 3_0,attempt_1589892856650_0073_1_07_000108_0_10002_0,attempt_1589892856650_0073_1_07_000036_0_10003_0,attempt_1589892856650_0073_1_07_000092_0_10002_0,attempt_1589892856650_0073_1_07_000076_0_10003_
> 0,attempt_1589892856650_0073_1_07_000012_0_10002_0,attempt_1589892856650_0073_1_07_000044_0_10003_0,attempt_1589892856650_0073_1_07_000004_0_10003_0,attempt_1589892856650_0073_1_07_000068_0_10003_0,
> attempt_1589892856650_0073_1_07_000060_0_10002_0,attempt_1589892856650_0073_1_07_000020_0_10003_0,attempt_1589892856650_0073_1_07_000084_0_10003_1,attempt_1589892856650_0073_1_07_000108_0_10002_1,at
> tempt_1589892856650_0073_1_07_000028_0_10003_1,attempt_1589892856650_0073_1_07_000100_0_10003_1,attempt_1589892856650_0073_1_07_000076_0_10003_1,attempt_1589892856650_0073_1_07_000052_0_10002_1,atte
> mpt_1589892856650_0073_1_07_000066_1_10024_0,attempt_1589892856650_0073_1_07_000066_1_10024_1&keepAlive=true
>  sent hash and receievd reply 0 ms
> {code}
> Fetches an input spill from the first failing task attempt successfully:
> {code}
> 2020-05-25 13:13:14,562 [INFO] [Fetcher_B {Map_16} #3] 
> |ShuffleManager.fetch|: Completed fetch for attempt: {100, 0, 
> attempt_1589892856650_0073_1_07_000100_0_10003_0, 1, 0} to MEMORY, 
> csize=18631, d
> {code}
> And then the failure mentioned above because of the [fail-fast 
> check|https://github.com/apache/tez/blob/master/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java#L495]
>  of different attempt numbers:
> {code}
> java.io.IOException: Previous event already got scheduled for 
> InputAttemptIdentifier [inputIdentifier=100, attemptNumber=1, 
> pathComponent=attempt_1589892856650_0073_1_07_000100_1_10017_0, spillType=1, 
> spillId=0]. Previous attempt's data could have been already merged to 
> memory/disk outputs.  Killing (self) this task early. currentAttemptNum=0, 
> eventsProcessed={0, 1}, scheduledForDownload=true, newAttemptNum=1
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to