TEZ-3608. Fetcher can hang if copyMapOutput/fetchInputs returns early (Kuhu Shukla via jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c08eddf0 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c08eddf0 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c08eddf0 Branch: refs/heads/master Commit: c08eddf01d99e7e245d68fa8a4f6ab191308b4a7 Parents: de2e0f2 Author: Jonathan Eagles <[email protected]> Authored: Fri Feb 3 17:38:17 2017 -0600 Committer: Jonathan Eagles <[email protected]> Committed: Fri Feb 3 17:38:17 2017 -0600 ---------------------------------------------------------------------- TEZ-3334-CHANGES.txt | 1 + .../tez/runtime/library/common/shuffle/Fetcher.java | 8 +++----- .../shuffle/orderedgrouped/FetcherOrderedGrouped.java | 12 +++++------- .../common/shuffle/orderedgrouped/TestFetcher.java | 2 +- 4 files changed, 10 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/c08eddf0/TEZ-3334-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt index 1ba8ca6..025f53d 100644 --- a/TEZ-3334-CHANGES.txt +++ b/TEZ-3334-CHANGES.txt @@ -4,6 +4,7 @@ Apache Tez Change Log INCOMPATIBLE CHANGES: ALL CHANGES: + TEZ-3608. Fetcher can hang if copyMapOutput/fetchInputs returns early TEZ-3606. Fix debug log for empty partitions to the expanded partitionId in the Composite case TEZ-3604. Remove the compositeInputAttemptIdentifier from remaining list upon fetch completion in the Ordered case TEZ-3599. Unordered Fetcher can hang if empty partitions are present http://git-wip-us.apache.org/repos/asf/tez/blob/c08eddf0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java index a083daa..3afbb23 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java @@ -596,10 +596,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { false); } try { - failedInputs = fetchInputs(input, callback); - if(failedInputs == null || failedInputs.length == 0) { - srcAttemptsRemaining.remove(inputAttemptIdentifier.toString()); - } + failedInputs = fetchInputs(input, callback, inputAttemptIdentifier); } catch (FetcherReadTimeoutException e) { //clean up connection shutdownInternal(true); @@ -833,7 +830,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { } } private InputAttemptIdentifier[] fetchInputs(DataInputStream input, - CachingCallBack callback) throws FetcherReadTimeoutException { + CachingCallBack callback, InputAttemptIdentifier inputAttemptIdentifier) throws FetcherReadTimeoutException { FetchedInput fetchedInput = null; InputAttemptIdentifier srcAttemptId = null; long decompressedLength = 0; @@ -972,6 +969,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { // Note successful shuffle // metrics.successFetch(); } + srcAttemptsRemaining.remove(inputAttemptIdentifier.toString()); } catch (IOException ioe) { if (isShutDown.get()) { cleanupFetchedInput(fetchedInput); http://git-wip-us.apache.org/repos/asf/tez/blob/c08eddf0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java index 6bdb453..1bfd2a6 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java @@ -278,16 +278,13 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { // yet_to_be_fetched list and marking the failed tasks. InputAttemptIdentifier[] failedTasks = null; while (!remaining.isEmpty() && failedTasks == null) { - String inputAttemptIdentifierId = - remaining.entrySet().iterator().next().getKey(); + InputAttemptIdentifier inputAttemptIdentifier = + remaining.entrySet().iterator().next().getValue(); // fail immediately after first failure because we dont know how much to // skip for this error in the input stream. So we cannot move on to the // remaining outputs. YARN-1773. Will get to them in the next retry. try { - failedTasks = copyMapOutput(host, input); - if (failedTasks == null || failedTasks.length == 0) { - remaining.remove(inputAttemptIdentifierId); - } + failedTasks = copyMapOutput(host, input, inputAttemptIdentifier); } catch (FetcherReadTimeoutException e) { // Setup connection again if disconnected cleanupCurrentConnection(true); @@ -431,7 +428,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { } protected InputAttemptIdentifier[] copyMapOutput(MapHost host, - DataInputStream input) throws FetcherReadTimeoutException { + DataInputStream input, InputAttemptIdentifier inputAttemptIdentifier) throws FetcherReadTimeoutException { MapOutput mapOutput = null; InputAttemptIdentifier srcAttemptId = null; long decompressedLength = 0; @@ -575,6 +572,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { // Note successful shuffle metrics.successFetch(); } + remaining.remove(inputAttemptIdentifier.toString()); } catch(IOException ioe) { if (stopped) { if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/tez/blob/c08eddf0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java index 54b0279..dfa473b 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java @@ -561,7 +561,7 @@ public class TestFetcher { // Throw IOException when fetcher tries to connect again to the same node throw new FetcherReadTimeoutException("creating fetcher socket read timeout exception"); } - }).when(fetcher).copyMapOutput(any(MapHost.class), any(DataInputStream.class)); + }).when(fetcher).copyMapOutput(any(MapHost.class), any(DataInputStream.class), any(InputAttemptIdentifier.class)); try { fetcher.copyFromHost(host);
