Repository: tez
Updated Branches:
  refs/heads/master 4d381d778 -> 73da831e8
TEZ-2172. FetcherOrderedGrouped using List to store InputAttemptIdentifier can lead to some inefficiency during remove() operation (Saikat via rbalamohan)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/73da831e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/73da831e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/73da831e

Branch: refs/heads/master
Commit: 73da831e8e5acab5e2e044c74a29384bcda4cbc6
Parents: 4d381d7
Author: Rajesh Balamohan <[email protected]>
Authored: Tue Aug 4 05:59:12 2015 +0530
Committer: Rajesh Balamohan <[email protected]>
Committed: Tue Aug 4 05:59:12 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../orderedgrouped/FetcherOrderedGrouped.java   | 52 +++++++-------
 .../shuffle/orderedgrouped/TestFetcher.java     | 74 ++++++++++++++++++--
 3 files changed, 96 insertions(+), 31 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/73da831e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 303171a..59307b7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@ INCOMPATIBLE CHANGES
   TEZ-2646. Add scheduling casual dependency for attempts

 ALL CHANGES:
+  TEZ-2172. FetcherOrderedGrouped using List to store InputAttemptIdentifier can lead to some inefficiency during remove() operation.
   TEZ-2613. Fetcher(unordered) using List to store InputAttemptIdentifier can lead to some inefficiency during remove() operation.
   TEZ-2645. Provide standard analyzers for job analysis.
   TEZ-2627. Support for Tez Job Priorities.

