Repository: hadoop Updated Branches: refs/heads/branch-2.7 91ca90fdd -> 2ec5bb0fe
MAPREDUCE-6633. AM should retry map attempts if the reduce task encounters commpression related errors. Contributed by Rushabh Shah (cherry picked from commit 1fec06e037d2b22dafc64f33d4f1231bef4ceba8) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2ec5bb0f Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2ec5bb0f Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2ec5bb0f Branch: refs/heads/branch-2.7 Commit: 2ec5bb0fe88d5e91bdeef5b2b4f19bf0e866f939 Parents: 91ca90f Author: Eric Payne <[email protected]> Authored: Sat Apr 9 16:51:57 2016 +0000 Committer: Eric Payne <[email protected]> Committed: Mon Apr 18 16:12:53 2016 +0000 ---------------------------------------------------------------------- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../hadoop/mapreduce/task/reduce/Fetcher.java | 2 +- .../mapreduce/task/reduce/TestFetcher.java | 37 ++++++++++++++++++++ 3 files changed, 41 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec5bb0f/hadoop-mapreduce-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 31ec4a1..69e5d5c 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -20,6 +20,9 @@ Release 2.7.3 - UNRELEASED BUG FIXES + MAPREDUCE-6633. AM should retry map attempts if the reduce task encounters + commpression related errors (Rushabh Shah via epayne) + MAPREDUCE-4785. TestMRApp occasionally fails (haibochen via rkanter) MAPREDUCE-6540. TestMRTimelineEventHandling fails (sjlee) http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec5bb0f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java index 4b80dc9..2e255f8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java @@ -536,7 +536,7 @@ class Fetcher<K,V> extends Thread { + " len: " + compressedLength + " to " + mapOutput.getDescription()); mapOutput.shuffle(host, is, compressedLength, decompressedLength, metrics, reporter); - } catch (java.lang.InternalError e) { + } catch (java.lang.InternalError | Exception e) { LOG.warn("Failed to shuffle for fetcher#"+id, e); throw new IOException(e); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec5bb0f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java index a9cd33e..f31e160 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java @@ -348,6 +348,43 @@ public class TestFetcher { @SuppressWarnings("unchecked") @Test(timeout=10000) + public void testCopyFromHostOnAnyException() throws Exception { + InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class); + + Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm, + r, metrics, except, key, connection); + + String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key); + + when(connection.getResponseCode()).thenReturn(200); + when(connection.getHeaderField( + SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)).thenReturn(replyHash); + ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1); + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + header.write(new DataOutputStream(bout)); + ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray()); + when(connection.getInputStream()).thenReturn(in); + when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME)) + .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION)) + .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); + when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt())) + .thenReturn(immo); + + doThrow(new ArrayIndexOutOfBoundsException()).when(immo) + .shuffle(any(MapHost.class), any(InputStream.class), anyLong(), + anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class)); + + underTest.copyFromHost(host); + + verify(connection) + .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, + encHash); + verify(ss, times(1)).copyFailed(map1ID, host, true, false); + } + + @SuppressWarnings("unchecked") + @Test(timeout=10000) public void testCopyFromHostWithRetry() throws Exception { InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class); ss = mock(ShuffleSchedulerImpl.class);
