Repository: tez Updated Branches: refs/heads/master 2358521fa -> 614937c5d
TEZ-3804. FetcherOrderedGrouped#setupLocalDiskFetch should ignore empty partition records (Kuhu Shukla via jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/614937c5 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/614937c5 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/614937c5 Branch: refs/heads/master Commit: 614937c5df88b79c85ae9fc6394652fb65d98081 Parents: 2358521 Author: Jonathan Eagles <jeag...@yahoo-inc.com> Authored: Tue Aug 1 16:58:42 2017 -0500 Committer: Jonathan Eagles <jeag...@yahoo-inc.com> Committed: Tue Aug 1 16:58:42 2017 -0500 ---------------------------------------------------------------------- .../orderedgrouped/FetcherOrderedGrouped.java | 3 + .../shuffle/orderedgrouped/TestFetcher.java | 83 +++++++++++++++++++- 2 files changed, 82 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/614937c5/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java index 2c3aac3..68a54e9 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java @@ -715,6 +715,9 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { srcAttemptId = scheduler.getIdentifierForFetchedOutput(srcAttemptId.getPathComponent(), reduceId); Path filename = getShuffleInputFileName(srcAttemptId.getPathComponent(), null); TezIndexRecord indexRecord = getIndexRecord(srcAttemptId.getPathComponent(), reduceId); + if(!indexRecord.hasData()) { + continue; + } mapOutput = getMapOutputForDirectDiskFetch(srcAttemptId, filename, indexRecord); long endTime = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/tez/blob/614937c5/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java index ef371c2..6d30448 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java @@ -299,7 +299,7 @@ public class TestFetcher { throw new IOException("failing to simulate failure case"); } // match with params for copySucceeded below. - return new TezIndexRecord(p * 10, p * 1000, p * 100); + return new TezIndexRecord(p * 10, (p+1) * 1000, (p+2) * 100); } }).when(spyFetcher).getIndexRecord(anyString(), eq(host.getPartitionId() + i)); } @@ -327,6 +327,81 @@ public class TestFetcher { verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX)); } + @Test (timeout = 5000) + public void testSetupLocalDiskFetchEmptyPartitions() throws Exception { + Configuration conf = new TezConfiguration(); + ShuffleScheduler scheduler = mock(ShuffleScheduler.class); + MergeManager merger = mock(MergeManager.class); + Shuffle shuffle = mock(Shuffle.class); + InputContext inputContext = mock(InputContext.class); + when(inputContext.getCounters()).thenReturn(new TezCounters()); + when(inputContext.getSourceVertexName()).thenReturn(""); + + MapHost host = new MapHost(HOST, PORT, 1, 1); + FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, + null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, + wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, + false, false, true, false); + FetcherOrderedGrouped spyFetcher = spy(fetcher); + + final List<CompositeInputAttemptIdentifier> srcAttempts = Arrays.asList( + new CompositeInputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", 1), + new CompositeInputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", 1), + new CompositeInputAttemptIdentifier(2, 3, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2", 1), + new CompositeInputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", 1), + new CompositeInputAttemptIdentifier(4, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_4", 1) + ); + + doReturn(srcAttempts).when(scheduler).getMapsForHost(host); + + final ConcurrentMap<ShuffleScheduler.PathPartition, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<ShuffleScheduler.PathPartition, InputAttemptIdentifier>(); + for (CompositeInputAttemptIdentifier srcAttempt : srcAttempts) { + for (int i = 0; i < srcAttempt.getInputIdentifierCount(); i++) { + ShuffleScheduler.PathPartition pathPartition = new ShuffleScheduler.PathPartition(srcAttempt.getPathComponent(), host.getPartitionId() + i); + pathToIdentifierMap.put(pathPartition, srcAttempt.expand(i)); + } + } + doAnswer(new Answer<InputAttemptIdentifier>() { + @Override + public InputAttemptIdentifier answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + String path = (String) args[0]; + int reduceId = (int) args[1]; + return pathToIdentifierMap.get(new ShuffleScheduler.PathPartition(path, reduceId)); + } + }).when(scheduler) + .getIdentifierForFetchedOutput(any(String.class), any(int.class)); + + doAnswer(new Answer<Path>() { + @Override + public Path answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + return new Path(SHUFFLE_INPUT_FILE_PREFIX + args[0]); + } + }).when(spyFetcher).getShuffleInputFileName(anyString(), anyString()); + + for (int i = 0; i < host.getPartitionCount(); i++) { + doAnswer(new Answer<TezIndexRecord>() { + @Override + public TezIndexRecord answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + String pathComponent = (String) args[0]; + int len = pathComponent.length(); + long p = Long.valueOf(pathComponent.substring(len - 1, len)); + // match with params for copySucceeded below. + return new TezIndexRecord(p * 10, 0, 0); + } + }).when(spyFetcher).getIndexRecord(anyString(), eq(host.getPartitionId() + i)); + } + + doNothing().when(scheduler).copySucceeded(any(InputAttemptIdentifier.class), any(MapHost.class), + anyLong(), anyLong(), anyLong(), any(MapOutput.class), anyBoolean()); + spyFetcher.setupLocalDiskFetch(host); + verify(scheduler, times(0)).copySucceeded(any(InputAttemptIdentifier.class), any(MapHost.class), + anyLong(), anyLong(), anyLong(), any(MapOutput.class), anyBoolean()); + verify(spyFetcher).putBackRemainingMapOutputs(host); + } + @Test(timeout = 5000) public void testSetupLocalDiskFetchAutoReduce() throws Exception { Configuration conf = new TezConfiguration(); @@ -412,7 +487,7 @@ public class TestFetcher { throw new IOException("Thowing exception to simulate failure case"); } // match with params for copySucceeded below. - return new TezIndexRecord(p * 10, p * 1000, p * 100); + return new TezIndexRecord(p * 10, (p + 1) * 1000, (p + 2) * 100); } }).when(spyFetcher).getIndexRecord(anyString(), eq(host.getPartitionId() + i)); } @@ -455,8 +530,8 @@ public class TestFetcher { InputAttemptIdentifier srcAttemptToMatch = srcAttempts.get((int) p).expand(j); String filenameToMatch = SHUFFLE_INPUT_FILE_PREFIX + srcAttemptToMatch.getPathComponent(); ArgumentCaptor<MapOutput> captureMapOutput = ArgumentCaptor.forClass(MapOutput.class); - verify(scheduler).copySucceeded(eq(srcAttemptToMatch), eq(host), eq(p * 100), - eq(p * 1000), anyLong(), captureMapOutput.capture(), anyBoolean()); + verify(scheduler).copySucceeded(eq(srcAttemptToMatch), eq(host), eq((p+2) * 100), + eq((p+1) * 1000), anyLong(), captureMapOutput.capture(), anyBoolean()); // cannot use the equals of MapOutput as it compares id which is private. so doing it manually MapOutput m = captureMapOutput.getAllValues().get(0);