Repository: tez Updated Branches: refs/heads/master d9f542f4c -> 93bd26ebb
TEZ-3700 addendum. Consumer attempt should kill itself instead of failing during validation checks with final merge avoidance (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/93bd26eb Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/93bd26eb Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/93bd26eb Branch: refs/heads/master Commit: 93bd26ebbd159d62ebd33c37affbd9b0837679f1 Parents: d9f542f Author: Siddharth Seth <[email protected]> Authored: Mon May 8 11:35:24 2017 -0700 Committer: Siddharth Seth <[email protected]> Committed: Mon May 8 11:35:24 2017 -0700 ---------------------------------------------------------------------- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 9 +++--- .../common/shuffle/impl/ShuffleManager.java | 34 +++++++++++++++----- .../orderedgrouped/ShuffleScheduler.java | 2 +- .../impl/TestShuffleInputEventHandlerImpl.java | 14 ++++++-- 4 files changed, 43 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/93bd26eb/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 246f477..79b84e8 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -727,8 +727,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl // must be a random access structure private final List<EventInfo> onDemandRouteEvents = Lists.newArrayListWithCapacity(1000); - // Do not send any events if attempt is already failed. TaskAttemptId - private final Set<TezTaskAttemptID> failedTaskIds = Sets.newHashSet(); + // Do not send any events if attempt is failed due to INPUT_FAILED_EVENTS. + private final Set<TezTaskAttemptID> failedTaskAttemptIDs = Sets.newHashSet(); private final ReadWriteLock onDemandRouteEventsReadWriteLock = new ReentrantReadWriteLock(); private final Lock onDemandRouteEventsReadLock = onDemandRouteEventsReadWriteLock.readLock(); private final Lock onDemandRouteEventsWriteLock = onDemandRouteEventsReadWriteLock.writeLock(); @@ -3986,7 +3986,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl try { if (tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT || tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) { - if (failedTaskIds.contains(tezEvent.getSourceInfo().getTaskAttemptID())) { + // Prevent any failed task (due to INPUT_FAILED_EVENT) sending events downstream. E.g LLAP + if (failedTaskAttemptIDs.contains(tezEvent.getSourceInfo().getTaskAttemptID())) { return; } } @@ -4004,7 +4005,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl // can be obsoleted by an input failed event from the // same source edge+task eventInfo.isObsolete = true; - failedTaskIds.add(tezEvent.getSourceInfo().getTaskAttemptID()); + failedTaskAttemptIDs.add(tezEvent.getSourceInfo().getTaskAttemptID()); } } } http://git-wip-us.apache.org/repos/asf/tez/blob/93bd26eb/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index b2ff51d..8716b92 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -392,18 +392,28 @@ public class ShuffleManager implements FetcherCallback { if (input.canRetrieveInputInChunks()) { ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(input.getInputIdentifier()); if (eventInfo != null && input.getAttemptNumber() != eventInfo.attemptNum) { - //speculative attempts or failure attempts. Fail fast here. - reportFatalError(new IOException(), input + " already exists. " - + "Previous attempt's data could have been already merged " - + "to memory/disk outputs. Failing the fetch early. currentAttemptNum=" + eventInfo - .attemptNum + ", eventsProcessed=" + eventInfo.eventsProcessed + ", newAttemptNum=" + - input.getAttemptNumber()); - return false; + if (eventInfo.scheduledForDownload || !eventInfo.eventsProcessed.isEmpty()) { + IOException exception = new IOException("Previous event already got scheduled for " + + input + ". Previous attempt's data could have been already merged " + + "to memory/disk outputs. Killing (self) this task early." + + " currentAttemptNum=" + eventInfo.attemptNum + + ", eventsProcessed=" + eventInfo.eventsProcessed + + ", scheduledForDownload=" + eventInfo.scheduledForDownload + + ", newAttemptNum=" + input.getAttemptNumber()); + String message = "Killing self as previous attempt data could have been consumed"; + killSelf(exception, message); + return false; + } } } return true; } + void killSelf(Exception exception, String message) { + LOG.error(message, exception); + this.inputContext.killSelf(exception, message); + } + @VisibleForTesting Fetcher constructFetcherForHost(InputHost inputHost, Configuration conf) { @@ -465,6 +475,12 @@ public class ShuffleManager implements FetcherCallback { if (inputHost.getNumPendingPartitions() > 0) { pendingHosts.add(inputHost); //add it to queue } + for(InputAttemptIdentifier input : pendingInputsOfOnePartition.getInputs()) { + ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(input.getInputIdentifier()); + if (eventInfo != null) { + eventInfo.scheduledForDownload = true; + } + } fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(), pendingInputsOfOnePartition.getPartition(), pendingInputsOfOnePartition.getInputs()); @@ -575,6 +591,7 @@ public class ShuffleManager implements FetcherCallback { int finalEventId = -1; //0 indexed int attemptNum; String id; + boolean scheduledForDownload; // whether chunks got scheduled for download ShuffleEventInfo(InputAttemptIdentifier input) { @@ -606,7 +623,8 @@ public class ShuffleManager implements FetcherCallback { public String toString() { return "[eventsProcessed=" + eventsProcessed + ", finalEventId=" + finalEventId - + ", id=" + id + ", attemptNum=" + attemptNum + "]"; + + ", id=" + id + ", attemptNum=" + attemptNum + + ", scheduledForDownload=" + scheduledForDownload + "]"; } } http://git-wip-us.apache.org/repos/asf/tez/blob/93bd26eb/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index 73a6214..39f2138 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -714,7 +714,7 @@ class ShuffleScheduler { @VisibleForTesting void killSelf(Exception exception, String message) { - LOG.error(srcNameTrimmed + ": " + "Reporting exception for input", exception); + LOG.error(message, exception); try { this.close(); } finally { http://git-wip-us.apache.org/repos/asf/tez/blob/93bd26eb/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java index 6bcbeb6..af52f90 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java @@ -224,10 +224,13 @@ public class TestShuffleInputEventHandlerImpl { PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1); verify(shuffleManager, times(2)).addKnownInput(eq(HOST), eq(PORT), eq(expectedId2), eq(0)); + // Let attemptNum 0 be scheduled. + shuffleManager.shuffleInfoEventsMap.get(expectedId2.getInputIdentifier()).scheduledForDownload = true; + //0--> 1 with spill id 1 (attemptNum 1). This should report exception dme = createDataMovementEvent(true, 0, 1, 1, false, new BitSet(), 4, 1); handler.handleEvents(Collections.singletonList(dme)); - verify(inputContext).reportFailure(any(TaskFailureType.class), any(Throwable.class), anyString()); + verify(inputContext).killSelf(any(Throwable.class), anyString()); } /** @@ -253,10 +256,13 @@ public class TestShuffleInputEventHandlerImpl { PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1); verify(shuffleManager, times(1)).addKnownInput(eq(HOST), eq(PORT), eq(expected), eq(0)); + // Let attemptNum 1 be scheduled. + shuffleManager.shuffleInfoEventsMap.get(expected.getInputIdentifier()).scheduledForDownload = true; + //Now send attemptNum 0. This should throw exception, because attempt #1 is already added dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 0); handler.handleEvents(Collections.singletonList(dme)); - verify(inputContext).reportFailure(any(TaskFailureType.class), any(Throwable.class), anyString()); + verify(inputContext).killSelf(any(Throwable.class), anyString()); } /** @@ -291,11 +297,13 @@ public class TestShuffleInputEventHandlerImpl { PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1); verify(shuffleManager, times(2)).addCompletedInputWithNoData(expected); + // Let attemptNum 0 be scheduled. + shuffleManager.shuffleInfoEventsMap.get(expected.getInputIdentifier()).scheduledForDownload = true; //Now send attemptNum 1. This should throw exception, because attempt #1 is already added dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 1); handler.handleEvents(Collections.singletonList(dme)); - verify(inputContext).reportFailure(any(TaskFailureType.class), any(Throwable.class), anyString()); + verify(inputContext).killSelf(any(Throwable.class), anyString()); } private Event createDataMovementEvent(boolean addSpillDetails, int srcIdx, int targetIdx,
