Repository: tez Updated Branches: refs/heads/master 7f8fc7530 -> 4ec29425d
TEZ-1703. Exception handling for InputInitializer. (zjffdu) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4ec29425 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4ec29425 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4ec29425 Branch: refs/heads/master Commit: 4ec29425d63decfd8de4e1528f043271cf7cb3b2 Parents: 7f8fc75 Author: Jeff Zhang <[email protected]> Authored: Fri Oct 31 09:55:22 2014 +0800 Committer: Jeff Zhang <[email protected]> Committed: Fri Oct 31 09:55:22 2014 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../tez/runtime/api/InputInitializer.java | 3 +- .../app/dag/RootInputInitializerManager.java | 30 +++- .../dag/event/VertexEventRootInputFailed.java | 7 +- .../apache/tez/dag/app/dag/impl/DAGImpl.java | 4 +- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 52 ++++-- .../tez/dag/app/dag/impl/TestVertexImpl.java | 178 ++++++++++++++++++- .../tez/test/TestExceptionPropagation.java | 138 +++++++++++--- 8 files changed, 362 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/4ec29425/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0b2dc65..61b380e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -92,6 +92,7 @@ ALL CHANGES: TEZ-1716. Additional ATS data for UI. TEZ-1722. DAG should be related to Application Id in ATS data. TEZ-1711. Don't cache outputSpecList in VertexImpl.getOutputSpecList(taskIndex) + TEZ-1703. Exception handling for InputInitializer. 
Release 0.5.1: 2014-10-02 http://git-wip-us.apache.org/repos/asf/tez/blob/4ec29425/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java index 7b22b62..d9d6517 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java @@ -101,7 +101,8 @@ public abstract class InputInitializer { * @param stateUpdate an event indicating the name of the vertex, and it's updated state. * Additional information may be available for specific events, Look at the * type hierarchy for {@link org.apache.tez.dag.api.event.VertexStateUpdate} + * @throws Exception */ - public void onVertexStateUpdated(VertexStateUpdate stateUpdate) { + public void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception { } } http://git-wip-us.apache.org/repos/asf/tez/blob/4ec29425/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java index 1f7a83f..bdd3689 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java @@ -19,6 +19,8 @@ package org.apache.tez.dag.app.dag; import javax.annotation.Nullable; + +import java.lang.reflect.UndeclaredThrowableException; import java.security.PrivilegedExceptionAction; import java.util.Collection; import java.util.HashMap; @@ -53,7 +55,9 @@ import org.apache.tez.dag.api.oldrecords.TaskState; import 
org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed; import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized; +import org.apache.tez.dag.app.dag.impl.AMUserCodeException; import org.apache.tez.dag.app.dag.impl.TezRootInputInitializerContextImpl; +import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.api.Event; @@ -275,12 +279,18 @@ public class RootInputInitializerManager { @SuppressWarnings("unchecked") @Override public void onFailure(Throwable t) { + // catch real root cause of failure, it would throw UndeclaredThrowableException + // if using UGI.doAs + if (t instanceof UndeclaredThrowableException) { + t = t.getCause(); + } initializer.setComplete(); LOG.info( "Failed InputInitializer for Input: " + initializer.getInput().getName() + " on vertex " + initializer.getVertexLogIdentifier()); eventHandler - .handle(new VertexEventRootInputFailed(vertexID, initializer.getInput().getName(), t)); + .handle(new VertexEventRootInputFailed(vertexID, initializer.getInput().getName(), + new AMUserCodeException(Source.InputInitializer,t))); } } @@ -294,6 +304,7 @@ public class RootInputInitializerManager { private final InputInitializerContext context; private final AtomicBoolean isComplete = new AtomicBoolean(false); private final String vertexLogIdentifier; + private final TezVertexID vertexId; private final StateChangeNotifier stateChangeNotifier; private final List<String> notificationRegisteredVertices = Lists.newArrayList(); private final AppContext appContext; @@ -306,6 +317,7 @@ public class RootInputInitializerManager { this.initializer = initializer; this.context = context; this.vertexLogIdentifier = vertex.getLogIdentifier(); + this.vertexId = vertex.getVertexId(); this.stateChangeNotifier = stateChangeNotifier; this.appContext = appContext; } @@ -348,6 +360,7 @@ 
public class RootInputInitializerManager { } } + @SuppressWarnings("unchecked") @Override public void onStateUpdated(VertexStateUpdate event) { if (isComplete()) { @@ -357,7 +370,13 @@ public class RootInputInitializerManager { " since initializer " + input.getName() + " is already complete."); } } else { - initializer.onVertexStateUpdated(event); + try { + initializer.onVertexStateUpdated(event); + } catch (Exception e) { + appContext.getEventHandler().handle( + new VertexEventRootInputFailed(vertexId, input.getName(), + new AMUserCodeException(Source.InputInitializer,e))); + } } } @@ -455,14 +474,15 @@ public class RootInputInitializerManager { sendEvents(toForwardEvents); } + @SuppressWarnings("unchecked") private void sendEvents(List<InputInitializerEvent> events) { if (events != null && !events.isEmpty()) { try { initializer.handleInputInitializerEvent(events); } catch (Exception e) { - throw new TezUncheckedException( - "Initializer for input: " + getInput().getName() + " on vertex: " + getVertexLogIdentifier() + - " failed to process events", e); + appContext.getEventHandler().handle( + new VertexEventRootInputFailed(vertexId, input.getName(), + new AMUserCodeException(Source.InputInitializer,e))); } } } http://git-wip-us.apache.org/repos/asf/tez/blob/4ec29425/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRootInputFailed.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRootInputFailed.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRootInputFailed.java index 4ab4ae9..b7701d7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRootInputFailed.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRootInputFailed.java @@ -18,20 +18,21 @@ package org.apache.tez.dag.app.dag.event; +import org.apache.tez.dag.app.dag.impl.AMUserCodeException; import 
org.apache.tez.dag.records.TezVertexID; public class VertexEventRootInputFailed extends VertexEvent { private final String inputName; - private final Throwable error; + private final AMUserCodeException error; - public VertexEventRootInputFailed(TezVertexID vertexId, String inputName, Throwable error) { + public VertexEventRootInputFailed(TezVertexID vertexId, String inputName, AMUserCodeException error) { super(vertexId, VertexEventType.V_ROOT_INPUT_FAILED); this.inputName = inputName; this.error = error; } - public Throwable getError() { + public AMUserCodeException getError() { return this.error; } http://git-wip-us.apache.org/repos/asf/tez/blob/4ec29425/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 6dccf3a..cddcbd5 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -1686,9 +1686,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } else if (vertexEvent.getVertexState() == VertexState.FAILED) { job.enactKill( - DAGTerminationCause.VERTEX_FAILURE, - vertexEvent.getVertexTerminationCause() == null ? 
VertexTerminationCause.OTHER_VERTEX_FAILURE - : vertexEvent.getVertexTerminationCause()); + DAGTerminationCause.VERTEX_FAILURE, VertexTerminationCause.OTHER_VERTEX_FAILURE); job.vertexFailed(vertex); forceTransitionToKillWait = true; } http://git-wip-us.apache.org/repos/asf/tez/blob/4ec29425/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index c182810..4a88949 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -335,7 +335,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EnumSet.of(VertexState.INITED, VertexState.FAILED), VertexEventType.V_READY_TO_INIT, new VertexInitializedTransition()) - .addTransition(VertexState.INITIALIZING, VertexState.FAILED, + .addTransition(VertexState.INITIALIZING, + EnumSet.of(VertexState.FAILED), VertexEventType.V_ROOT_INPUT_FAILED, new RootInputInitFailedTransition()) .addTransition(VertexState.INITIALIZING, VertexState.INITIALIZING, @@ -367,6 +368,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, // Transitions from INITED state // SOURCE_VERTEX_STARTED - for sources which determine parallelism, // they must complete before this vertex can start. 
+ .addTransition(VertexState.INITED, + EnumSet.of(VertexState.FAILED), + VertexEventType.V_ROOT_INPUT_FAILED, + new RootInputInitFailedTransition()) .addTransition (VertexState.INITED, EnumSet.of(VertexState.INITED, VertexState.ERROR), @@ -399,6 +404,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, INTERNAL_ERROR_TRANSITION) // Transitions from RUNNING state + .addTransition(VertexState.RUNNING, + EnumSet.of(VertexState.TERMINATING), + VertexEventType.V_ROOT_INPUT_FAILED, + new RootInputInitFailedTransition()) .addTransition(VertexState.RUNNING, VertexState.RUNNING, VertexEventType.V_TASK_ATTEMPT_COMPLETED, TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION) @@ -451,6 +460,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, // Ignore-able events .addTransition(VertexState.TERMINATING, VertexState.TERMINATING, EnumSet.of(VertexEventType.V_TERMINATE, + VertexEventType.V_ROOT_INPUT_FAILED, VertexEventType.V_SOURCE_VERTEX_STARTED, VertexEventType.V_ROOT_INPUT_INITIALIZED, VertexEventType.V_NULL_EDGE_INITIALIZED, @@ -483,6 +493,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, new TaskCompletedAfterVertexSuccessTransition()) .addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED, EnumSet.of(VertexEventType.V_TERMINATE, + VertexEventType.V_ROOT_INPUT_FAILED, VertexEventType.V_TASK_ATTEMPT_COMPLETED, // after we are done reruns of source tasks should not affect // us. These reruns may be triggered by other consumer vertices. 
@@ -501,6 +512,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, // Ignore-able events .addTransition(VertexState.FAILED, VertexState.FAILED, EnumSet.of(VertexEventType.V_TERMINATE, + VertexEventType.V_ROOT_INPUT_FAILED, VertexEventType.V_SOURCE_VERTEX_STARTED, VertexEventType.V_TASK_RESCHEDULED, VertexEventType.V_START, @@ -522,6 +534,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, // Ignore-able events .addTransition(VertexState.KILLED, VertexState.KILLED, EnumSet.of(VertexEventType.V_TERMINATE, + VertexEventType.V_ROOT_INPUT_FAILED, VertexEventType.V_INIT, VertexEventType.V_SOURCE_VERTEX_STARTED, VertexEventType.V_START, @@ -541,6 +554,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, VertexState.ERROR, VertexState.ERROR, EnumSet.of(VertexEventType.V_INIT, + VertexEventType.V_ROOT_INPUT_FAILED, VertexEventType.V_SOURCE_VERTEX_STARTED, VertexEventType.V_START, VertexEventType.V_ROUTE_EVENT, @@ -1654,7 +1668,18 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } else if (vertex.terminationCause == VertexTerminationCause.AM_USERCODE_FAILURE) { vertex.setFinishTime(); - String diagnosticMsg = "Vertex failed/killed due to VertexManager failed. " + String diagnosticMsg = "Vertex failed/killed due to VertexManagerPlugin/EdgeManagerPlugin failed. " + + "failedTasks:" + + vertex.failedTaskCount + + " killedTasks:" + + vertex.killedTaskCount; + LOG.info(diagnosticMsg); + vertex.abortVertex(State.FAILED); + return vertex.finished(VertexState.FAILED); + } + else if (vertex.terminationCause == VertexTerminationCause.ROOT_INPUT_INIT_FAILURE) { + vertex.setFinishTime(); + String diagnosticMsg = "Vertex failed/killed due to ROOT_INPUT_INIT_FAILURE failed. 
" + "failedTasks:" + vertex.failedTaskCount + " killedTasks:" @@ -1665,7 +1690,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } else { //should never occur - throw new TezUncheckedException("All tasks complete, but cannot determine final state of vertex" + throw new TezUncheckedException("All tasks complete, but cannot determine final state of vertex:" + vertex.logIdentifier + ", failedTaskCount=" + vertex.failedTaskCount + ", killedTaskCount=" + vertex.killedTaskCount + ", successfulTaskCount=" + vertex.succeededTaskCount @@ -3202,19 +3227,26 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } private static class RootInputInitFailedTransition implements - SingleArcTransition<VertexImpl, VertexEvent> { + MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { @Override - public void transition(VertexImpl vertex, VertexEvent event) { + public VertexState transition(VertexImpl vertex, VertexEvent event) { VertexEventRootInputFailed fe = (VertexEventRootInputFailed) event; String msg = "Vertex Input: " + fe.getInputName() + " initializer failed, vertex=" + vertex.getLogIdentifier(); - if (fe.getError() != null) { - msg = msg + ExceptionUtils.getStackTrace(fe.getError()); + LOG.error(msg, fe.getError()); + if (vertex.getState() == VertexState.RUNNING) { + vertex.addDiagnostic(msg + + ", " + ExceptionUtils.getStackTrace(fe.getError().getCause())); + vertex.tryEnactKill(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, + TaskTerminationCause.AM_USERCODE_FAILURE); + return VertexState.TERMINATING; + } else { + vertex.finished(VertexState.FAILED, + VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, msg + + ", " + ExceptionUtils.getStackTrace(fe.getError().getCause())); + return VertexState.FAILED; } - LOG.error(msg); - vertex.finished(VertexState.FAILED, - VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, msg); } } 
http://git-wip-us.apache.org/repos/asf/tez/blob/4ec29425/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index fdf0e07..ef2c7bd 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -125,6 +125,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted; import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule; import org.apache.tez.dag.app.dag.event.VertexEventTermination; import org.apache.tez.dag.app.dag.event.VertexEventType; +import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source; import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo; import org.apache.tez.dag.app.dag.impl.TestVertexImpl.VertexManagerWithException.VMExceptionLocation; import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler; @@ -4647,7 +4648,8 @@ public class TestVertexImpl { super.runInputInitializers(inputs); eventHandler.handle(new VertexEventRootInputFailed(vertexID, inputs .get(0).getName(), - new RuntimeException("MockInitializerFailed"))); + new AMUserCodeException(Source.InputInitializer, + new RuntimeException("MockInitializerFailed")))); dispatcher.await(); } @@ -4969,9 +4971,164 @@ public class TestVertexImpl { initVertex(v1); String diagnostics = StringUtils.join(v1.getDiagnostics(), ","); assertTrue(diagnostics.contains(IIExceptionLocation.Initialize.name())); + Assert.assertEquals(VertexState.FAILED, v1.getState()); Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, v1.getTerminationCause()); } + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testExceptionFromII_InitFailedAfterInitialized() throws AMUserCodeException { + 
useCustomInitializer = true; + customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.Initialize2); + EventHandlingRootInputInitializer initializer = + (EventHandlingRootInputInitializer) customInitializer; + setupPreDagCreation(); + dagPlan = createDAGPlanWithIIException(); + setupPostDagCreation(); + + VertexImplWithRunningInputInitializer v1 = + (VertexImplWithRunningInputInitializer) vertices.get("vertex1"); + // INIT_SUCCEEDED followed by INIT_FAILURE + initVertex(v1); + dispatcher.getEventHandler().handle(new VertexEventRootInputInitialized( + v1.getVertexId(), "input1", null)); + dispatcher.await(); + + String diagnostics = StringUtils.join(v1.getDiagnostics(), ","); + assertTrue(diagnostics.contains(IIExceptionLocation.Initialize2.name())); + Assert.assertEquals(VertexState.FAILED, v1.getState()); + Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, v1.getTerminationCause()); + } + + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testExceptionFromII_InitFailedAfterRunning() throws AMUserCodeException { + useCustomInitializer = true; + customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.Initialize2); + EventHandlingRootInputInitializer initializer = + (EventHandlingRootInputInitializer) customInitializer; + setupPreDagCreation(); + dagPlan = createDAGPlanWithIIException(); + setupPostDagCreation(); + + VertexImplWithRunningInputInitializer v1 = + (VertexImplWithRunningInputInitializer) vertices.get("vertex1"); + initVertex(v1); + dispatcher.getEventHandler().handle(new VertexEventRootInputInitialized( + v1.getVertexId(), "input1", null)); + dispatcher.getEventHandler().handle(new VertexEvent(v1.getVertexId(), + VertexEventType.V_START)); + dispatcher.await(); + + String diagnostics = StringUtils.join(v1.getDiagnostics(), ","); + assertTrue(diagnostics.contains(IIExceptionLocation.Initialize2.name())); + Assert.assertEquals(VertexState.FAILED, v1.getState()); + 
Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, v1.getTerminationCause()); + } + + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testExceptionFromII_HandleInputInitializerEvent() throws AMUserCodeException, InterruptedException { + useCustomInitializer = true; + customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.HandleInputInitializerEvent); + EventHandlingRootInputInitializer initializer = + (EventHandlingRootInputInitializer) customInitializer; + setupPreDagCreation(); + dagPlan = createDAGPlanWithRunningInitializer(); + setupPostDagCreation(); + + VertexImplWithRunningInputInitializer v1 = + (VertexImplWithRunningInputInitializer) vertices.get("vertex1"); + VertexImplWithRunningInputInitializer v2 = + (VertexImplWithRunningInputInitializer) vertices.get("vertex2"); + + initVertex(v1); + startVertex(v1); + Assert.assertEquals(VertexState.RUNNING, v1.getState()); + Assert.assertEquals(VertexState.INITIALIZING, v2.getState()); + dispatcher.await(); + + // Wait for the initializer to be invoked - which may be a separate thread. + while (!initializer.initStarted.get()) { + Thread.sleep(10); + } + Assert.assertFalse(initializer.eventReceived.get()); + Assert.assertFalse(initializer.initComplete.get()); + + // Signal the initializer by sending an event - via vertex1 + InputInitializerEvent event = InputInitializerEvent.create("vertex2", "input1", null); + // Create taskId and taskAttemptId for the single task that exists in vertex1 + TezTaskID t0_v1 = TezTaskID.getInstance(v1.getVertexId(), 0); + TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 0); + TezEvent tezEvent = new TezEvent(event, + new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", "vertex2", ta0_t0_v1)); + + // at least one task attempt must succeed, otherwise input initializer events won't be handled.
+ dispatcher.getEventHandler().handle(new TaskEvent(t0_v1, TaskEventType.T_ATTEMPT_LAUNCHED)); + dispatcher.getEventHandler().handle(new TaskEventTAUpdate(ta0_t0_v1, TaskEventType.T_ATTEMPT_SUCCEEDED)); + dispatcher.getEventHandler() + .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent))); + dispatcher.await(); + + // it would cause v2 fail as its II throw exception in handleInputInitializerEvent + String diagnostics = StringUtils.join(v2.getDiagnostics(), ","); + assertTrue(diagnostics.contains(IIExceptionLocation.HandleInputInitializerEvent.name())); + Assert.assertEquals(VertexState.FAILED, v2.getState()); + Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, v2.getTerminationCause()); + } + + @Test(timeout = 5000) + public void testExceptionFromII_OnVertexStateUpdated() throws AMUserCodeException, InterruptedException { + useCustomInitializer = true; + customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.OnVertexStateUpdated); + EventHandlingRootInputInitializer initializer = + (EventHandlingRootInputInitializer) customInitializer; + setupPreDagCreation(); + dagPlan = createDAGPlanWithRunningInitializer(); + setupPostDagCreation(); + + VertexImplWithRunningInputInitializer v1 = + (VertexImplWithRunningInputInitializer) vertices.get("vertex1"); + VertexImplWithRunningInputInitializer v2 = + (VertexImplWithRunningInputInitializer) vertices.get("vertex2"); + + initVertex(v1); + startVertex(v1); // v2 would get the state update from v1 + Assert.assertEquals(VertexState.RUNNING, v1.getState()); + Assert.assertEquals(VertexState.FAILED, v2.getState()); + String diagnostics = StringUtils.join(v2.getDiagnostics(), ","); + assertTrue(diagnostics.contains(IIExceptionLocation.OnVertexStateUpdated.name())); + Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, v2.getTerminationCause()); + } + + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void 
testExceptionFromII_InitSucceededAfterInitFailure() throws AMUserCodeException, InterruptedException { + useCustomInitializer = true; + customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.OnVertexStateUpdated); + EventHandlingRootInputInitializer initializer = + (EventHandlingRootInputInitializer) customInitializer; + setupPreDagCreation(); + dagPlan = createDAGPlanWithRunningInitializer(); + setupPostDagCreation(); + + VertexImplWithRunningInputInitializer v1 = + (VertexImplWithRunningInputInitializer) vertices.get("vertex1"); + VertexImplWithRunningInputInitializer v2 = + (VertexImplWithRunningInputInitializer) vertices.get("vertex2"); + + initVertex(v1); + startVertex(v1); // v2 would get the state update from v1 + // it should be OK receive INIT_SUCCEEDED event after INIT_FAILED event + dispatcher.getEventHandler().handle(new VertexEventRootInputInitialized( + v2.getVertexId(), "input1", null)); + + Assert.assertEquals(VertexState.RUNNING, v1.getState()); + Assert.assertEquals(VertexState.FAILED, v2.getState()); + String diagnostics = StringUtils.join(v2.getDiagnostics(), ","); + assertTrue(diagnostics.contains(IIExceptionLocation.OnVertexStateUpdated.name())); + Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, v2.getTerminationCause()); + } @InterfaceAudience.Private public static class RootInputSpecUpdaterVertexManager extends VertexManagerPlugin { @@ -5119,6 +5276,9 @@ public class TestVertexImpl { public static enum IIExceptionLocation { Initialize, + Initialize2, // for test case that InputInitFailed after InputInitSucceeded + HandleInputInitializerEvent, + OnVertexStateUpdated } @InterfaceAudience.Private @@ -5154,6 +5314,16 @@ public class TestVertexImpl { if (exLocation == IIExceptionLocation.Initialize) { throw new Exception(exLocation.name()); } + if (exLocation == IIExceptionLocation.Initialize2) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // InputInitializerManager is been 
shutdown if Initialized succeeded, + // catch the exception and throw the exception to simulate the case that + // init failure after init succeeded + throw new Exception(exLocation.name()); + } + } context.registerForVertexStateUpdates("vertex1", null); initStarted.set(true); lock.lock(); @@ -5175,6 +5345,9 @@ public class TestVertexImpl { @Override public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception { + if (exLocation == IIExceptionLocation.HandleInputInitializerEvent) { + throw new Exception(exLocation.name()); + } initializerEvents.addAll(events); if (initializerEvents.size() == numExpectedEvents) { eventReceived.set(true); @@ -5197,6 +5370,9 @@ public class TestVertexImpl { } public void onVertexStateUpdated(VertexStateUpdate stateUpdate) { + if (exLocation == IIExceptionLocation.OnVertexStateUpdated) { + throw new RuntimeException(exLocation.name()); + } stateUpdates.add(stateUpdate); } } http://git-wip-us.apache.org/repos/asf/tez/blob/4ec29425/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java index eef6ab3..0175d7b 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java @@ -48,6 +48,7 @@ import org.apache.tez.dag.api.Edge; import org.apache.tez.dag.api.EdgeManagerPluginContext; import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.InputInitializerDescriptor; import org.apache.tez.dag.api.OutputDescriptor; @@ -62,9 +63,10 @@ import 
org.apache.tez.dag.api.EdgeProperty.DataSourceType; import org.apache.tez.dag.api.EdgeProperty.SchedulingType; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.dag.app.dag.impl.OneToOneEdgeManager; import org.apache.tez.dag.app.dag.impl.RootInputVertexManager; -import org.apache.tez.dag.app.dag.impl.ScatterGatherEdgeManager; -import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; +import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager; import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; import org.apache.tez.runtime.api.AbstractLogicalInput; import org.apache.tez.runtime.api.AbstractLogicalOutput; @@ -84,8 +86,8 @@ import org.apache.tez.runtime.api.events.InputInitializerEvent; import org.apache.tez.runtime.api.events.InputReadErrorEvent; import org.apache.tez.runtime.api.events.VertexManagerEvent; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; -import org.apache.tez.runtime.library.processor.SleepProcessor; -import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig; +import org.apache.tez.test.TestAMRecovery.DoNothingProcessor; +import org.apache.tez.test.dag.MultiAttemptDAG.NoOpInput; import org.junit.Test; import com.google.common.collect.Lists; @@ -210,7 +212,7 @@ public class TestExceptionPropagation { * @throws Exception * */ - @Test(timeout = 120000) + @Test(timeout = 180000) public void testExceptionPropagationSession() throws Exception { try { startSessionClient(); @@ -309,6 +311,10 @@ public class TestExceptionPropagation { EM_RouteInputErrorEventToSource, // Not Supported yet // EM_RouteInputSourceTaskFailedEventToDestination, + + // II + II_Initialize, II_HandleInputInitializerEvents, II_OnVertexStateUpdated + } /** @@ -333,22 +339,31 @@ public class TestExceptionPropagation { 
v1.setVertexManagerPlugin(RootInputVertexManagerWithException.getVMDesc(payload)); Vertex v2 = - Vertex.create("v2", - ProcessorDescriptor.create(SleepProcessor.class.getName()) - .setUserPayload(new SleepProcessorConfig(3).toUserPayload()) - , 1); - v2.setVertexManagerPlugin(ShuffleVertexManagerWithException.getVMDesc(exLocation)); + Vertex.create("v2", DoNothingProcessor.getProcDesc(), 1); + v2.addDataSource("input2", + DataSourceDescriptor.create(InputDescriptor.create(NoOpInput.class.getName()), + InputInitializerWithException2.getIIDesc(payload), null)); dag.addVertex(v1) - .addVertex(v2) - .addEdge(Edge.create(v1, v2, EdgeProperty.create( - EdgeManagerPluginDescriptor.create(CustomEdgeManager.class.getName()) - .setUserPayload(payload), - DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, - OutputWithException.getOutputDesc(payload), InputWithException.getInputDesc(payload)))); + .addVertex(v2); + if (exLocation.name().startsWith("EM_")) { + dag.addEdge(Edge.create(v1, v2, EdgeProperty.create( + EdgeManagerPluginDescriptor.create(CustomEdgeManager.class.getName()) + .setUserPayload(payload), + DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, + OutputWithException.getOutputDesc(payload), InputWithException.getInputDesc(payload)))); + } else { + // set the customized VertexManager here; it can't be used together with the custom edge + v2.setVertexManagerPlugin(InputReadyVertexManagerWithException.getVMDesc(exLocation)); + dag.addEdge(Edge.create(v1, v2, EdgeProperty.create(DataMovementType.ONE_TO_ONE, + DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, + OutputWithException.getOutputDesc(payload), InputWithException.getInputDesc(payload)))); + } + + return dag; } + // InputInitializer of vertex1 public static class InputInitializerWithException extends InputInitializer { private ExceptionLocation exLocation; @@ -380,7 +395,65 @@ public class TestExceptionPropagation { } } - // input of vertex2 + // InputInitializer of vertex2 + public static class
InputInitializerWithException2 extends InputInitializer { + + private ExceptionLocation exLocation; + private Object condition = new Object(); + + public InputInitializerWithException2( + InputInitializerContext initializerContext) { + super(initializerContext); + this.exLocation = + ExceptionLocation.valueOf(new String(getContext().getUserPayload() + .deepCopyAsArray())); + } + + @Override + public List<Event> initialize() throws Exception { + if (exLocation == ExceptionLocation.II_Initialize) { + throw new Exception(exLocation.name()); + } + if (exLocation == ExceptionLocation.II_OnVertexStateUpdated) { + getContext().registerForVertexStateUpdates("v1", null); + } + + if (exLocation == ExceptionLocation.II_HandleInputInitializerEvents + || exLocation == ExceptionLocation.II_OnVertexStateUpdated) { + // wait for handleInputInitializerEvent() and onVertexStateUpdated() is invoked + synchronized (condition) { + condition.wait(); + } + } + + return null; + } + + @Override + public void handleInputInitializerEvent(List<InputInitializerEvent> events) + throws Exception { + if (exLocation == ExceptionLocation.II_HandleInputInitializerEvents) { + throw new RuntimeException(exLocation.name()); + } + } + + @Override + public void onVertexStateUpdated(VertexStateUpdate stateUpdate) + throws Exception { + if (exLocation == ExceptionLocation.II_OnVertexStateUpdated) { + throw new Exception(exLocation.name()); + } + super.onVertexStateUpdated(stateUpdate); + } + + public static InputInitializerDescriptor getIIDesc(UserPayload payload) { + return InputInitializerDescriptor.create( + InputInitializerWithException2.class.getName()) + .setUserPayload(payload); + } + } + + // Input of vertex2 public static class InputWithException extends AbstractLogicalInput { private ExceptionLocation exLocation; @@ -436,10 +509,12 @@ public class TestExceptionPropagation { getContext().requestInitialMemory(0l, null); // mandatory call if (this.exLocation == ExceptionLocation.INPUT_INITIALIZE) { 
throw new Exception(this.exLocation.name()); - } else if (this.exLocation == ExceptionLocation.EM_RouteInputErrorEventToSource - || this.exLocation == ExceptionLocation.EM_GetNumDestinationConsumerTasks) { - Event errorEvent = InputReadErrorEvent.create("read error", 0, 0); - return Lists.newArrayList(errorEvent); + } else if ( getContext().getSourceVertexName().equals("v1")) { + if (this.exLocation == ExceptionLocation.EM_RouteInputErrorEventToSource + || this.exLocation == ExceptionLocation.EM_GetNumDestinationConsumerTasks) { + Event errorEvent = InputReadErrorEvent.create("read error", 0, 0); + return Lists.newArrayList(errorEvent); + } } return null; } @@ -450,7 +525,7 @@ public class TestExceptionPropagation { } } - // output of vertex1 + // Output of vertex1 public static class OutputWithException extends AbstractLogicalOutput { private ExceptionLocation exLocation; @@ -497,8 +572,12 @@ public class TestExceptionPropagation { List<Event> events = new ArrayList<Event>(); events.add(DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0]))); return events; - } - else { + } else if (this.exLocation == ExceptionLocation.II_HandleInputInitializerEvents) { + // send InputInitliazer to InputInitializer of v2 + List<Event> events = new ArrayList<Event>(); + events.add(InputInitializerEvent.create("v2", "input2", ByteBuffer.wrap(new byte[0]))); + return events; + } else { return null; } } @@ -576,6 +655,7 @@ public class TestExceptionPropagation { } } + // VertexManager of vertex1 public static class RootInputVertexManagerWithException extends RootInputVertexManager { private ExceptionLocation exLocation; @@ -618,12 +698,13 @@ public class TestExceptionPropagation { } } - public static class ShuffleVertexManagerWithException extends ShuffleVertexManager { + // VertexManager of vertex2 + public static class InputReadyVertexManagerWithException extends InputReadyVertexManager { private ExceptionLocation exLocation; private static final String Test_ExceptionLocation = 
"Test.ExceptionLocation"; - public ShuffleVertexManagerWithException(VertexManagerPluginContext context) { + public InputReadyVertexManagerWithException(VertexManagerPluginContext context) { super(context); } @@ -666,12 +747,13 @@ public class TestExceptionPropagation { Configuration conf = new Configuration(); conf.set(Test_ExceptionLocation, exLocation.name()); UserPayload payload = TezUtils.createUserPayloadFromConf(conf); - return VertexManagerPluginDescriptor.create(ShuffleVertexManagerWithException.class.getName()) + return VertexManagerPluginDescriptor.create(InputReadyVertexManagerWithException.class.getName()) .setUserPayload(payload); } } - public static class CustomEdgeManager extends ScatterGatherEdgeManager { + // EdgeManager for edge linking vertex1 and vertex2 + public static class CustomEdgeManager extends OneToOneEdgeManager { private ExceptionLocation exLocation;
