Repository: tez Updated Branches: refs/heads/master 927781566 -> 79af4e8d0
TEZ-3976: Batch ShuffleManager error report events (Jaume Marhuenda, reviewed by Gopal V) Signed-off-by: Gopal V <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/79af4e8d Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/79af4e8d Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/79af4e8d Branch: refs/heads/master Commit: 79af4e8d06417829986dfc34b3627ead15d563ee Parents: 9277815 Author: Jaume Marhuenda <[email protected]> Authored: Tue Oct 23 15:30:13 2018 -0700 Committer: Gopal V <[email protected]> Committed: Tue Oct 23 15:30:13 2018 -0700 ---------------------------------------------------------------------- .../runtime/api/events/InputReadErrorEvent.java | 45 ++++++- .../library/api/TezRuntimeConfiguration.java | 10 ++ .../common/shuffle/impl/ShuffleManager.java | 129 ++++++++++++++++--- .../common/shuffle/impl/TestShuffleManager.java | 66 ++++++++++ 4 files changed, 232 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/79af4e8d/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputReadErrorEvent.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputReadErrorEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputReadErrorEvent.java index 7d2e0d2..cabc39f 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputReadErrorEvent.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputReadErrorEvent.java @@ -21,6 +21,8 @@ package org.apache.tez.runtime.api.events; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.tez.runtime.api.Event; +import java.util.Objects; + /** * Event generated by an Input to indicate error when trying to retrieve data. * This is not necessarily a fatal event - it's an indication to the AM to retry @@ -44,17 +46,31 @@ public final class InputReadErrorEvent extends Event { */ private final int version; - private InputReadErrorEvent(String diagnostics, int index, - int version) { + /** + * Number of failures. + */ + private final int numFailures; + + private InputReadErrorEvent(final String diagnostics, final int index, + final int version, final int numFailures) { super(); this.diagnostics = diagnostics; this.index = index; this.version = version; + this.numFailures = numFailures; } public static InputReadErrorEvent create(String diagnostics, int index, int version) { - return new InputReadErrorEvent(diagnostics, index, version); + return create(diagnostics, index, version, 1); + } + + /** + * Create an InputReadErrorEvent. + */ + public static InputReadErrorEvent create(final String diagnostics, final int index, + final int version, final int numFailures) { + return new InputReadErrorEvent(diagnostics, index, version, numFailures); } public String getDiagnostics() { @@ -69,4 +85,27 @@ public final class InputReadErrorEvent extends Event { return version; } + /** + * @return number of failures + */ + public int getNumFailures() { + return numFailures; + } + + @Override + public int hashCode() { + return Objects.hash(index, version); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + InputReadErrorEvent that = (InputReadErrorEvent) o; + return index == that.index && version == that.version; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/79af4e8d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java index 85c53a5..86792e2 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java @@ -512,6 +512,15 @@ public class TezRuntimeConfiguration { TEZ_RUNTIME_PREFIX + "enable.final-merge.in.output"; public static final boolean TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT_DEFAULT = true; + /** + * Expert level setting. How long should @link{ShuffleManager} wait for batching + * before sending the events in milliseconds. Set to -1 to not wait. + */ + @ConfigurationProperty(type = "integer") + public static final String TEZ_RUNTIME_SHUFFLE_BATCH_WAIT = + TEZ_RUNTIME_PREFIX + "shuffle.batch.wait"; + public static final int TEZ_RUNTIME_SHUFFLE_BATCH_WAIT_DEFAULT = -1; + /** * Share data fetched between tasks running on the same host if applicable @@ -619,6 +628,7 @@ public class TezRuntimeConfiguration { tezRuntimeKeys.add(TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT); tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL); tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS); + tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_BATCH_WAIT); defaultConf.addResource("core-default.xml"); defaultConf.addResource("core-site.xml"); http://git-wip-us.apache.org/repos/asf/tez/blob/79af4e8d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index 5f3693f..ba8592f 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -26,6 +26,7 @@ import java.text.DecimalFormat; import java.util.Arrays; import java.util.BitSet; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -35,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -46,6 +48,8 @@ import java.util.concurrent.locks.ReentrantLock; import javax.crypto.SecretKey; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.MonotonicClock; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.http.HttpConnectionParams; import org.apache.tez.runtime.api.TaskFailureType; @@ -114,9 +118,30 @@ public class ShuffleManager implements FetcherCallback { @VisibleForTesting final ListeningExecutorService fetcherExecutor; + /** + * Executor for ReportCallable. + */ + private ExecutorService reporterExecutor; + + /** + * Lock to sync failedEvents. + */ + private final ReentrantLock reportLock = new ReentrantLock(); + + /** + * Condition to wake up the thread notifying when events fail. + */ + private final Condition reportCondition = reportLock.newCondition(); + + /** + * Events reporting fetcher failed. + */ + private final HashMap<InputReadErrorEvent, Integer> failedEvents + = new HashMap<>(); + private final ListeningExecutorService schedulerExecutor; private final RunShuffleCallable schedulerCallable; - + private final BlockingQueue<FetchedInput> completedInputs; private final AtomicBoolean inputReadyNotificationSent = new AtomicBoolean(false); @VisibleForTesting @@ -151,6 +176,11 @@ public class ShuffleManager implements FetcherCallback { private final int ifileBufferSize; private final boolean ifileReadAhead; private final int ifileReadAheadLength; + + /** + * Holds the time to wait for failures to batch them and send less events. + */ + private final int maxTimeToWaitForReportMillis; private final String srcNameTrimmed; @@ -199,7 +229,8 @@ public class ShuffleManager implements FetcherCallback { this.bytesShuffledToDiskCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_DISK); this.bytesShuffledToMemCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_MEM); this.bytesShuffledDirectDiskCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DISK_DIRECT); - + + this.ifileBufferSize = bufferSize; this.ifileReadAhead = ifileReadAheadEnabled; this.ifileReadAheadLength = ifileReadAheadLength; @@ -212,6 +243,10 @@ public class ShuffleManager implements FetcherCallback { this.verifyDiskChecksum = conf.getBoolean( TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM, TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM_DEFAULT); + this.maxTimeToWaitForReportMillis = conf.getInt( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BATCH_WAIT, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BATCH_WAIT_DEFAULT); + this.shufflePhaseTime = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_PHASE_TIME); this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED); @@ -302,12 +337,63 @@ public class ShuffleManager implements FetcherCallback { public void run() throws IOException { Preconditions.checkState(inputManager != null, "InputManager must be configured"); + if (maxTimeToWaitForReportMillis > 0) { + reporterExecutor = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("ShuffleRunner {" + srcNameTrimmed + "}") + .build()); + Future reporterFuture = reporterExecutor.submit(new ReporterCallable()); + } + ListenableFuture<Void> runShuffleFuture = schedulerExecutor.submit(schedulerCallable); Futures.addCallback(runShuffleFuture, new SchedulerFutureCallback()); // Shutdown this executor once this task, and the callback complete. schedulerExecutor.shutdown(); } - + + private class ReporterCallable extends CallableWithNdc<Void> { + /** + * Measures if the batching interval has ended. + */ + private final Clock clock; + ReporterCallable() { + clock = new MonotonicClock(); + } + + @Override + protected Void callInternal() throws Exception { + long nextReport = 0; + while (!isShutdown.get()) { + try { + reportLock.lock(); + while (failedEvents.isEmpty()) { + boolean signaled = reportCondition.await(maxTimeToWaitForReportMillis, + TimeUnit.MILLISECONDS); + } + + long currentTime = clock.getTime(); + if (currentTime > nextReport) { + if (failedEvents.size() > 0) { + List<Event> failedEventsToSend = Lists.newArrayListWithCapacity( + failedEvents.size()); + for (InputReadErrorEvent key : failedEvents.keySet()) { + failedEventsToSend.add(InputReadErrorEvent + .create(key.getDiagnostics(), key.getIndex(), + key.getVersion(), failedEvents.get(key))); + } + inputContext.sendEvents(failedEventsToSend); + failedEvents.clear(); + nextReport = currentTime + maxTimeToWaitForReportMillis; + } + } + } finally { + reportLock.unlock(); + } + } + return null; + } + } + private class RunShuffleCallable extends CallableWithNdc<Void> { private final Configuration conf; @@ -804,18 +890,27 @@ public class ShuffleManager implements FetcherCallback { if (srcAttemptIdentifier == null) { reportFatalError(null, "Received fetchFailure for an unknown src (null)"); } else { - InputReadErrorEvent readError = InputReadErrorEvent.create( - "Fetch failure while fetching from " - + TezRuntimeUtils.getTaskAttemptIdentifier( - inputContext.getSourceVertexName(), - srcAttemptIdentifier.getInputIdentifier(), - srcAttemptIdentifier.getAttemptNumber()), - srcAttemptIdentifier.getInputIdentifier(), - srcAttemptIdentifier.getAttemptNumber()); - - List<Event> failedEvents = Lists.newArrayListWithCapacity(1); - failedEvents.add(readError); - inputContext.sendEvents(failedEvents); + InputReadErrorEvent readError = InputReadErrorEvent.create( + "Fetch failure while fetching from " + + TezRuntimeUtils.getTaskAttemptIdentifier( + inputContext.getSourceVertexName(), + srcAttemptIdentifier.getInputIdentifier(), + srcAttemptIdentifier.getAttemptNumber()), + srcAttemptIdentifier.getInputIdentifier(), + srcAttemptIdentifier.getAttemptNumber()); + if (maxTimeToWaitForReportMillis > 0) { + try { + reportLock.lock(); + failedEvents.merge(readError, 1, (a, b) -> a + b); + reportCondition.signal(); + } finally { + reportLock.unlock(); + } + } else { + List<Event> events = Lists.newArrayListWithCapacity(1); + events.add(readError); + inputContext.sendEvents(events); + } } } /////////////////// End of Methods from FetcherCallbackHandler @@ -849,6 +944,10 @@ public class ShuffleManager implements FetcherCallback { if (this.schedulerExecutor != null && !this.schedulerExecutor.isShutdown()) { this.schedulerExecutor.shutdownNow(); } + if (this.reporterExecutor != null + && !this.reporterExecutor.isShutdown()) { + this.reporterExecutor.shutdownNow(); + } if (this.fetcherExecutor != null && !this.fetcherExecutor.isShutdown()) { this.fetcherExecutor.shutdownNow(); // Interrupts all running fetchers. } http://git-wip-us.apache.org/repos/asf/tez/blob/79af4e8d/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java index 103f83d..94f7f5a 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java @@ -20,7 +20,9 @@ package org.apache.tez.runtime.library.common.shuffle.impl; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyList; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; @@ -35,6 +37,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; @@ -57,6 +60,7 @@ import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.api.events.InputReadErrorEvent; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import org.apache.tez.runtime.library.common.shuffle.FetchedInput; @@ -67,8 +71,10 @@ import org.apache.tez.runtime.library.common.shuffle.InputHost; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -214,6 +220,64 @@ public class TestShuffleManager { verify(inputContext, atLeast(3)).notifyProgress(); } + @Test (timeout = 200000) + public void testFetchFailed() throws Exception { + InputContext inputContext = createInputContext(); + final ShuffleManager shuffleManager = spy(createShuffleManager(inputContext, 1)); + Thread schedulerGetHostThread = new Thread(new Runnable() { + @Override + public void run() { + try { + shuffleManager.run(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + InputAttemptIdentifier inputAttemptIdentifier + = new InputAttemptIdentifier(1, 1); + + schedulerGetHostThread.start(); + Thread.sleep(1000); + shuffleManager.fetchFailed("host1", inputAttemptIdentifier, false); + Thread.sleep(1000); + + ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class); + verify(inputContext, times(1)) + .sendEvents(captor.capture()); + Assert.assertEquals("Size was: " + captor.getAllValues().size(), + captor.getAllValues().size(), 1); + List<Event> capturedList = captor.getAllValues().get(0); + Assert.assertEquals("Size was: " + capturedList.size(), + capturedList.size(), 1); + InputReadErrorEvent inputEvent = (InputReadErrorEvent)capturedList.get(0); + Assert.assertEquals("Number of failures was: " + inputEvent.getNumFailures(), + inputEvent.getNumFailures(), 1); + + shuffleManager.fetchFailed("host1", inputAttemptIdentifier, false); + shuffleManager.fetchFailed("host1", inputAttemptIdentifier, false); + + Thread.sleep(1000); + verify(inputContext, times(1)).sendEvents(any()); + + // Wait more than five seconds for the batch to go out + Thread.sleep(5000); + captor = ArgumentCaptor.forClass(List.class); + verify(inputContext, times(2)) + .sendEvents(captor.capture()); + Assert.assertEquals("Size was: " + captor.getAllValues().size(), + captor.getAllValues().size(), 2); + capturedList = captor.getAllValues().get(1); + Assert.assertEquals("Size was: " + capturedList.size(), + capturedList.size(), 1); + inputEvent = (InputReadErrorEvent)capturedList.get(0); + Assert.assertEquals("Number of failures was: " + inputEvent.getNumFailures(), + inputEvent.getNumFailures(), 2); + + + schedulerGetHostThread.interrupt(); + } + private ShuffleManagerForTest createShuffleManager( InputContext inputContext, int expectedNumOfPhysicalInputs) throws IOException { @@ -222,6 +286,8 @@ public class TestShuffleManager { doReturn(outDirs).when(inputContext).getWorkDirs(); conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, inputContext.getWorkDirs()); + // 5 seconds + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BATCH_WAIT, 5000); DataOutputBuffer out = new DataOutputBuffer(); Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(new JobTokenIdentifier(),
