[
https://issues.apache.org/jira/browse/TEZ-4189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
László Bodor updated TEZ-4189:
------------------------------
Description:
An example is attached.
Mapper task attempt 0 fails while writing spill 3:
[^syslog_attempt_1589892856650_0073_1_07_000100_0]
{code}
2020-05-25 13:08:10,702 [INFO] [UnorderedOutSpiller {Reducer_14} #0] |writers.UnorderedPartitionedKVWriter|: Writing spill 3 to /grid/2/yarn/nm/usercache/root/appcache/application_1589892856650_0073/output/attempt_1589892856650_0073_1_07_000100_0_10003_3/file.out
2020-05-25 13:08:10,736 [INFO] [TaskHeartbeatThread] |task.TaskReporter|: Received should die response from AM
{code}
Mapper task attempt 1 passes and successfully writes out 7 spills:
[^syslog_attempt_1589892856650_0073_1_07_000100_1]
{code}
2020-05-25 13:09:47,722 [INFO] [TezChild] |writers.UnorderedPartitionedKVWriter|: Reducer_14: Adding spill event for spill (final update=true), spillId=7
{code}
Reducer tasks get killed because they cannot recover from the mapper task attempt failure; presumably fault tolerance was never implemented for pipelined shuffle (a simplified sketch of the check that triggers this follows the stack trace below): [^syslog_attempt_1589892856650_0073_1_08_000945_0]
{code}
2020-05-25 13:13:14,617 [ERROR] [ShuffleRunner {Map_16}] |impl.ShuffleManager|: Killing self as previous attempt data could have been consumed
java.io.IOException: Previous event already got scheduled for InputAttemptIdentifier [inputIdentifier=100, attemptNumber=1, pathComponent=attempt_1589892856650_0073_1_07_000100_1_10017_0, spillType=1, spillId=0]. Previous attempt's data could have been already merged to memory/disk outputs. Killing (self) this task early. currentAttemptNum=0, eventsProcessed={0, 1}, scheduledForDownload=true, newAttemptNum=1
	at org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager.validateInputAttemptForPipelinedShuffle(ShuffleManager.java:503)
	at org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager.constructFetcherForHost(ShuffleManager.java:551)
	at org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager$RunShuffleCallable.callInternal(ShuffleManager.java:455)
	at org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager$RunShuffleCallable.callInternal(ShuffleManager.java:398)
	at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
	at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
	at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69)
	at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
{code}
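For context, below is a minimal, self-contained Java sketch (hypothetical names, not the actual Tez code) of what the check in ShuffleManager#validateInputAttemptForPipelinedShuffle appears to enforce based on the exception above: once spill events of one attempt of an input have been scheduled for download, an event carrying a different attempt number cannot be reconciled, so the consumer kills itself.
{code:java}
// Simplified, hypothetical model of the attempt check for pipelined shuffle; the real logic
// lives in ShuffleManager#validateInputAttemptForPipelinedShuffle (see the stack trace above).
import java.io.IOException;
import java.util.BitSet;

public class PipelinedAttemptCheckSketch {

  /** Minimal stand-in for the per-input bookkeeping kept by the shuffle consumer. */
  static class ShuffleEventInfo {
    final int attemptNum;                        // attempt whose spill events arrived first
    final BitSet eventsProcessed = new BitSet(); // spill ids already seen for that attempt
    boolean scheduledForDownload;                // true once a fetcher was handed this input

    ShuffleEventInfo(int attemptNum) {
      this.attemptNum = attemptNum;
    }
  }

  /** Fails fast when a spill event arrives from a different attempt than the one in progress. */
  static void validateInputAttempt(int inputIdentifier, ShuffleEventInfo current,
      int newAttemptNum) throws IOException {
    if (current != null && current.attemptNum != newAttemptNum) {
      // The earlier attempt's data may already be merged into memory/disk outputs, so the
      // only reaction implemented today is to kill the consumer task.
      throw new IOException("Previous event already got scheduled for input " + inputIdentifier
          + ". Killing (self) this task early. currentAttemptNum=" + current.attemptNum
          + ", eventsProcessed=" + current.eventsProcessed
          + ", scheduledForDownload=" + current.scheduledForDownload
          + ", newAttemptNum=" + newAttemptNum);
    }
  }

  public static void main(String[] args) throws IOException {
    // The numbers from the failing reducer above: spills {0, 1} of attempt 0 were already
    // scheduled for input 100, then an event of attempt 1 shows up for the same input.
    ShuffleEventInfo info = new ShuffleEventInfo(0);
    info.eventsProcessed.set(0);
    info.eventsProcessed.set(1);
    info.scheduledForDownload = true;
    validateInputAttempt(100, info, 1); // throws, mirroring the IOException in the log
  }
}
{code}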
The interesting part is that the failing reducer task attempt above runs 4 minutes later than the successful, second mapper task attempt, so I don't think this is a case of unfortunate scheduling; rather, the pipelined shuffle is not prepared for fetching when inputs are killed and reattempted. For example, the reducer task tries to fetch input from attempt_1589892856650_0073_1_07_000100_0_10003_0, which is the map output of the first, failing attempt, even though the reducer starts 4 minutes after there is already a successful attempt: attempt_1589892856650_0073_1_07_000100_1.
{code}
2020-05-25 13:13:14,556 [INFO] [Fetcher_B {Map_16} #3] |HttpConnection.url|: for url=http://ctr-e141-1563959304486-69251-01-000009.hwx.site:13562/mapOutput?job=job_1589892856650_0073&dag=1&reduce=945&map=attempt_1589892856650_0073_1_07_000028_0_10003_0,attempt_1589892856650_0073_1_07_000084_0_10003_0,attempt_1589892856650_0073_1_07_000052_0_10002_0,attempt_1589892856650_0073_1_07_000100_0_10003_0,attempt_1589892856650_0073_1_07_000108_0_10002_0,attempt_1589892856650_0073_1_07_000036_0_10003_0,attempt_1589892856650_0073_1_07_000092_0_10002_0,attempt_1589892856650_0073_1_07_000076_0_10003_0,attempt_1589892856650_0073_1_07_000012_0_10002_0,attempt_1589892856650_0073_1_07_000044_0_10003_0,attempt_1589892856650_0073_1_07_000004_0_10003_0,attempt_1589892856650_0073_1_07_000068_0_10003_0,attempt_1589892856650_0073_1_07_000060_0_10002_0,attempt_1589892856650_0073_1_07_000020_0_10003_0,attempt_1589892856650_0073_1_07_000084_0_10003_1,attempt_1589892856650_0073_1_07_000108_0_10002_1,attempt_1589892856650_0073_1_07_000028_0_10003_1,attempt_1589892856650_0073_1_07_000100_0_10003_1,attempt_1589892856650_0073_1_07_000076_0_10003_1,attempt_1589892856650_0073_1_07_000052_0_10002_1,attempt_1589892856650_0073_1_07_000066_1_10024_0,attempt_1589892856650_0073_1_07_000066_1_10024_1&keepAlive=true sent hash and receievd reply 0 ms
{code}
The reducer then successfully fetches an input spill from the first, failed task attempt:
{code}
2020-05-25 13:13:14,562 [INFO] [Fetcher_B {Map_16} #3] |ShuffleManager.fetch|: Completed fetch for attempt: {100, 0, attempt_1589892856650_0073_1_07_000100_0_10003_0, 1, 0} to MEMORY, csize=18631, d
{code}
And then comes the failure mentioned above, caused by the [fail-fast check|https://github.com/apache/tez/blob/master/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java#L495] on differing attempt numbers (a replay of this sequence is sketched after the block below):
{code}
java.io.IOException: Previous event already got scheduled for InputAttemptIdentifier [inputIdentifier=100, attemptNumber=1, pathComponent=attempt_1589892856650_0073_1_07_000100_1_10017_0, spillType=1, spillId=0]. Previous attempt's data could have been already merged to memory/disk outputs. Killing (self) this task early. currentAttemptNum=0, eventsProcessed={0, 1}, scheduledForDownload=true, newAttemptNum=1
{code}
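To make the timeline concrete, here is a toy, self-contained Java replay (illustrative names only, not Tez APIs) of the event order seen in the logs for inputIdentifier=100: spills 0 and 1 of mapper attempt 0 are scheduled and partly fetched first, and when attempt 1's event for the same input arrives, the only option with the current fail-fast behaviour is to kill the task instead of switching to the new attempt's output.
{code:java}
// Toy replay of the log timeline for inputIdentifier=100; all names are illustrative, not Tez APIs.
public class PipelinedShuffleReplay {

  /** Minimal stand-in for a pipelined data-movement (spill) event. */
  static final class SpillEvent {
    final int inputId, attemptNum, spillId;
    SpillEvent(int inputId, int attemptNum, int spillId) {
      this.inputId = inputId;
      this.attemptNum = attemptNum;
      this.spillId = spillId;
    }
    @Override public String toString() {
      return "input=" + inputId + ", attempt=" + attemptNum + ", spill=" + spillId;
    }
  }

  public static void main(String[] args) {
    SpillEvent[] events = {
        new SpillEvent(100, 0, 0),  // from mapper attempt 0; fetched to MEMORY at 13:13:14,562
        new SpillEvent(100, 0, 1),  // eventsProcessed={0, 1} in the log
        new SpillEvent(100, 1, 0)   // from the second, successful mapper attempt (attempt 1)
    };

    Integer scheduledAttempt = null; // attempt whose data may already be merged downstream
    for (SpillEvent e : events) {
      if (scheduledAttempt != null && scheduledAttempt != e.attemptNum) {
        // This is the point where ShuffleManager throws the IOException above: attempt 0's
        // data may already be merged into memory/disk outputs, so the consumer cannot simply
        // switch over to attempt 1 and kills itself instead of re-fetching.
        System.out.println("Attempt mismatch (" + e + "), currentAttemptNum=" + scheduledAttempt
            + " -> killing (self) this task early");
        return;
      }
      scheduledAttempt = e.attemptNum;
      System.out.println("Scheduling " + e + " for download");
    }
  }
}
{code}
Making pipelined unordered data transfer fault tolerant presumably means handling this attempt mismatch without failing the downstream task, which is what this issue asks for.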
> Support fault tolerance in pipelined data transfer for unordered output
> -----------------------------------------------------------------------
>
> Key: TEZ-4189
> URL: https://issues.apache.org/jira/browse/TEZ-4189
> Project: Apache Tez
> Issue Type: Bug
> Reporter: László Bodor
> Assignee: László Bodor
> Priority: Major
> Attachments: syslog_attempt_1589892856650_0073_1_07_000100_0,
> syslog_attempt_1589892856650_0073_1_07_000100_1,
> syslog_attempt_1589892856650_0073_1_08_000945_0
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)