Repository: tez
Updated Branches:
  refs/heads/master ebf3fb133 -> 7b30785bd
TEZ-3700. Consumer attempt should kill itself instead of failing during validation checks with final merge avoidance (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7b30785b
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7b30785b
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7b30785b

Branch: refs/heads/master
Commit: 7b30785bdfee141e7a2dce80749f756fb5ec2d06
Parents: ebf3fb1
Author: Rajesh Balamohan <[email protected]>
Authored: Wed May 3 06:12:51 2017 +0530
Committer: Rajesh Balamohan <[email protected]>
Committed: Wed May 3 06:12:51 2017 +0530

----------------------------------------------------------------------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  9 +++
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 15 ++++
 .../orderedgrouped/ShuffleScheduler.java        | 83 +++++++++++++++-----
 ...tShuffleInputEventHandlerOrderedGrouped.java | 44 ++++++++++-
 4 files changed, 130 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/7b30785b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index ab17fe4..246f477 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -727,6 +727,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   // must be a random access structure
   private final List<EventInfo> onDemandRouteEvents = Lists.newArrayListWithCapacity(1000);
+  // Do not send any events if the attempt has already failed; failed attempts are tracked by TaskAttemptId.
+  private final Set<TezTaskAttemptID> failedTaskIds = Sets.newHashSet();
   private final ReadWriteLock onDemandRouteEventsReadWriteLock = new ReentrantReadWriteLock();
   private final Lock onDemandRouteEventsReadLock = onDemandRouteEventsReadWriteLock.readLock();
   private final Lock onDemandRouteEventsWriteLock = onDemandRouteEventsReadWriteLock.writeLock();
@@ -3982,6 +3984,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   private void processOnDemandEvent(TezEvent tezEvent, Edge srcEdge, int srcTaskIndex) {
     onDemandRouteEventsWriteLock.lock();
     try {
+      if (tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT ||
+          tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) {
+        if (failedTaskIds.contains(tezEvent.getSourceInfo().getTaskAttemptID())) {
+          return;
+        }
+      }
       onDemandRouteEvents.add(new EventInfo(tezEvent, srcEdge, srcTaskIndex));
       if (tezEvent.getEventType() == EventType.INPUT_FAILED_EVENT) {
         for (EventInfo eventInfo : onDemandRouteEvents) {
@@ -3996,6 +4004,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
             // can be obsoleted by an input failed event from the
             // same source edge+task
             eventInfo.isObsolete = true;
+            failedTaskIds.add(tezEvent.getSourceInfo().getTaskAttemptID());
           }
         }
       }
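For readers tracing the VertexImpl change above, here is a simplified, standalone sketch of the routing rule it introduces; the names (EventRouterSketch, EventKind, route) are illustrative placeholders, not Tez APIs. Once an INPUT_FAILED_EVENT has been routed for a source task attempt, later data movement events from that same attempt are dropped rather than handed to consumer attempts.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class EventRouterSketch {

  enum EventKind { DATA_MOVEMENT, COMPOSITE_DATA_MOVEMENT, INPUT_FAILED }

  static final class Event {
    final EventKind kind;
    final String sourceAttemptId;

    Event(EventKind kind, String sourceAttemptId) {
      this.kind = kind;
      this.sourceAttemptId = sourceAttemptId;
    }
  }

  // routed events that consumer attempts may fetch later
  private final List<Event> routedEvents = new ArrayList<>();
  // source attempts that already reported INPUT_FAILED
  private final Set<String> failedAttempts = new HashSet<>();

  void route(Event event) {
    if ((event.kind == EventKind.DATA_MOVEMENT
        || event.kind == EventKind.COMPOSITE_DATA_MOVEMENT)
        && failedAttempts.contains(event.sourceAttemptId)) {
      // stale output from an attempt that already failed; never expose it downstream
      return;
    }
    routedEvents.add(event);
    if (event.kind == EventKind.INPUT_FAILED) {
      failedAttempts.add(event.sourceAttemptId);
    }
  }

  int routedEventCount() {
    return routedEvents.size();
  }
}

This is the property the new TestVertexImpl assertion below checks: after the INPUT_FAILED_EVENT, three more DataMovementEvents arrive from the failed attempt, but the routed-event count stays at 12.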
http://git-wip-us.apache.org/repos/asf/tez/blob/7b30785b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index bc06fd0..76ccf91 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -3052,6 +3052,21 @@ public class TestVertexImpl {
     Assert.assertEquals(12, fromEventId);
     Assert.assertEquals(1, eventInfo.getEvents().size());
     Assert.assertEquals(EventType.INPUT_FAILED_EVENT, eventInfo.getEvents().get(0).getEventType());
+
+    // Let the failed task send more events
+    for (int i=11; i<14; ++i) {
+      v4.handle(new VertexEventRouteEvent(v4.getVertexId(), Collections.singletonList(
+          new TezEvent(DataMovementEvent.create(0, null),
+              new EventMetaData(EventProducerConsumerType.OUTPUT, v3.getName(), v3.getName(), v3TaId)))));
+    }
+    dispatcher.await();
+    // 11 events + 1 INPUT_FAILED_EVENT.
+    // Events sent out later by failed tasks should not be available.
+    Assert.assertEquals(12, v4.getOnDemandRouteEvents().size());
+
+    fromEventId = 0;
+    eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 100);
+    Assert.assertEquals(EventType.INPUT_FAILED_EVENT, eventInfo.getEvents().get(0).getEventType());
   }

   @Test (timeout = 5000)

http://git-wip-us.apache.org/repos/asf/tez/blob/7b30785b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 953c73e..73a6214 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -46,6 +46,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -54,7 +55,6 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tez.http.HttpConnectionParams;
 import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.security.JobTokenSecretManager;
@@ -405,7 +405,7 @@ class ShuffleScheduler {
     this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED);
     this.lastEventReceived = inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED);

-    pipelinedShuffleInfoEventsMap = new HashMap<Integer, ShuffleEventInfo>();
+    pipelinedShuffleInfoEventsMap = Maps.newConcurrentMap();
     LOG.info("ShuffleScheduler running for sourceVertex: "
         + inputContext.getSourceVertexName() + " with configuration: "
         + "maxFetchFailuresBeforeReporting=" + maxFetchFailuresBeforeReporting
@@ -520,6 +520,7 @@ class ShuffleScheduler {
     int finalEventId = -1; //0 indexed
     int attemptNum;
     String id;
+    boolean scheduledForDownload; // whether chunks got scheduled for download (see getMapsForHost)

     ShuffleEventInfo(InputAttemptIdentifier input) {
@@ -547,7 +548,8 @@ class ShuffleScheduler {

     public String toString() {
       return "[eventsProcessed=" + eventsProcessed + ", finalEventId=" + finalEventId
-          + ", id=" + id + ", attemptNum=" + attemptNum + "]";
+          + ", id=" + id + ", attemptNum=" + attemptNum
+          + ", scheduledForDownload=" + scheduledForDownload + "]";
     }
   }

@@ -677,12 +679,29 @@ class ShuffleScheduler {
     if (input.canRetrieveInputInChunks()) {
       ShuffleEventInfo eventInfo = pipelinedShuffleInfoEventsMap.get(input.getInputIdentifier());
       if (eventInfo != null && input.getAttemptNumber() != eventInfo.attemptNum) {
-        reportExceptionForInput(new IOException("Previous event already got scheduled for " +
-            input + ". Previous attempt's data could have been already merged "
-            + "to memory/disk outputs. Failing the fetch early. currentAttemptNum="
-            + eventInfo.attemptNum + ", eventsProcessed=" + eventInfo.eventsProcessed
-            + ", newAttemptNum=" + input.getAttemptNumber()));
-        return false;
+        /*
+         * Check if the current attempt has been scheduled for download.
+         * e.g. currentAttemptNum=0, eventsProcessed={}, newAttemptNum=1
+         * If nothing is scheduled in the current attempt and no events are processed
+         * (i.e. copySucceeded), we can ignore the current attempt and start processing the new
+         * attempt (e.g. LLAP).
+         */
+        if (eventInfo.scheduledForDownload || !eventInfo.eventsProcessed.isEmpty()) {
+          IOException exception = new IOException("Previous event already got scheduled for "
+              + input + ". Previous attempt's data could have been already merged "
+              + "to memory/disk outputs. Killing (self) this task early."
+              + " currentAttemptNum=" + eventInfo.attemptNum
+              + ", eventsProcessed=" + eventInfo.eventsProcessed
+              + ", scheduledForDownload=" + eventInfo.scheduledForDownload
+              + ", newAttemptNum=" + input.getAttemptNumber());
+          String message = "Killing self as previous attempt data could have been consumed";
+          killSelf(exception, message);
+          return false;
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ignoring current attempt=" + eventInfo.attemptNum + " with eventInfo="
+              + eventInfo.toString() + " and processing new attempt=" + input.getAttemptNumber());
+        }
       }

       if (eventInfo == null) {
@@ -694,9 +713,13 @@ class ShuffleScheduler {
   }

   @VisibleForTesting
-  void reportExceptionForInput(Exception exception) {
+  void killSelf(Exception exception, String message) {
     LOG.error(srcNameTrimmed + ": " + "Reporting exception for input", exception);
-    exceptionReporter.reportException(exception);
+    try {
+      this.close();
+    } finally {
+      this.inputContext.killSelf(exception, message);
+    }
   }

   private final AtomicInteger nextProgressLineEventCount = new AtomicInteger(0);
@@ -1048,18 +1071,33 @@ class ShuffleScheduler {
     }
   }

-  public synchronized void obsoleteInput(InputAttemptIdentifier srcAttempt) {
+  public void obsoleteInput(InputAttemptIdentifier srcAttempt) {
     // The incoming srcAttempt does not contain a path component.
     LOG.info(srcNameTrimmed + ": " + "Adding obsolete input: " + srcAttempt);
-    if (pipelinedShuffleInfoEventsMap.containsKey(srcAttempt.getInputIdentifier())) {
-      //Pipelined shuffle case (where pipelinedShuffleInfoEventsMap gets populated).
-      //Fail fast here.
-      exceptionReporter.reportException(new IOException(srcAttempt + " is marked as obsoleteInput, but it "
+    ShuffleEventInfo eventInfo = pipelinedShuffleInfoEventsMap.get(srcAttempt.getInputIdentifier());
+
+    //Pipelined shuffle case (where pipelinedShuffleInfoEventsMap gets populated).
+    //Fail fast here.
+    if (eventInfo != null) {
+      // In case we haven't started downloading it, get rid of it.
+      if (eventInfo.eventsProcessed.isEmpty() && !eventInfo.scheduledForDownload) {
+        // obsoleted anyways; no point tracking if nothing is started
+        pipelinedShuffleInfoEventsMap.remove(srcAttempt.getInputIdentifier());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Removing " + eventInfo + " from tracking");
+        }
+        return;
+      }
+      IOException exception = new IOException(srcAttempt + " is marked as obsoleteInput, but it "
          + "exists in shuffleInfoEventMap. Some data could have been already merged "
-          + "to memory/disk outputs. Failing the fetch early."));
+          + "to memory/disk outputs. Failing the fetch early. eventInfo:" + eventInfo.toString());
+      String message = "Got obsolete event. Killing self as attempt's data could have been consumed";
+      killSelf(exception, message);
       return;
     }
-    obsoleteInputs.add(srcAttempt);
+    synchronized (this) {
+      obsoleteInputs.add(srcAttempt);
+    }
   }

   public synchronized void putBackKnownMapOutput(MapHost host,
@@ -1102,7 +1140,7 @@ class ShuffleScheduler {
     return pathToIdentifierMap.get(getIdentifierFromPathAndReduceId(path, reduceId));
   }

-  private boolean inputShouldBeConsumed(InputAttemptIdentifier id) {
+  private synchronized boolean inputShouldBeConsumed(InputAttemptIdentifier id) {
     return (!obsoleteInputs.contains(id) && !isInputFinished(id.getInputIdentifier()));
   }

@@ -1178,6 +1216,13 @@ class ShuffleScheduler {
       if (includedMaps++ >= maxTaskOutputAtOnce) {
         host.addKnownMap(inputAttemptIdentifier);
       } else {
+        if (inputAttemptIdentifier.canRetrieveInputInChunks()) {
+          ShuffleEventInfo shuffleEventInfo =
+              pipelinedShuffleInfoEventsMap.get(inputAttemptIdentifier.getInputIdentifier());
+          if (shuffleEventInfo != null) {
+            shuffleEventInfo.scheduledForDownload = true;
+          }
+        }
         result.add(inputAttemptIdentifier);
       }
     }
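To summarize the ShuffleScheduler change just above: when a new attempt number shows up for a pipelined input that is already being tracked, the consumer no longer reports a fatal error unconditionally. A condensed, standalone sketch of that decision follows; the names (PipelinedAttemptValidator, AttemptState, Decision) are illustrative, not Tez APIs.

import java.util.BitSet;

class PipelinedAttemptValidator {

  enum Decision { CONTINUE_WITH_TRACKED_ATTEMPT, ADOPT_NEW_ATTEMPT, KILL_SELF }

  static final class AttemptState {
    int attemptNum;
    BitSet eventsProcessed = new BitSet(); // spill chunks already copied
    boolean scheduledForDownload;          // chunks already handed to a fetcher
  }

  /**
   * Once any chunk of the tracked attempt was scheduled for download or copied,
   * its data may already be merged into memory/disk outputs, so the only safe
   * option is to kill this consumer attempt and let it be rerun. If nothing was
   * touched yet, the newer attempt can simply replace the tracked one.
   */
  static Decision validate(AttemptState tracked, int newAttemptNum) {
    if (tracked == null || newAttemptNum == tracked.attemptNum) {
      return Decision.CONTINUE_WITH_TRACKED_ATTEMPT;
    }
    if (tracked.scheduledForDownload || !tracked.eventsProcessed.isEmpty()) {
      return Decision.KILL_SELF;
    }
    return Decision.ADOPT_NEW_ATTEMPT;
  }
}

As the commit summary says, the point is to kill the consumer attempt rather than fail it, so the rerun is not treated as an ordinary attempt failure; the diff's killSelf() also closes the scheduler before calling inputContext.killSelf().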
http://git-wip-us.apache.org/repos/asf/tez/blob/7b30785b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
index 26aa298..695a307 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
@@ -232,16 +232,56 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
     assertTrue("Shuffle info events should not be empty for pipelined shuffle",
         !scheduler.pipelinedShuffleInfoEventsMap.isEmpty());
+    int valuesInMapLocations = scheduler.mapLocations.values().size();
+    assertTrue("Maplocations should have values. current size: " + valuesInMapLocations,
+        valuesInMapLocations > 0);
+
+    // start scheduling for download
+    scheduler.getMapsForHost(scheduler.mapLocations.values().iterator().next());
+
     //Attempt #0 comes up. When processing this, it should report exception
     attemptNum = 0;
     inputIdx = 1;
     Event dme2 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 0, attemptNum);
     handler.handleEvents(Collections.singletonList(dme2));

-    InputAttemptIdentifier id2 =
+    // task should issue kill request
+    verify(scheduler, times(1)).killSelf(any(IOException.class), any(String.class));
+  }
+
+  @Test (timeout = 5000)
+  public void testPipelinedShuffle_WithObsoleteEvents() throws IOException, InterruptedException {
+    //Process attempt #1 first
+    int attemptNum = 1;
+    int inputIdx = 1;
+
+    Event dme1 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 0, attemptNum);
+    handler.handleEvents(Collections.singletonList(dme1));
+
+    InputAttemptIdentifier id1 =
         new InputAttemptIdentifier(inputIdx, attemptNum, PATH_COMPONENT, false,
             InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0);
-    verify(scheduler, times(1)).reportExceptionForInput(any(IOException.class));
+
+    verify(scheduler, times(1)).addKnownMapOutput(eq(HOST), eq(PORT), eq(1), eq(id1));
+    assertTrue("Shuffle info events should not be empty for pipelined shuffle",
+        !scheduler.pipelinedShuffleInfoEventsMap.isEmpty());
+
+    int valuesInMapLocations = scheduler.mapLocations.values().size();
+    assertTrue("Maplocations should have values. current size: " + valuesInMapLocations,
+        valuesInMapLocations > 0);
+
+    // start scheduling for download. Sets up scheduledForDownload in eventInfo.
+    scheduler.getMapsForHost(scheduler.mapLocations.values().iterator().next());
+
+    // send input failed event.
+    List<Event> events = new LinkedList<Event>();
+    int targetIdx = 1;
+    InputFailedEvent failedEvent = InputFailedEvent.create(targetIdx, 0);
+    events.add(failedEvent);
+    handler.handleEvents(events);
+
+    // task should issue kill request, as inputs are scheduled for download already.
+    verify(scheduler, times(1)).killSelf(any(IOException.class), any(String.class));
   }

   @Test(timeout = 5000)
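The second new test, testPipelinedShuffle_WithObsoleteEvents, exercises the reworked obsoleteInput() path. A standalone sketch of that bookkeeping, again with illustrative names rather than Tez APIs: an obsoleted pipelined input that was never scheduled or copied is simply forgotten, while one whose chunks may already have been merged triggers a self-kill.

import java.util.BitSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ObsoleteInputTracker {

  static final class AttemptState {
    final BitSet eventsProcessed = new BitSet();
    boolean scheduledForDownload;
  }

  private final Map<Integer, AttemptState> trackedInputs = new ConcurrentHashMap<>();
  private final Runnable killSelfAction; // stands in for the kill request to the framework

  ObsoleteInputTracker(Runnable killSelfAction) {
    this.killSelfAction = killSelfAction;
  }

  void track(int inputId, AttemptState state) {
    trackedInputs.put(inputId, state);
  }

  void markScheduled(int inputId) {
    AttemptState state = trackedInputs.get(inputId);
    if (state != null) {
      state.scheduledForDownload = true;
    }
  }

  void markObsolete(int inputId) {
    AttemptState state = trackedInputs.get(inputId);
    if (state == null) {
      return; // not a pipelined input we are tracking
    }
    if (state.eventsProcessed.isEmpty() && !state.scheduledForDownload) {
      trackedInputs.remove(inputId); // nothing fetched yet; safe to just drop it
      return;
    }
    // Chunks may already be merged into this task's outputs; restart the consumer.
    killSelfAction.run();
  }
}

In the test above, getMapsForHost() marks the input as scheduled before the InputFailedEvent arrives, so the obsolete path takes the kill branch and Mockito verifies a single killSelf() call.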
