TEZ-3595. Composite Fetch account error for disk direct (jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3b20be06 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3b20be06 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3b20be06 Branch: refs/heads/master Commit: 3b20be06fe89cc65e56a4520336a01afa100d811 Parents: 4b30f8e Author: Jonathan Eagles <[email protected]> Authored: Mon Jan 30 15:20:45 2017 -0600 Committer: Jonathan Eagles <[email protected]> Committed: Mon Jan 30 15:20:45 2017 -0600 ---------------------------------------------------------------------- TEZ-3334-CHANGES.txt | 1 + .../orderedgrouped/FetcherOrderedGrouped.java | 66 ++++--- .../shuffle/orderedgrouped/TestFetcher.java | 198 ++++++++++++++++--- 3 files changed, 213 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/3b20be06/TEZ-3334-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt index 4703d95..e40759f 100644 --- a/TEZ-3334-CHANGES.txt +++ b/TEZ-3334-CHANGES.txt @@ -4,6 +4,7 @@ Apache Tez Change Log INCOMPATIBLE CHANGES: ALL CHANGES: + TEZ-3595. Composite Fetch account error for disk direct TEZ-3590. Remove google.protobuf from the tez-auxservices shaded jar TEZ-3587. Fetcher fetchInputs() can NPE on srcAttempt due to missing entry in pathToAttemptMap TEZ-3586. 
Remove fusesource.leveldbjni from the tez-auxservices shaded jar http://git-wip-us.apache.org/repos/asf/tez/blob/3b20be06/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java index 18b824a..f213268 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java @@ -711,37 +711,48 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { } InputAttemptIdentifier srcAttemptId = iter.next(); - MapOutput mapOutput = null; - try { - long startTime = System.currentTimeMillis(); - Path filename = getShuffleInputFileName(srcAttemptId.getPathComponent(), null); - - TezIndexRecord indexRecord = getIndexRecord(srcAttemptId.getPathComponent(), - minPartition); - - mapOutput = getMapOutputForDirectDiskFetch(srcAttemptId, filename, indexRecord); - long endTime = System.currentTimeMillis(); - scheduler.copySucceeded(srcAttemptId, host, indexRecord.getPartLength(), - indexRecord.getRawLength(), (endTime - startTime), mapOutput, true); - iter.remove(); - metrics.successFetch(); - } catch (IOException e) { - if (mapOutput != null) { - mapOutput.abort(); - } - if (!stopped) { - metrics.failedFetch(); - ioErrs.increment(1); - scheduler.copyFailed(srcAttemptId, host, true, false, true); - LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " + - host.getHostIdentifier(), e); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug( - "Ignoring fetch error during local disk copy
since fetcher has already been stopped"); + boolean hasFailures = false; + // Fetch partition count number of map outputs (handles auto-reduce case) + for (int curPartition = minPartition; curPartition <= maxPartition; curPartition++) { + // Per partition, so a failure here never aborts an output already passed to copySucceeded() + MapOutput mapOutput = null; + try { + long startTime = System.currentTimeMillis(); + + // Partition id is the base partition id plus the relative offset + int reduceId = host.getPartitionId() + curPartition - minPartition; + srcAttemptId = scheduler.getIdentifierForFetchedOutput(srcAttemptId.getPathComponent(), reduceId); + Path filename = getShuffleInputFileName(srcAttemptId.getPathComponent(), null); + TezIndexRecord indexRecord = getIndexRecord(srcAttemptId.getPathComponent(), reduceId); + + mapOutput = getMapOutputForDirectDiskFetch(srcAttemptId, filename, indexRecord); + long endTime = System.currentTimeMillis(); + scheduler.copySucceeded(srcAttemptId, host, indexRecord.getPartLength(), + indexRecord.getRawLength(), (endTime - startTime), mapOutput, true); + metrics.successFetch(); + } catch (IOException e) { + if (mapOutput != null) { + mapOutput.abort(); } - return; + if (!stopped) { + hasFailures = true; + metrics.failedFetch(); + ioErrs.increment(1); + scheduler.copyFailed(srcAttemptId, host, true, false, true); + LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " + + host.getHostIdentifier(), e); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Ignoring fetch error during local disk copy since fetcher has already been stopped"); + } + return; + } + } } + if (!hasFailures) { + iter.remove(); + } } } finally { putBackRemainingMapOutputs(host); http://git-wip-us.apache.org/repos/asf/tez/blob/3b20be06/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java index a6e4c21..3686d17 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java @@ -47,12 +47,15 @@ import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import com.google.common.collect.Lists; import org.apache.tez.http.HttpConnection; import org.apache.tez.http.HttpConnectionParams; import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -235,12 +238,12 @@ public class TestFetcher { FetcherOrderedGrouped spyFetcher = spy(fetcher); - List<InputAttemptIdentifier> srcAttempts = Arrays.asList( - new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0"), - new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1"), - new InputAttemptIdentifier(2, 3, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2"), - new InputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3"), - new InputAttemptIdentifier(4, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_4") + final List<CompositeInputAttemptIdentifier> srcAttempts = Arrays.asList( + new CompositeInputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", 1), + new CompositeInputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", 1), + new CompositeInputAttemptIdentifier(2, 3, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2", 1), + new 
CompositeInputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", 1), + new CompositeInputAttemptIdentifier(4, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_4", 1) ); final int FIRST_FAILED_ATTEMPT_IDX = 2; final int SECOND_FAILED_ATTEMPT_IDX = 4; @@ -248,6 +251,24 @@ public class TestFetcher { doReturn(srcAttempts).when(scheduler).getMapsForHost(host); + final ConcurrentMap<ShuffleScheduler.PathPartition, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<ShuffleScheduler.PathPartition, InputAttemptIdentifier>(); + for (CompositeInputAttemptIdentifier srcAttempt : srcAttempts) { + for (int i = 0; i < srcAttempt.getInputIdentifierCount(); i++) { + ShuffleScheduler.PathPartition pathPartition = new ShuffleScheduler.PathPartition(srcAttempt.getPathComponent(), host.getPartitionId() + i); + pathToIdentifierMap.put(pathPartition, srcAttempt.expand(i)); + } + } + doAnswer(new Answer<InputAttemptIdentifier>() { + @Override + public InputAttemptIdentifier answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + String path = (String) args[0]; + int reduceId = (int) args[1]; + return pathToIdentifierMap.get(new ShuffleScheduler.PathPartition(path, reduceId)); + } + }).when(scheduler) + .getIdentifierForFetchedOutput(any(String.class), any(int.class)); + doAnswer(new Answer<MapOutput>() { @Override public MapOutput answer(InvocationOnMock invocation) throws Throwable { @@ -269,20 +290,22 @@ public class TestFetcher { } }).when(spyFetcher).getShuffleInputFileName(anyString(), anyString()); - doAnswer(new Answer<TezIndexRecord>() { - @Override - public TezIndexRecord answer(InvocationOnMock invocation) throws Throwable { - Object[] args = invocation.getArguments(); - String pathComponent = (String) args[0]; - int len = pathComponent.length(); - long p = Long.valueOf(pathComponent.substring(len - 1, len)); - if (p == FIRST_FAILED_ATTEMPT_IDX || p == SECOND_FAILED_ATTEMPT_IDX) { - 
throw new IOException("failing to simulate failure case"); + for (int i = 0; i < host.getPartitionCount(); i++) { + doAnswer(new Answer<TezIndexRecord>() { + @Override + public TezIndexRecord answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + String pathComponent = (String) args[0]; + int len = pathComponent.length(); + long p = Long.valueOf(pathComponent.substring(len - 1, len)); + if (p == FIRST_FAILED_ATTEMPT_IDX || p == SECOND_FAILED_ATTEMPT_IDX) { + throw new IOException("failing to simulate failure case"); + } + // match with params for copySucceeded below. + return new TezIndexRecord(p * 10, p * 1000, p * 100); } - // match with params for copySucceeded below. - return new TezIndexRecord(p * 10, p * 1000, p * 100); - } - }).when(spyFetcher).getIndexRecord(anyString(), eq(host.getPartitionId())); + }).when(spyFetcher).getIndexRecord(anyString(), eq(host.getPartitionId() + i)); + } doNothing().when(scheduler).copySucceeded(any(InputAttemptIdentifier.class), any(MapHost.class), anyLong(), anyLong(), anyLong(), any(MapOutput.class), anyBoolean()); @@ -295,10 +318,12 @@ public class TestFetcher { // should have exactly 3 success and 1 failure. 
for (int i : sucessfulAttemptsIndexes) { - verifyCopySucceeded(scheduler, host, srcAttempts, i); + for (int j = 0; j < host.getPartitionCount(); j++) { + verifyCopySucceeded(scheduler, host, srcAttempts, i, j); + } } - verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX), host, true, false, true); - verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX), host, true, false, true); + verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(0), host, true, false, true); + verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(0), host, true, false, true); verify(metrics, times(3)).successFetch(); verify(metrics, times(2)).failedFetch(); @@ -308,11 +333,136 @@ public class TestFetcher { verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX)); } + @Test(timeout = 5000000) + public void testSetupLocalDiskFetchAutoReduce() throws Exception { + Configuration conf = new TezConfiguration(); + ShuffleScheduler scheduler = mock(ShuffleScheduler.class); + MergeManager merger = mock(MergeManager.class); + ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class); + Shuffle shuffle = mock(Shuffle.class); + InputContext inputContext = mock(InputContext.class); + when(inputContext.getCounters()).thenReturn(new TezCounters()); + when(inputContext.getSourceVertexName()).thenReturn(""); + + MapHost host = new MapHost(HOST, PORT, 1, 2); + FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, + null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, + wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, + false, false, true, false); + FetcherOrderedGrouped spyFetcher = spy(fetcher); + + + final List<CompositeInputAttemptIdentifier> srcAttempts = Arrays.asList( + new CompositeInputAttemptIdentifier(0, 1, 
InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", host.getPartitionCount()), + new CompositeInputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", host.getPartitionCount()), + new CompositeInputAttemptIdentifier(2, 3, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2", host.getPartitionCount()), + new CompositeInputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", host.getPartitionCount()), + new CompositeInputAttemptIdentifier(4, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_4", host.getPartitionCount()) + ); + final int FIRST_FAILED_ATTEMPT_IDX = 2; + final int SECOND_FAILED_ATTEMPT_IDX = 4; + final int[] sucessfulAttemptsIndexes = { 0, 1, 3 }; + + doReturn(srcAttempts).when(scheduler).getMapsForHost(host); + final ConcurrentMap<ShuffleScheduler.PathPartition, InputAttemptIdentifier> pathToIdentifierMap + = new ConcurrentHashMap<ShuffleScheduler.PathPartition, InputAttemptIdentifier>(); + for (CompositeInputAttemptIdentifier srcAttempt : srcAttempts) { + for (int i = 0; i < srcAttempt.getInputIdentifierCount(); i++) { + ShuffleScheduler.PathPartition pathPartition = new ShuffleScheduler.PathPartition(srcAttempt.getPathComponent(), host.getPartitionId() + i); + pathToIdentifierMap.put(pathPartition, srcAttempt.expand(i)); + } + } + + doAnswer(new Answer<InputAttemptIdentifier>() { + @Override + public InputAttemptIdentifier answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + String path = (String) args[0]; + int reduceId = (int) args[1]; + return pathToIdentifierMap.get(new ShuffleScheduler.PathPartition(path, reduceId)); + } + }).when(scheduler) + .getIdentifierForFetchedOutput(any(String.class), any(int.class)); + + doAnswer(new Answer<MapOutput>() { + @Override + public MapOutput answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + MapOutput mapOutput = mock(MapOutput.class); + 
doReturn(MapOutput.Type.DISK_DIRECT).when(mapOutput).getType(); + doReturn(args[0]).when(mapOutput).getAttemptIdentifier(); + return mapOutput; + } + }).when(spyFetcher) + .getMapOutputForDirectDiskFetch(any(InputAttemptIdentifier.class), any(Path.class), + any(TezIndexRecord.class)); + + doAnswer(new Answer<Path>() { + @Override + public Path answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + return new Path(SHUFFLE_INPUT_FILE_PREFIX + args[0]); + } + }).when(spyFetcher).getShuffleInputFileName(anyString(), anyString()); + + for (int i = 0; i < host.getPartitionCount(); i++) { + doAnswer(new Answer<TezIndexRecord>() { + @Override + public TezIndexRecord answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + String pathComponent = (String) args[0]; + int len = pathComponent.length(); + long p = Long.valueOf(pathComponent.substring(len - 1, len)); + + if (pathComponent.equals(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).getPathComponent()) || + pathComponent.equals(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).getPathComponent())) { + throw new IOException("Thowing exception to simulate failure case"); + } + // match with params for copySucceeded below. 
+ return new TezIndexRecord(p * 10, p * 1000, p * 100); + } + }).when(spyFetcher).getIndexRecord(anyString(), eq(host.getPartitionId() + i)); + } + + doNothing().when(scheduler).copySucceeded(any(InputAttemptIdentifier.class), any(MapHost.class), + anyLong(), anyLong(), anyLong(), any(MapOutput.class), anyBoolean()); + doNothing().when(scheduler).putBackKnownMapOutput(host, + srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(0)); + doNothing().when(scheduler).putBackKnownMapOutput(host, + srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(1)); + doNothing().when(scheduler).putBackKnownMapOutput(host, + srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(0)); + doNothing().when(scheduler).putBackKnownMapOutput(host, + srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(1)); + + spyFetcher.setupLocalDiskFetch(host); + + // should have exactly 3 success and 1 failure. + for (int i : sucessfulAttemptsIndexes) { + for (int j = 0; j < host.getPartitionCount(); j++) { + verifyCopySucceeded(scheduler, host, srcAttempts, i, j); + } + } + verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(0), host, true, false, true); + verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(1), host, true, false, true); + verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(0), host, true, false, true); + verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(1), host, true, false, true); + + verify(metrics, times(6)).successFetch(); + verify(metrics, times(4)).failedFetch(); + + verify(spyFetcher).putBackRemainingMapOutputs(host); + verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX)); + verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX)); + verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX)); + verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX)); + } + 
private void verifyCopySucceeded(ShuffleScheduler scheduler, MapHost host, - List<InputAttemptIdentifier> srcAttempts, long p) throws + List<CompositeInputAttemptIdentifier> srcAttempts, long p, int j) throws IOException { // need to verify filename, offsets, sizes wherever they are used. - InputAttemptIdentifier srcAttemptToMatch = srcAttempts.get((int) p); + InputAttemptIdentifier srcAttemptToMatch = srcAttempts.get((int) p).expand(j); String filenameToMatch = SHUFFLE_INPUT_FILE_PREFIX + srcAttemptToMatch.getPathComponent(); ArgumentCaptor<MapOutput> captureMapOutput = ArgumentCaptor.forClass(MapOutput.class); verify(scheduler).copySucceeded(eq(srcAttemptToMatch), eq(host), eq(p * 100),