http://git-wip-us.apache.org/repos/asf/tez/blob/73da831e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java index a4d38ce..d8be8dd 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java @@ -22,9 +22,11 @@ import java.io.IOException; import java.net.SocketTimeoutException; import java.net.URL; import java.util.Arrays; +import java.util.Collection; import java.util.Iterator; -import java.util.LinkedList; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.apache.tez.http.BaseHttpConnection; @@ -87,8 +89,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { private final boolean ifileReadAhead; private final int ifileReadAheadLength; @VisibleForTesting - LinkedList<InputAttemptIdentifier> remaining; - + Map<String, InputAttemptIdentifier> remaining; volatile DataInputStream input; volatile BaseHttpConnection httpConnection; @@ -228,25 +229,19 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { retryStartTime = 0; // Get completed maps on 'host' List<InputAttemptIdentifier> srcAttempts = scheduler.getMapsForHost(host); - // Sanity check to catch hosts with only 'OBSOLETE' maps, // especially at the tail of large jobs if (srcAttempts.size() == 0) { return; } - if(LOG.isDebugEnabled()) { LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: " + 
srcAttempts + ", partitionId: " + currentPartition); } - - // List of maps to be fetched yet - remaining = new LinkedList<InputAttemptIdentifier>(srcAttempts); - + populateRemainingMap(srcAttempts); // Construct the url and connect - try { - if (!setupConnection(host, srcAttempts)) { + if (!setupConnection(host, remaining.values())) { if (stopped) { cleanupCurrentConnection(true); } @@ -273,7 +268,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { return; } // Connect with retry - if (!setupConnection(host, new LinkedList<InputAttemptIdentifier>(remaining))) { + if (!setupConnection(host, remaining.values())) { if (stopped) { cleanupCurrentConnection(true); LOG.info("Not reporting connection re-establishment failure since fetcher is stopped"); @@ -310,7 +305,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { } @VisibleForTesting - boolean setupConnection(MapHost host, List<InputAttemptIdentifier> attempts) + boolean setupConnection(MapHost host, Collection<InputAttemptIdentifier> attempts) throws IOException { boolean connectSucceeded = false; try { @@ -347,7 +342,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { // At this point, either the connection failed, or the initial header verification failed. // The error does not relate to any specific Input. Report all of them as failed. // This ends up indirectly penalizing the host (multiple failures reported on the single host) - for(InputAttemptIdentifier left: remaining) { + for (InputAttemptIdentifier left : remaining.values()) { // Need to be handling temporary glitches .. 
// Report read error to the AM to trigger source failure heuristics scheduler.copyFailed(left, host, connectSucceeded, !connectSucceeded); @@ -361,7 +356,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { // Cycle through remaining MapOutputs boolean isFirst = true; InputAttemptIdentifier first = null; - for (InputAttemptIdentifier left : remaining) { + for (InputAttemptIdentifier left : remaining.values()) { if (isFirst) { first = left; isFirst = false; @@ -487,7 +482,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { scheduler.copySucceeded(srcAttemptId, host, compressedLength, decompressedLength, endTime - startTime, mapOutput); // Note successful shuffle - remaining.remove(srcAttemptId); + remaining.remove(srcAttemptId.toString()); metrics.successFetch(); return null; } catch (IOException ioe) { @@ -514,12 +509,11 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { srcAttemptId + " decomp: " + decompressedLength + ", " + compressedLength, ioe); if(srcAttemptId == null) { - return remaining.toArray(new InputAttemptIdentifier[remaining.size()]); + return remaining.values().toArray(new InputAttemptIdentifier[remaining.values().size()]); } else { return new InputAttemptIdentifier[] {srcAttemptId}; } } - LOG.warn("Failed to shuffle output of " + srcAttemptId + " from " + host.getHostIdentifier(), ioe); @@ -528,7 +522,6 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { metrics.failedFetch(); return new InputAttemptIdentifier[] {srcAttemptId}; } - } /** @@ -572,7 +565,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { * @return true/false, based on if the verification succeeded or not */ private boolean verifySanity(long compressedLength, long decompressedLength, - int forReduce, List<InputAttemptIdentifier> remaining, InputAttemptIdentifier srcAttemptId) { + int forReduce, Map<String, InputAttemptIdentifier> remaining, InputAttemptIdentifier srcAttemptId) { if (compressedLength < 0 || 
decompressedLength < 0) { wrongLengthErrs.increment(1); LOG.warn(logIdentifier + " invalid lengths in map output header: id: " + @@ -592,7 +585,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { } // Sanity check - if (!remaining.contains(srcAttemptId)) { + if (remaining.get(srcAttemptId.toString()) == null) { wrongMapErrs.increment(1); LOG.warn("Invalid map-output! Received output for " + srcAttemptId); return false; @@ -603,7 +596,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { private InputAttemptIdentifier getNextRemainingAttempt() { if (remaining.size() > 0) { - return remaining.iterator().next(); + return remaining.values().iterator().next(); } else { return null; } @@ -626,10 +619,10 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { } // List of maps to be fetched yet - remaining = new LinkedList<InputAttemptIdentifier>(srcAttempts); + populateRemainingMap(srcAttempts); try { - final Iterator<InputAttemptIdentifier> iter = remaining.iterator(); + final Iterator<InputAttemptIdentifier> iter = remaining.values().iterator(); while (iter.hasNext()) { // Avoid fetching more if already stopped if (stopped) { @@ -701,5 +694,16 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { return MapOutput.createLocalDiskMapOutput(srcAttemptId, allocator, filename, indexRecord.getStartOffset(), indexRecord.getPartLength(), true); } + + @VisibleForTesting + void populateRemainingMap(List<InputAttemptIdentifier> origlist) { + if (remaining == null) { + remaining = new LinkedHashMap<String, InputAttemptIdentifier>(origlist.size()); + } + for (InputAttemptIdentifier id : origlist) { + remaining.put(id.toString(), id); + } + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/73da831e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java ---------------------------------------------------------------------- diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java index 385b7b0..7415570 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java @@ -22,7 +22,6 @@ import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.anyList; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; @@ -42,9 +41,15 @@ import java.io.IOException; import java.net.HttpURLConnection; import java.net.URL; import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; +import java.util.Map.Entry; import com.google.common.collect.Lists; + import org.apache.tez.http.HttpConnection; import org.apache.tez.http.HttpConnectionParams; import org.apache.tez.common.counters.TezCounter; @@ -373,7 +378,7 @@ public class TestFetcher { new InputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3") ); doReturn(srcAttempts).when(scheduler).getMapsForHost(host); - doReturn(true).when(fetcher).setupConnection(host, srcAttempts); + doReturn(true).when(fetcher).setupConnection(any(MapHost.class), any(Collection.class)); URL url = ShuffleUtils.constructInputURL(host.getBaseUrl(), srcAttempts, false); fetcher.httpConnection = new FakeHttpConnection(url, null, "", null); @@ -394,8 +399,7 @@ public class TestFetcher { @Override public Void answer(InvocationOnMock invocation) throws Throwable { // Emulate host down for 4 
seconds. Thread.sleep(4000); - doReturn(false).when(fetcher).setupConnection(host, srcAttempts); - + doReturn(false).when(fetcher).setupConnection(any(MapHost.class), any(Collection.class)); // Throw IOException when fetcher tries to connect again to the same node throw new FetcherReadTimeoutException("creating fetcher socket read timeout exception"); } @@ -407,7 +411,7 @@ public class TestFetcher { //ignore } //setup connection should be called twice (1 for connect and another for retry) - verify(fetcher, times(2)).setupConnection(any(MapHost.class), anyList()); + verify(fetcher, times(2)).setupConnection(any(MapHost.class), any(Collection.class)); //since copyMapOutput consistently fails, it should call copyFailed once verify(scheduler, times(1)).copyFailed(any(InputAttemptIdentifier.class), any(MapHost.class), anyBoolean(), anyBoolean()); @@ -458,8 +462,7 @@ public class TestFetcher { wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, true); final FetcherOrderedGrouped fetcher = spy(mockFetcher); - fetcher.remaining = Lists.newLinkedList(); - + fetcher.remaining = new LinkedHashMap<String, InputAttemptIdentifier>(); final List<InputAttemptIdentifier> srcAttempts = Arrays.asList( new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0"), new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1"), @@ -477,4 +480,61 @@ public class TestFetcher { fail(); } } + + @Test(timeout = 1000) + public void testInputAttemptIdentifierMap() { + InputAttemptIdentifier[] srcAttempts = { + new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", + false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0), + //duplicate entry + new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", + false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0), + // 
pipeline shuffle based identifiers, with multiple attempts + new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", + false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0), + new InputAttemptIdentifier(new InputIdentifier(1), 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", + false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0), + new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2", + false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1), + new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", + false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 2), + new InputAttemptIdentifier(new InputIdentifier(2), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", + false, InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED, 0) + }; + InputAttemptIdentifier[] expectedSrcAttempts = { + new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", + false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0), + // pipeline shuffle based identifiers + new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", + false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0), + new InputAttemptIdentifier(new InputIdentifier(1), 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", + false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0), + new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2", + false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1), + new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", + false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 2), + new InputAttemptIdentifier(new InputIdentifier(2), 1, 
InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", + false, InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED, 0) + }; + + Configuration conf = new TezConfiguration(); + ShuffleScheduler scheduler = mock(ShuffleScheduler.class); + MergeManager merger = mock(MergeManager.class); + ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class); + Shuffle shuffle = mock(Shuffle.class); + MapHost mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl"); + FetcherOrderedGrouped fetcher = + new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, + null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter, + wrongLengthErrsCounter, badIdErrsCounter, + wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false); + fetcher.populateRemainingMap(new LinkedList<InputAttemptIdentifier>(Arrays.asList(srcAttempts))); + Assert.assertEquals(expectedSrcAttempts.length, fetcher.remaining.size()); + Iterator<Entry<String, InputAttemptIdentifier>> iterator = fetcher.remaining.entrySet().iterator(); + int count = 0; + while(iterator.hasNext()) { + String key = iterator.next().getKey(); + Assert.assertTrue(expectedSrcAttempts[count++].toString().compareTo(key) == 0); + } + } }
