TEZ-3599. Unordered Fetcher can hang if empty partitions are present (Kuhu Shukla via jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0cf1ce26 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0cf1ce26 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0cf1ce26 Branch: refs/heads/master Commit: 0cf1ce26c96139abd726e8e00cfcfacfafd2b4f8 Parents: bc3c17a Author: Jonathan Eagles <[email protected]> Authored: Thu Feb 2 15:56:43 2017 -0600 Committer: Jonathan Eagles <[email protected]> Committed: Thu Feb 2 15:56:43 2017 -0600 ---------------------------------------------------------------------- TEZ-3334-CHANGES.txt | 1 + .../runtime/library/common/shuffle/Fetcher.java | 114 +++++++++++-------- .../common/shuffle/impl/ShuffleManager.java | 28 +++-- .../library/common/shuffle/TestFetcher.java | 46 +++++--- .../common/shuffle/impl/TestShuffleManager.java | 2 +- .../shuffle/orderedgrouped/TestFetcher.java | 4 +- 6 files changed, 122 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/0cf1ce26/TEZ-3334-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt index 6c9a858..8416cc9 100644 --- a/TEZ-3334-CHANGES.txt +++ b/TEZ-3334-CHANGES.txt @@ -4,6 +4,7 @@ Apache Tez Change Log INCOMPATIBLE CHANGES: ALL CHANGES: + TEZ-3599. Unordered Fetcher can hang if empty partitions are present TEZ-3596. Number of Empty DME logged for Composite fetch is too high TEZ-3597. Composite Fetch hangs on certain DME empty events. TEZ-3595. Composite Fetch account error for disk direct http://git-wip-us.apache.org/repos/asf/tez/blob/0cf1ce26/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java index 7b5ca17..a083daa 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java @@ -584,6 +584,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> { // yet_to_be_fetched list and marking the failed tasks. InputAttemptIdentifier[] failedInputs = null; while (!srcAttemptsRemaining.isEmpty() && failedInputs == null) { + InputAttemptIdentifier inputAttemptIdentifier = + srcAttemptsRemaining.entrySet().iterator().next().getValue(); if (isShutDown.get()) { shutdownInternal(true); if (isDebugEnabled) { @@ -595,6 +597,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> { } try { failedInputs = fetchInputs(input, callback); + if(failedInputs == null || failedInputs.length == 0) { + srcAttemptsRemaining.remove(inputAttemptIdentifier.toString()); + } } catch (FetcherReadTimeoutException e) { //clean up connection shutdownInternal(true); @@ -635,6 +640,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { Iterator<Entry<String, InputAttemptIdentifier>> iterator = srcAttemptsRemaining.entrySet().iterator(); while (iterator.hasNext()) { + boolean hasFailures = false; if (isShutDown.get()) { if (isDebugEnabled) { LOG.debug( @@ -643,54 +649,67 @@ public class Fetcher extends CallableWithNdc<FetchResult> { break; } InputAttemptIdentifier srcAttemptId = iterator.next().getValue(); - long startTime = System.currentTimeMillis(); + for (int curPartition = 0; curPartition < partitionCount; curPartition++) { + int reduceId = curPartition + partition; + srcAttemptId = pathToAttemptMap.get(new PathPartition(srcAttemptId.getPathComponent(), reduceId)); + long startTime = System.currentTimeMillis(); - FetchedInput fetchedInput = null; - try { - TezIndexRecord idxRecord; - // for missing files, this will throw an exception - idxRecord = getTezIndexRecord(srcAttemptId); - - fetchedInput = new LocalDiskFetchedInput(idxRecord.getStartOffset(), - idxRecord.getRawLength(), idxRecord.getPartLength(), srcAttemptId, - getShuffleInputFileName(srcAttemptId.getPathComponent(), null), conf, - new FetchedInputCallback() { - @Override - public void fetchComplete(FetchedInput fetchedInput) {} - - @Override - public void fetchFailed(FetchedInput fetchedInput) {} - - @Override - public void freeResources(FetchedInput fetchedInput) {} - }); - if (isDebugEnabled) { - LOG.debug("fetcher" + " about to shuffle output of srcAttempt (direct disk)" + srcAttemptId - + " decomp: " + idxRecord.getRawLength() + " len: " + idxRecord.getPartLength() - + " to " + fetchedInput.getType()); - } - - long endTime = System.currentTimeMillis(); - fetcherCallback.fetchSucceeded(host, srcAttemptId, fetchedInput, idxRecord.getPartLength(), - idxRecord.getRawLength(), (endTime - startTime)); - iterator.remove(); - } catch (IOException e) { - cleanupFetchedInput(fetchedInput); - if (isShutDown.get()) { + FetchedInput fetchedInput = null; + try { + TezIndexRecord idxRecord; + // for missing files, this will throw an exception + idxRecord = getTezIndexRecord(srcAttemptId, reduceId); + + fetchedInput = new LocalDiskFetchedInput(idxRecord.getStartOffset(), + idxRecord.getRawLength(), idxRecord.getPartLength(), srcAttemptId, + getShuffleInputFileName(srcAttemptId.getPathComponent(), null), + conf, + new FetchedInputCallback() { + @Override + public void fetchComplete(FetchedInput fetchedInput) { + } + + @Override + public void fetchFailed(FetchedInput fetchedInput) { + } + + @Override + public void freeResources(FetchedInput fetchedInput) { + } + }); if (isDebugEnabled) { - LOG.debug( - "Already shutdown. Ignoring Local Fetch Failure for " + srcAttemptId + - " from host " + - host + " : " + e.getClass().getName() + ", message=" + e.getMessage()); + LOG.debug("fetcher" + " about to shuffle output of srcAttempt (direct disk)" + srcAttemptId + + " decomp: " + idxRecord.getRawLength() + " len: " + idxRecord.getPartLength() + + " to " + fetchedInput.getType()); + } + + long endTime = System.currentTimeMillis(); + fetcherCallback.fetchSucceeded(host, srcAttemptId, fetchedInput, idxRecord.getPartLength(), + idxRecord.getRawLength(), (endTime - startTime)); + } catch (IOException e) { + hasFailures = true; + cleanupFetchedInput(fetchedInput); + if (isShutDown.get()) { + if (isDebugEnabled) { + LOG.debug( + "Already shutdown. Ignoring Local Fetch Failure for " + + srcAttemptId + + " from host " + + host + " : " + e.getClass().getName() + ", message=" + e.getMessage()); + } + break; + } + if (failMissing) { + LOG.warn( + "Failed to shuffle output of " + srcAttemptId + " from " + + host + "(local fetch)", + e); } - break; - } - if (failMissing) { - LOG.warn( - "Failed to shuffle output of " + srcAttemptId + " from " + host + "(local fetch)", - e); } } + if(!hasFailures) { + iterator.remove(); + } } InputAttemptIdentifier[] failedFetches = null; @@ -713,7 +732,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { } @VisibleForTesting - protected TezIndexRecord getTezIndexRecord(InputAttemptIdentifier srcAttemptId) throws + protected TezIndexRecord getTezIndexRecord(InputAttemptIdentifier srcAttemptId, int partition) throws IOException { TezIndexRecord idxRecord; Path indexFile = getShuffleInputFileName(srcAttemptId.getPathComponent(), @@ -745,6 +764,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> { return localDirAllocator.getLocalPathToRead(pathFromLocalDir, conf); } + @VisibleForTesting + public Map<PathPartition, InputAttemptIdentifier> getPathToAttemptMap() { + return pathToAttemptMap; + } + static class HostFetchResult { private final FetchResult fetchResult; private final InputAttemptIdentifier[] failedInputs; @@ -946,8 +970,6 @@ public class Fetcher extends CallableWithNdc<FetchResult> { compressedLength, decompressedLength, (endTime - startTime)); // Note successful shuffle - srcAttemptsRemaining.remove(srcAttemptId.toString()); - // metrics.successFetch(); } } catch (IOException ioe) { http://git-wip-us.apache.org/repos/asf/tez/blob/0cf1ce26/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index 3964431..a23ce72 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -117,7 +117,7 @@ public class ShuffleManager implements FetcherCallback { private final BlockingQueue<FetchedInput> completedInputs; private final AtomicBoolean inputReadyNotificationSent = new AtomicBoolean(false); @VisibleForTesting - final Set<Integer> completedInputSet; + final BitSet completedInputSet; private final ConcurrentMap<HostPort, InputHost> knownSrcHosts; private final BlockingQueue<InputHost> pendingHosts; private final Set<InputAttemptIdentifier> obsoletedInputs; @@ -217,7 +217,7 @@ public class ShuffleManager implements FetcherCallback { this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()); - completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>(numInputs)); + completedInputSet = new BitSet(numInputs); /** * In case of pipelined shuffle, it is possible to get multiple FetchedInput per attempt. * We do not know upfront the number of spills from source. @@ -445,9 +445,17 @@ public class ShuffleManager implements FetcherCallback { } // Avoid adding attempts which have already completed. - if (completedInputSet.contains(input.getInputIdentifier())) { - inputIter.remove(); - continue; + if(input instanceof CompositeInputAttemptIdentifier) { + if (completedInputSet.nextClearBit(input.getInputIdentifier()) >= + input.getInputIdentifier() + ((CompositeInputAttemptIdentifier) input).getInputIdentifierCount()) { + inputIter.remove(); + continue; + } + } else { + if (completedInputSet.get(input.getInputIdentifier())) { + inputIter.remove(); + continue; + } } // Avoid adding attempts which have been marked as OBSOLETE if (obsoletedInputs.contains(input)) { @@ -531,9 +539,9 @@ public class ShuffleManager implements FetcherCallback { LOG.debug("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete."); } - if (!completedInputSet.contains(inputIdentifier)) { + if (!completedInputSet.get(inputIdentifier)) { synchronized (completedInputSet) { - if (!completedInputSet.contains(inputIdentifier)) { + if (!completedInputSet.get(inputIdentifier)) { NullFetchedInput fetchedInput = new NullFetchedInput(srcAttemptIdentifier); if (!srcAttemptIdentifier.canRetrieveInputInChunks()) { registerCompletedInput(fetchedInput); @@ -631,9 +639,9 @@ public class ShuffleManager implements FetcherCallback { inputContext.notifyProgress(); boolean committed = false; - if (!completedInputSet.contains(inputIdentifier)) { + if (!completedInputSet.get(inputIdentifier)) { synchronized (completedInputSet) { - if (!completedInputSet.contains(inputIdentifier)) { + if (!completedInputSet.get(inputIdentifier)) { fetchedInput.commit(); committed = true; ShuffleUtils.logIndividualFetchComplete(LOG, copyDuration, @@ -710,7 +718,7 @@ public class ShuffleManager implements FetcherCallback { private void adjustCompletedInputs(FetchedInput fetchedInput) { lock.lock(); try { - completedInputSet.add(fetchedInput.getInputAttemptIdentifier().getInputIdentifier()); + completedInputSet.set(fetchedInput.getInputAttemptIdentifier().getInputIdentifier()); int numComplete = numCompletedInputs.incrementAndGet(); if (numComplete == numInputs) { http://git-wip-us.apache.org/repos/asf/tez/blob/0cf1ce26/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java index 17a065c..b031154 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java @@ -18,6 +18,8 @@ package org.apache.tez.runtime.library.common.shuffle; +import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; @@ -31,6 +33,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.LinkedList; @@ -138,13 +141,13 @@ public class TestFetcher { @Test(timeout = 3000) public void testSetupLocalDiskFetch() throws Exception { - - InputAttemptIdentifier[] srcAttempts = { - new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0"), - new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1"), - new InputAttemptIdentifier(2, 3, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2"), - new InputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3"), - new InputAttemptIdentifier(4, 5, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_4") + + CompositeInputAttemptIdentifier[] srcAttempts = { + new CompositeInputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", 1), + new CompositeInputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", 1), + new CompositeInputAttemptIdentifier(2, 3, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2", 1), + new CompositeInputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", 1), + new CompositeInputAttemptIdentifier(4, 5, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_4", 1) }; final int FIRST_FAILED_ATTEMPT_IDX = 2; final int SECOND_FAILED_ATTEMPT_IDX = 4; @@ -156,10 +159,25 @@ public class TestFetcher { FetcherCallback callback = mock(FetcherCallback.class); Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null, ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT, - false, true, false); - builder.assignWork(HOST, PORT, partition, 1, Arrays.asList(srcAttempts)); + false, true, true); + ArrayList<InputAttemptIdentifier> inputAttemptIdentifiers = new ArrayList<>(); + for(CompositeInputAttemptIdentifier compositeInputAttemptIdentifier : srcAttempts) { + for(int i=0;i<compositeInputAttemptIdentifier.getInputIdentifierCount();i++) { + inputAttemptIdentifiers.add(compositeInputAttemptIdentifier.expand(i)); + } + } + ArrayList<InputAttemptIdentifier> list = new ArrayList<InputAttemptIdentifier>(); + list.addAll(Arrays.asList(srcAttempts)); + builder.assignWork(HOST, PORT, partition, 1, list); Fetcher fetcher = spy(builder.build()); - + for(CompositeInputAttemptIdentifier compositeInputAttemptIdentifier : srcAttempts) { + for(int i=0;i<compositeInputAttemptIdentifier.getInputIdentifierCount();i++) { + inputAttemptIdentifiers.add(compositeInputAttemptIdentifier.expand(i)); + Fetcher.PathPartition pathPartition = + new Fetcher.PathPartition(compositeInputAttemptIdentifier.getPathComponent(),partition + i); + fetcher.getPathToAttemptMap().put(pathPartition, compositeInputAttemptIdentifier.expand(i)); + } + } doAnswer(new Answer<Path>() { @Override public Path answer(InvocationOnMock invocation) throws Throwable { @@ -183,7 +201,7 @@ public class TestFetcher { // match with params for copySucceeded below. return new TezIndexRecord(p * 10, p * 1000, p * 100); } - }).when(fetcher).getTezIndexRecord(any(InputAttemptIdentifier.class)); + }).when(fetcher).getTezIndexRecord(any(InputAttemptIdentifier.class), anyInt()); doNothing().when(fetcher).shutdown(); doNothing().when(callback).fetchSucceeded(anyString(), any(InputAttemptIdentifier.class), @@ -214,14 +232,14 @@ public class TestFetcher { srcAttempts[SECOND_FAILED_ATTEMPT_IDX]); } - protected void verifyFetchSucceeded(FetcherCallback callback, InputAttemptIdentifier srcAttempId, Configuration conf) throws IOException { + protected void verifyFetchSucceeded(FetcherCallback callback, CompositeInputAttemptIdentifier srcAttempId, Configuration conf) throws IOException { String pathComponent = srcAttempId.getPathComponent(); int len = pathComponent.length(); long p = Long.valueOf(pathComponent.substring(len - 1, len)); ArgumentCaptor<LocalDiskFetchedInput> capturedFetchedInput = ArgumentCaptor.forClass(LocalDiskFetchedInput.class); verify(callback) - .fetchSucceeded(eq(HOST), eq(srcAttempId), capturedFetchedInput.capture(), eq(p * 100), + .fetchSucceeded(eq(HOST), eq(srcAttempId.expand(0)), capturedFetchedInput.capture(), eq(p * 100), eq(p * 1000), anyLong()); LocalDiskFetchedInput f = capturedFetchedInput.getValue(); Assert.assertEquals("success callback filename", f.getInputFile().toString(), @@ -230,7 +248,7 @@ public class TestFetcher { Assert.assertEquals("success callback filesystem", f.getStartOffset(), p * 10); Assert.assertEquals("success callback raw size", f.getActualSize(), p * 1000); Assert.assertEquals("success callback compressed size", f.getCompressedSize(), p * 100); - Assert.assertEquals("success callback input id", f.getInputAttemptIdentifier(), srcAttempId); + Assert.assertEquals("success callback input id", f.getInputAttemptIdentifier(), srcAttempId.expand(0)); Assert.assertEquals("success callback type", f.getType(), FetchedInput.Type.DISK_DIRECT); } http://git-wip-us.apache.org/repos/asf/tez/blob/0cf1ce26/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java index f026cb2..b3b8688 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java @@ -220,7 +220,7 @@ public class TestShuffleManager { } public int getNumOfCompletedInputs() { - return completedInputSet.size(); + return completedInputSet.cardinality(); } boolean isFetcherExecutorShutdown() { http://git-wip-us.apache.org/repos/asf/tez/blob/0cf1ce26/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java index 3686d17..54b0279 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java @@ -107,7 +107,7 @@ public class TestFetcher { static final Logger LOG = LoggerFactory.getLogger(TestFetcher.class); - @Test (timeout = 5000) + @Test(timeout = 5000) public void testInputsReturnedOnConnectionException() throws Exception { Configuration conf = new TezConfiguration(); ShuffleScheduler scheduler = mock(ShuffleScheduler.class); @@ -333,7 +333,7 @@ public class TestFetcher { verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX)); } - @Test(timeout = 5000000) + @Test(timeout = 5000) public void testSetupLocalDiskFetchAutoReduce() throws Exception { Configuration conf = new TezConfiguration(); ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
