[ 
https://issues.apache.org/jira/browse/TEZ-4189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17116865#comment-17116865
 ] 

Gopal Vijayaraghavan commented on TEZ-4189:
-------------------------------------------

bq.  because fault tolerance wasn't implemented for pipelined shuffle

A fault tolerance model was working with JDK7 and stopped working with JDK8, 
because our operators routinely aggregate things in memory using HashMap 
implementations and then read them out into a shuffle output.

The HashMap change in JDK8 (http://openjdk.java.net/jeps/180) prevents 
something like an in-memory group-by hash from returning the rows in the same 
order.

So the 2nd attempt of the same task might not result in the same rows landing 
in spill 1, spill 2 etc.

That requires us to restart the process: we cannot restart a reducer from a 
fixed spill-id on a second attempt of a mapper, and if we've already processed 
a spill for the reducer, we can't undo it.
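
To illustrate the point about ordering, here is a minimal, self-contained 
sketch. It is not Tez or Hive code: the class SpillOrderSketch, its methods, 
and the one-row-per-spill size are made up for the example. It shows that a 
HashMap's read-out order depends on how the map was populated and not only on 
its contents, so two attempts that aggregate the same rows can cut them into 
different spills. It deliberately uses two keys that land in the same bucket of 
a default-sized HashMap and feeds them in a different order per attempt; real 
operators can diverge for other reasons as well.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SpillOrderSketch {

    /** Hypothetical group-by: counts occurrences of each key in a HashMap. */
    public static Map<Integer, Long> aggregate(int... keys) {
        Map<Integer, Long> agg = new HashMap<>();
        for (int k : keys) {
            agg.merge(k, 1L, Long::sum);
        }
        return agg;
    }

    /** Reads the map out in iteration order and cuts it into fixed-size "spills". */
    public static List<List<Integer>> spill(Map<Integer, Long> agg, int rowsPerSpill) {
        List<List<Integer>> spills = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        for (Integer key : agg.keySet()) {
            current.add(key);
            if (current.size() == rowsPerSpill) {
                spills.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            spills.add(current);
        }
        return spills;
    }

    public static void main(String[] args) {
        // 1 and 17 share a bucket in a 16-slot table (17 & 15 == 1), so their
        // relative position in the bucket depends on insertion order.
        Map<Integer, Long> attempt0 = aggregate(1, 17);
        Map<Integer, Long> attempt1 = aggregate(17, 1);

        System.out.println("same contents: " + attempt0.equals(attempt1));
        System.out.println("attempt 0 spills: " + spill(attempt0, 1));
        System.out.println("attempt 1 spills: " + spill(attempt1, 1));
    }
}
```

If a reducer had already consumed spill 0 from attempt 0 and then picked up 
spill 1 from attempt 1, it would see one key twice and the other not at all, 
which is why spills from different attempts cannot be mixed.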

> Support fault tolerance in pipelined data transfer for unordered output
> -----------------------------------------------------------------------
>
>                 Key: TEZ-4189
>                 URL: https://issues.apache.org/jira/browse/TEZ-4189
>             Project: Apache Tez
>          Issue Type: Bug
>            Reporter: László Bodor
>            Assignee: László Bodor
>            Priority: Major
>         Attachments: syslog_attempt_1589892856650_0073_1_07_000100_0, 
> syslog_attempt_1589892856650_0073_1_07_000100_1, 
> syslog_attempt_1589892856650_0073_1_08_000945_0
>
>
> Attached an example:
> Mapper task attempt 0 fails while spilling spill3:  
> [^syslog_attempt_1589892856650_0073_1_07_000100_0] 
> {code}
> 2020-05-25 13:08:10,702 [INFO] [UnorderedOutSpiller {Reducer_14} #0] 
> |writers.UnorderedPartitionedKVWriter|: Writing spill 3 to 
> /grid/2/yarn/nm/usercache/root/appcache/application_1589892856650_0073/output/attempt_1589892856650_0073_1_07_000100_0_10003_3/file.out
> 2020-05-25 13:08:10,736 [INFO] [TaskHeartbeatThread] |task.TaskReporter|: 
> Received should die response from AM
> {code}
> Mapper task attempt 1 passes, successfully spills out 7 spills:  
> [^syslog_attempt_1589892856650_0073_1_07_000100_1] 
> {code}
> 2020-05-25 13:09:47,722 [INFO] [TezChild] 
> |writers.UnorderedPartitionedKVWriter|: Reducer_14: Adding spill event for 
> spill (final update=true), spillId=7
> {code}
> Reducer tasks get killed because they cannot recover from the mapper task 
> attempt failure, because fault tolerance wasn't implemented for pipelined 
> shuffle I guess:  [^syslog_attempt_1589892856650_0073_1_08_000945_0] 
> {code}
> 2020-05-25 13:13:14,617 [ERROR] [ShuffleRunner {Map_16}] 
> |impl.ShuffleManager|: Killing self as previous attempt data could have been 
> consumed
> java.io.IOException: Previous event already got scheduled for 
> InputAttemptIdentifier [inputIdentifier=100, attemptNumber=1, 
> pathComponent=attempt_1589892856650_0073_1_07_000100_1_10017_0, spillType=1, 
> spillId=0]. Previous attempt's data could have been already merged to 
> memory/disk outputs.  Killing (self) this task early. currentAttemptNum=0, 
> eventsProcessed={0, 1}, scheduledForDownload=true, newAttemptNum=1
>         at 
> org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager.validateInputAttemptForPipelinedShuffle(ShuffleManager.java:503)
>         at 
> org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager.constructFetcherForHost(ShuffleManager.java:551)
>         at 
> org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager$RunShuffleCallable.callInternal(ShuffleManager.java:455)
>         at 
> org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager$RunShuffleCallable.callInternal(ShuffleManager.java:398)
>         at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
>         at 
> com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>         at 
> com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69)
>         at 
> com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> {code}
> The interesting part is that the failing reducer task attempt above runs 4 
> minutes after the successful second mapper task attempt, so I think this is 
> not a case of unfortunate scheduling; rather, the pipelined shuffle is not 
> prepared for fetching when inputs are killed and reattempted. For example, 
> the reducer task tries to fetch input from 
> attempt_1589892856650_0073_1_07_000100_0_10003_0, which is the map output of 
> the first, failing attempt, even though this task starts 4 minutes after 
> there is already a successful attempt: 
> attempt_1589892856650_0073_1_07_000100_1.
> {code}
> 2020-05-25 13:13:14,556 [INFO] [Fetcher_B {Map_16} #3] |HttpConnection.url|: for url=http://ctr-e141-1563959304486-69251-01-000009.hwx.site:13562/mapOutput?job=job_1589892856650_0073&dag=1&reduce=945&map=attempt_1589892856650_0073_1_07_000028_0_10003_0,attempt_1589892856650_0073_1_07_000084_0_10003_0,attempt_1589892856650_0073_1_07_000052_0_10002_0,attempt_1589892856650_0073_1_07_000100_0_10003_0,attempt_1589892856650_0073_1_07_000108_0_10002_0,attempt_1589892856650_0073_1_07_000036_0_10003_0,attempt_1589892856650_0073_1_07_000092_0_10002_0,attempt_1589892856650_0073_1_07_000076_0_10003_0,attempt_1589892856650_0073_1_07_000012_0_10002_0,attempt_1589892856650_0073_1_07_000044_0_10003_0,attempt_1589892856650_0073_1_07_000004_0_10003_0,attempt_1589892856650_0073_1_07_000068_0_10003_0,attempt_1589892856650_0073_1_07_000060_0_10002_0,attempt_1589892856650_0073_1_07_000020_0_10003_0,attempt_1589892856650_0073_1_07_000084_0_10003_1,attempt_1589892856650_0073_1_07_000108_0_10002_1,attempt_1589892856650_0073_1_07_000028_0_10003_1,attempt_1589892856650_0073_1_07_000100_0_10003_1,attempt_1589892856650_0073_1_07_000076_0_10003_1,attempt_1589892856650_0073_1_07_000052_0_10002_1,attempt_1589892856650_0073_1_07_000066_1_10024_0,attempt_1589892856650_0073_1_07_000066_1_10024_1&keepAlive=true sent hash and receievd reply 0 ms
> {code}
> Fetches an input spill from the first failing task attempt successfully:
> {code}
> 2020-05-25 13:13:14,562 [INFO] [Fetcher_B {Map_16} #3] 
> |ShuffleManager.fetch|: Completed fetch for attempt: {100, 0, 
> attempt_1589892856650_0073_1_07_000100_0_10003_0, 1, 0} to MEMORY, 
> csize=18631, d
> {code}
> The failure mentioned above then follows from the [fail-fast 
> check|https://github.com/apache/tez/blob/master/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java#L495]
>  for differing attempt numbers:
> {code}
> java.io.IOException: Previous event already got scheduled for 
> InputAttemptIdentifier [inputIdentifier=100, attemptNumber=1, 
> pathComponent=attempt_1589892856650_0073_1_07_000100_1_10017_0, spillType=1, 
> spillId=0]. Previous attempt's data could have been already merged to 
> memory/disk outputs.  Killing (self) this task early. currentAttemptNum=0, 
> eventsProcessed={0, 1}, scheduledForDownload=true, newAttemptNum=1
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
