Repository: tez
Updated Branches:
  refs/heads/master 24b872a7f -> 2e66f3cb2
TEZ-3817. DAGs can hang after more than one uncaught Exception during doTransition. (kshukla)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2e66f3cb
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2e66f3cb
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2e66f3cb

Branch: refs/heads/master
Commit: 2e66f3cb2ef082889551f6a0830c7014317d9680
Parents: 24b872a
Author: Kuhu Shukla <[email protected]>
Authored: Mon Apr 23 16:52:55 2018 -0500
Committer: Kuhu Shukla <[email protected]>
Committed: Mon Apr 23 16:52:55 2018 -0500

----------------------------------------------------------------------
 .../apache/tez/dag/app/dag/impl/DAGImpl.java | 63 ++++++++++----------
 .../tez/dag/app/dag/impl/TestDAGImpl.java | 48 +++++++++++++++
 2 files changed, 81 insertions(+), 30 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/2e66f3cb/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 0a775a6..ecd8d17 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -1388,41 +1388,44 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   }
 
   private DAGState finished(DAGState finalState) {
-    if (finishTime == 0) {
-      setFinishTime();
-    }
-    entityUpdateTracker.stop();
-
-    boolean recoveryError = false;
-
-    // update cpu time counters before finishing the dag
-    updateCpuCounters();
-    TezCounters counters = null;
+    boolean dagError = false;
     try {
-      counters = getAllCounters();
-    } catch (LimitExceededException e) {
-      addDiagnostic("Counters limit exceeded: " + e.getMessage());
-      finalState = DAGState.FAILED;
-    }
+      if (finishTime == 0) {
+        setFinishTime();
+      }
+      entityUpdateTracker.stop();
 
-    try {
-      if (finalState == DAGState.SUCCEEDED) {
-        logJobHistoryFinishedEvent(counters);
-      } else {
-        logJobHistoryUnsuccesfulEvent(finalState, counters);
+      // update cpu time counters before finishing the dag
+      updateCpuCounters();
+      TezCounters counters = null;
+      try {
+        counters = getAllCounters();
+      } catch (LimitExceededException e) {
+        addDiagnostic("Counters limit exceeded: " + e.getMessage());
+        finalState = DAGState.FAILED;
       }
-    } catch (IOException e) {
-      LOG.warn("Failed to persist recovery event for DAG completion"
-          + ", dagId=" + dagId
-          + ", finalState=" + finalState);
-      recoveryError = true;
-    }
 
-    if (finalState != DAGState.SUCCEEDED) {
-      abortOutputs();
-    }
+      try {
+        if (finalState == DAGState.SUCCEEDED) {
+          logJobHistoryFinishedEvent(counters);
+        } else {
+          logJobHistoryUnsuccesfulEvent(finalState, counters);
+        }
+      } catch (IOException e) {
+        LOG.warn("Failed to persist recovery event for DAG completion"
+            + ", dagId=" + dagId
+            + ", finalState=" + finalState, e);
+        dagError = true;
+      }
 
-    if (recoveryError) {
+      if (finalState != DAGState.SUCCEEDED) {
+        abortOutputs();
+      }
+    } catch (Exception e) {
+      dagError = true;
+      LOG.warn("Encountered exception while DAG finish", e);
+    }
+    if (dagError) {
       eventHandler.handle(new DAGAppMasterEventDAGFinished(getID(), DAGState.ERROR));
     } else {
       eventHandler.handle(new DAGAppMasterEventDAGFinished(getID(), finalState));

http://git-wip-us.apache.org/repos/asf/tez/blob/2e66f3cb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 966b464..c0506de 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.app.dag.impl;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -111,6 +112,7 @@ import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
+import org.apache.tez.dag.app.dag.event.DAGEventCommitCompleted;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
@@ -140,6 +142,7 @@ import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.state.StateMachineTez;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -1929,6 +1932,51 @@ public class TestDAGImpl {
     Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
   }
 
+  @Test (timeout = 5000L)
+  @SuppressWarnings("unchecked")
+  public void testDAGHang() throws Exception {
+    conf.setBoolean(
+        TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+        false);
+    dag = Mockito.spy(new DAGImpl(dagId, conf, dagPlan,
+        dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
+        fsTokens, clock, "user", thh, appContext));
+    StateMachineTez<DAGState, DAGEventType, DAGEvent, DAGImpl> spyStateMachine =
+        Mockito.spy(new StateMachineTez<DAGState, DAGEventType, DAGEvent, DAGImpl>(
+            dag.stateMachineFactory.make(dag), dag));
+    when(dag.getStateMachine()).thenReturn(spyStateMachine);
+    dag.entityUpdateTracker = new StateChangeNotifierForTest(dag);
+    doReturn(dag).when(appContext).getCurrentDAG();
+    DAGImpl.OutputKey outputKey = Mockito.mock(DAGImpl.OutputKey.class);
+    ListenableFuture future = Mockito.mock(ListenableFuture.class);
+    dag.commitFutures.put(outputKey, future);
+    initDAG(dag);
+    startDAG(dag);
+    dispatcher.await();
+
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
+        TezVertexID.getInstance(dagId, 0), VertexState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
+        TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
+        TezVertexID.getInstance(dagId, 2), VertexState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
+        TezVertexID.getInstance(dagId, 3), VertexState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
+        TezVertexID.getInstance(dagId, 4), VertexState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
+        TezVertexID.getInstance(dagId, 5), VertexState.SUCCEEDED));
+    dispatcher.await();
+    Assert.assertEquals(DAGState.COMMITTING, dag.getState());
+    DAGEventCommitCompleted dagEvent = new DAGEventCommitCompleted(
+        dagId, outputKey, false , new RuntimeException("test"));
+    doThrow(new RuntimeException("test")).when(
+        dag).logJobHistoryUnsuccesfulEvent(any(DAGState.class), any(TezCounters.class));
+    dag.handle(dagEvent);
+    dispatcher.await();
+    Assert.assertTrue("DAG did not terminate!", dag.getInternalState() == DAGState.FAILED);
+  }
+
   @Test(timeout = 5000)
   public void testDAGKillVertexSuccessAfterTerminated() {
     _testDAGKillVertexSuccessAfterTerminated(DAGTerminationCause.DAG_KILL);
