http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java index 28670ff..fd56495 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java @@ -84,6 +84,7 @@ import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType; import org.apache.tez.dag.app.dag.event.DAGEvent; import org.apache.tez.dag.app.dag.event.DAGEventStartDag; +import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag; import org.apache.tez.dag.app.dag.event.DAGEventType; import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; @@ -711,11 +712,11 @@ public class TestCommit { TaskState.SUCCEEDED)); Assert.assertEquals(VertexState.COMMITTING, v1.getState()); // kill dag which will trigger the vertex killed event - dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL)); + dag.handle(new DAGEventTerminateDag(dag.getID(), DAGTerminationCause.DAG_KILL, null)); dispatcher.await(); Assert.assertEquals(VertexState.KILLED, v1.getState()); Assert.assertTrue(v1.commitFutures.isEmpty()); - Assert.assertEquals(VertexTerminationCause.DAG_KILL, + Assert.assertEquals(VertexTerminationCause.DAG_TERMINATED, v1.getTerminationCause()); Assert.assertEquals(DAGState.KILLED, dag.getState()); Assert @@ -1514,10 +1515,20 @@ public class TestCommit { // Assert.assertEquals(0, v3OutputCommitter.abortCounter); } - // Kill dag while it is in COMMITTING in the case of - // TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is true + @Test(timeout = 5000) public void testDAGKilledWhileCommitting1_OnDAGSuccess() 
throws Exception { + _testDAGTerminatedWhileCommitting1_OnDAGSuccess(DAGTerminationCause.DAG_KILL); + } + + @Test(timeout = 5000) + public void testServiceErrorWhileCommitting1_OnDAGSuccess() throws Exception { + _testDAGTerminatedWhileCommitting1_OnDAGSuccess(DAGTerminationCause.SERVICE_PLUGIN_ERROR); + } + + // Kill dag while it is in COMMITTING in the case of + // TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is true + private void _testDAGTerminatedWhileCommitting1_OnDAGSuccess(DAGTerminationCause terminationCause) throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, true); setupDAG(createDAGPlan(true, true)); @@ -1534,14 +1545,14 @@ public class TestCommit { v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), TaskState.SUCCEEDED)); waitUntil(dag, DAGState.COMMITTING); - dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL)); - waitUntil(dag, DAGState.KILLED); + dag.handle(new DAGEventTerminateDag(dag.getID(), terminationCause, null)); + waitUntil(dag, terminationCause.getFinishedState()); Assert.assertEquals(VertexState.SUCCEEDED, v1.getState()); Assert.assertEquals(VertexState.SUCCEEDED, v2.getState()); Assert.assertEquals(VertexState.SUCCEEDED, v3.getState()); Assert - .assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause()); + .assertEquals(terminationCause, dag.getTerminationCause()); Assert.assertTrue(dag.commitFutures.isEmpty()); historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0); historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0); @@ -1569,10 +1580,20 @@ public class TestCommit { Assert.assertEquals(1, v3OutputCommitter.abortCounter); } - // Kill dag while it is in COMMITTING in the case of - // TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is false + @Test(timeout = 5000) public void testDAGKilledWhileCommitting1_OnVertexSuccess() throws Exception { + _testDAGTerminatedWhileCommitting1_OnVertexSuccess(DAGTerminationCause.DAG_KILL); + } + + @Test(timeout = 
5000) + public void testServiceErrorWhileCommitting1_OnVertexSuccess() throws Exception { + _testDAGTerminatedWhileCommitting1_OnVertexSuccess(DAGTerminationCause.SERVICE_PLUGIN_ERROR); + } + + // Kill dag while it is in COMMITTING in the case of + // TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is false + private void _testDAGTerminatedWhileCommitting1_OnVertexSuccess(DAGTerminationCause terminationCause) throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false); setupDAG(createDAGPlan(true, true)); @@ -1596,15 +1617,15 @@ public class TestCommit { v3OutputCommitter.unblockCommit(); // dag go to COMMITTING due to the pending commit of v12Out waitUntil(dag, DAGState.COMMITTING); - dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL)); - waitUntil(dag, DAGState.KILLED); + dag.handle(new DAGEventTerminateDag(dag.getID(), terminationCause, null)); + waitUntil(dag, terminationCause.getFinishedState()); Assert.assertEquals(VertexState.SUCCEEDED, v1.getState()); Assert.assertEquals(VertexState.SUCCEEDED, v2.getState()); Assert.assertEquals(VertexState.SUCCEEDED, v3.getState()); - Assert.assertEquals(DAGState.KILLED, dag.getState()); + Assert.assertEquals(terminationCause.getFinishedState(), dag.getState()); Assert - .assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause()); + .assertEquals(terminationCause, dag.getTerminationCause()); Assert.assertTrue(dag.commitFutures.isEmpty()); historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1); historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0); @@ -1631,9 +1652,18 @@ public class TestCommit { Assert.assertEquals(1, v3OutputCommitter.abortCounter); } - // DAG killed while dag is still in RUNNING and vertex is in COMMITTING @Test(timeout = 5000) public void testDAGKilledWhileRunning_OnVertexSuccess() throws Exception { + _testDAGKilledWhileRunning_OnVertexSuccess(DAGTerminationCause.DAG_KILL); + } + + @Test(timeout = 5000) + public void 
testServiceErrorWhileRunning_OnVertexSuccess() throws Exception { + _testDAGKilledWhileRunning_OnVertexSuccess(DAGTerminationCause.SERVICE_PLUGIN_ERROR); + } + + // DAG killed while dag is still in RUNNING and vertex is in COMMITTING + private void _testDAGKilledWhileRunning_OnVertexSuccess(DAGTerminationCause terminationCause) throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false); setupDAG(createDAGPlan(true, true)); @@ -1652,17 +1682,17 @@ public class TestCommit { Assert.assertEquals(VertexState.COMMITTING, v3.getState()); // dag is still in RUNNING because v3 has not completed Assert.assertEquals(DAGState.RUNNING, dag.getState()); - dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL)); - waitUntil(dag, DAGState.KILLED); + dag.handle(new DAGEventTerminateDag(dag.getID(), terminationCause, null)); + waitUntil(dag, terminationCause.getFinishedState()); Assert.assertEquals(VertexState.SUCCEEDED, v1.getState()); Assert.assertEquals(VertexState.SUCCEEDED, v2.getState()); Assert.assertEquals(VertexState.KILLED, v3.getState()); - Assert.assertEquals(VertexTerminationCause.DAG_KILL, v3.getTerminationCause()); + Assert.assertEquals(VertexTerminationCause.DAG_TERMINATED, v3.getTerminationCause()); Assert.assertTrue(v3.commitFutures.isEmpty()); - Assert.assertEquals(DAGState.KILLED, dag.getState()); + Assert.assertEquals(terminationCause.getFinishedState(), dag.getState()); Assert - .assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause()); + .assertEquals(terminationCause, dag.getTerminationCause()); Assert.assertTrue(dag.commitFutures.isEmpty()); // commit uv12 may not have started, so can't verify the VertexGroupCommitStartedEvent historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0); @@ -1903,10 +1933,19 @@ public class TestCommit { Assert.assertEquals(1, v3OutputCommitter.abortCounter); } - // test commit will be canceled no matter it is started or still in the threadpool - // 
ControlledThreadPoolExecutor is used for to not schedule the commits @Test(timeout = 5000) public void testCommitCanceled_OnDAGSuccess() throws Exception { + _testCommitCanceled_OnDAGSuccess(DAGTerminationCause.DAG_KILL); + } + + @Test(timeout = 5000) + public void testCommitCanceled_OnDAGSuccess2() throws Exception { + _testCommitCanceled_OnDAGSuccess(DAGTerminationCause.SERVICE_PLUGIN_ERROR); + } + + // test commit will be canceled no matter it is started or still in the threadpool + // ControlledThreadPoolExecutor is used for to not schedule the commits + private void _testCommitCanceled_OnDAGSuccess(DAGTerminationCause terminationCause) throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, true); setupDAG(createDAGPlan(true, true)); @@ -1931,10 +1970,10 @@ public class TestCommit { // mean the commits have been submitted to ThreadPool Assert.assertEquals(2, dag.commitFutures.size()); - dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL)); - waitUntil(dag, DAGState.KILLED); + dag.handle(new DAGEventTerminateDag(dag.getID(), terminationCause, null)); + waitUntil(dag, terminationCause.getFinishedState()); - Assert.assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause()); + Assert.assertEquals(terminationCause, dag.getTerminationCause()); // mean the commits have been canceled Assert.assertTrue(dag.commitFutures.isEmpty()); historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java index 2158368..480e3cf 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java @@ -21,7 +21,6 @@ package org.apache.tez.dag.app.dag.impl; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -41,6 +40,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.lang.StringUtils; import org.apache.tez.common.counters.Limits; import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag; import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.hadoop.shim.HadoopShim; import org.slf4j.Logger; @@ -1641,8 +1641,7 @@ public class TestDAGImpl { startDAG(dag); dispatcher.await(); - dispatcher.getEventHandler().handle( - new DAGEvent(dagId, DAGEventType.DAG_KILL)); + dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dagId, DAGTerminationCause.DAG_KILL, null)); dispatcher.await(); Assert.assertEquals(DAGState.KILLED, dag.getState()); @@ -1654,9 +1653,18 @@ public class TestDAGImpl { } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testKillRunningDAG() { + _testTerminateRunningDAG(DAGTerminationCause.DAG_KILL); + } + + @Test(timeout = 5000) + public void testServiceErrorRunningDAG() { + _testTerminateRunningDAG(DAGTerminationCause.SERVICE_PLUGIN_ERROR); + } + + @SuppressWarnings("unchecked") + private void 
_testTerminateRunningDAG(DAGTerminationCause terminationCause) { initDAG(dag); startDAG(dag); dispatcher.await(); @@ -1674,7 +1682,7 @@ public class TestDAGImpl { Assert.assertEquals(VertexState.SUCCEEDED, v0.getState()); Assert.assertEquals(VertexState.RUNNING, v1.getState()); - dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL)); + dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dagId, terminationCause, null)); dispatcher.await(); Assert.assertEquals(DAGState.TERMINATING, dag.getState()); @@ -1817,7 +1825,7 @@ public class TestDAGImpl { dispatcher.getEventHandler().handle(new DAGEventVertexCompleted( TezVertexID.getInstance(dagId, 5), VertexState.FAILED)); } else if (testState == DAGStatus.State.KILLED) { - dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL)); + dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dagId, DAGTerminationCause.DAG_KILL, null)); } else if (testState == DAGStatus.State.ERROR) { dispatcher.getEventHandler().handle(new DAGEventStartDag(dagId, new LinkedList<URL>())); } else { @@ -1871,11 +1879,21 @@ public class TestDAGImpl { } } + + @Test(timeout = 5000) + public void testDAGKill() { + _testDAGTerminate(DAGTerminationCause.DAG_KILL); + } + + @Test(timeout = 5000) + public void testDAGServiceError() { + _testDAGTerminate(DAGTerminationCause.SERVICE_PLUGIN_ERROR); + } + // Couple of vertices succeed. DAG_KILLED processed, which causes the rest of the vertices to be // marked as KILLED. 
@SuppressWarnings("unchecked") - @Test(timeout = 5000) - public void testDAGKill() { + private void _testDAGTerminate(DAGTerminationCause terminationCause) { initDAG(dag); startDAG(dag); dispatcher.await(); @@ -1887,10 +1905,10 @@ public class TestDAGImpl { dispatcher.getEventHandler().handle(new DAGEventVertexCompleted( TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED)); - dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL)); + dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dagId, terminationCause, null)); dispatcher.await(); - Assert.assertEquals(DAGState.KILLED, dag.getState()); - Assert.assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause()); + Assert.assertEquals(terminationCause.getFinishedState(), dag.getState()); + Assert.assertEquals(terminationCause, dag.getTerminationCause()); Assert.assertEquals(2, dag.getSuccessfulVertices()); int killedCount = 0; @@ -1902,16 +1920,25 @@ public class TestDAGImpl { Assert.assertEquals(4, killedCount); for (Vertex v : dag.getVertices().values()) { - Assert.assertEquals(VertexTerminationCause.DAG_KILL, v.getTerminationCause()); + Assert.assertEquals(VertexTerminationCause.DAG_TERMINATED, v.getTerminationCause()); } Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents); } + @Test(timeout = 5000) + public void testDAGKillVertexSuccessAfterTerminated() { + _testDAGKillVertexSuccessAfterTerminated(DAGTerminationCause.DAG_KILL); + } + + @Test(timeout = 5000) + public void testDAGServiceErrorVertexSuccessAfterTerminated() { + _testDAGKillVertexSuccessAfterTerminated(DAGTerminationCause.SERVICE_PLUGIN_ERROR); + } + // Vertices succeed after a DAG kill has been processed. Should be ignored. 
@SuppressWarnings("unchecked") - @Test(timeout = 5000) - public void testDAGKillVertexSuccessAfterKill() { + private void _testDAGKillVertexSuccessAfterTerminated(DAGTerminationCause terminationCause) { initDAG(dag); startDAG(dag); dispatcher.await(); @@ -1923,10 +1950,10 @@ public class TestDAGImpl { dispatcher.getEventHandler().handle(new DAGEventVertexCompleted( TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED)); - dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL)); + dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dagId, terminationCause, null)); dispatcher.await(); - Assert.assertEquals(DAGState.KILLED, dag.getState()); + Assert.assertEquals(terminationCause.getFinishedState(), dag.getState()); // Vertex SUCCESS gets processed after the DAG has reached the KILLED state. Should be ignored. for (int i = 2; i < 6; ++i) { @@ -1943,18 +1970,27 @@ public class TestDAGImpl { } Assert.assertEquals(4, killedCount); - Assert.assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause()); + Assert.assertEquals(terminationCause, dag.getTerminationCause()); Assert.assertEquals(2, dag.getSuccessfulVertices()); for (Vertex v : dag.getVertices().values()) { - Assert.assertEquals(VertexTerminationCause.DAG_KILL, v.getTerminationCause()); + Assert.assertEquals(VertexTerminationCause.DAG_TERMINATED, v.getTerminationCause()); } Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents); } - // Vertex KILLED after a DAG_KILLED is issued. Termination reason should be DAG_KILLED - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testDAGKillPending() { + _testDAGKillPending(DAGTerminationCause.DAG_KILL); + } + + @Test(timeout = 5000) + public void testDAGServiceErrorPending() { + _testDAGKillPending(DAGTerminationCause.SERVICE_PLUGIN_ERROR); + } + + // Vertex KILLED after a DAG_KILLED is issued. 
Termination reason should be DAG_KILLED + @SuppressWarnings("unchecked") + private void _testDAGKillPending(DAGTerminationCause terminationCause) { initDAG(dag); startDAG(dag); dispatcher.await(); @@ -1972,17 +2008,17 @@ public class TestDAGImpl { TezVertexID.getInstance(dagId, i), VertexState.SUCCEEDED)); } dispatcher.await(); - dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL)); + dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dagId, terminationCause, null)); dispatcher.await(); - Assert.assertEquals(DAGState.KILLED, dag.getState()); + Assert.assertEquals(terminationCause.getFinishedState(), dag.getState()); dispatcher.getEventHandler().handle(new DAGEventVertexCompleted( TezVertexID.getInstance(dagId, 5), VertexState.KILLED)); dispatcher.await(); - Assert.assertEquals(DAGState.KILLED, dag.getState()); + Assert.assertEquals(terminationCause.getFinishedState(), dag.getState()); Assert.assertEquals(5, dag.getSuccessfulVertices()); Assert.assertEquals(dag.getVertex(TezVertexID.getInstance(dagId, 5)).getTerminationCause(), - VertexTerminationCause.DAG_KILL); + VertexTerminationCause.DAG_TERMINATED); Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents); } http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 986f64d..659d099 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -143,7 +143,6 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed; import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule; import 
org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely; -import org.apache.tez.dag.app.dag.event.TaskAttemptEventTerminationCauseEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.dag.event.TaskEvent; import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask; @@ -189,7 +188,6 @@ import org.apache.tez.runtime.api.events.InputDataInformationEvent; import org.apache.tez.runtime.api.events.InputFailedEvent; import org.apache.tez.runtime.api.events.InputInitializerEvent; import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent; -import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent; import org.apache.tez.runtime.api.events.VertexManagerEvent; import org.apache.tez.test.EdgeManagerForTest; import org.apache.tez.test.VertexManagerPluginForTest; @@ -206,7 +204,6 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; -import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.mockito.internal.util.collections.Sets; @@ -2515,10 +2512,10 @@ public class TestVertexImpl { private void killVertex(VertexImpl v) { dispatcher.getEventHandler().handle( - new VertexEventTermination(v.getVertexId(), VertexTerminationCause.DAG_KILL)); + new VertexEventTermination(v.getVertexId(), VertexTerminationCause.DAG_TERMINATED)); dispatcher.await(); Assert.assertEquals(VertexState.KILLED, v.getState()); - Assert.assertEquals(v.getTerminationCause(), VertexTerminationCause.DAG_KILL); + Assert.assertEquals(v.getTerminationCause(), VertexTerminationCause.DAG_TERMINATED); } private void startVertex(VertexImpl v, @@ -3322,7 +3319,7 @@ public class TestVertexImpl { StringUtils.join(v3.getDiagnostics(), ",").toLowerCase(Locale.ENGLISH); assertTrue(diagnostics.contains( "vertex received kill while in running state")); - Assert.assertEquals(VertexTerminationCause.DAG_KILL, v3.getTerminationCause()); + Assert.assertEquals(VertexTerminationCause.DAG_TERMINATED, 
v3.getTerminationCause()); assertTrue(diagnostics.contains(v3.getTerminationCause().name().toLowerCase(Locale.ENGLISH))); } @@ -3334,7 +3331,7 @@ public class TestVertexImpl { startVertex(v); dispatcher.getEventHandler().handle( - new VertexEventTermination(v.getVertexId(), VertexTerminationCause.DAG_KILL)); + new VertexEventTermination(v.getVertexId(), VertexTerminationCause.DAG_TERMINATED)); dispatcher.await(); Assert.assertEquals(VertexState.KILLED, v.getState()); @@ -3359,7 +3356,7 @@ public class TestVertexImpl { startVertex(v); dispatcher.getEventHandler().handle( - new VertexEventTermination(v.getVertexId(), VertexTerminationCause.DAG_KILL)); + new VertexEventTermination(v.getVertexId(), VertexTerminationCause.DAG_TERMINATED)); dispatcher.await(); Assert.assertEquals(VertexState.KILLED, v.getState()); http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java index 1f75afb..b3568eb 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java @@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; @@ -52,15 +53,21 @@ import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.app.AppContext; import 
org.apache.tez.dag.app.TaskCommunicatorManagerInterface; +import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError; +import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag; import org.apache.tez.dag.app.rm.ContainerLauncherLaunchRequestEvent; import org.apache.tez.dag.app.rm.ContainerLauncherStopRequestEvent; +import org.apache.tez.dag.helpers.DagInfoImplForTest; +import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; import org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerLauncherContext; import org.apache.tez.serviceplugins.api.ContainerStopRequest; +import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults; import org.apache.tez.serviceplugins.api.ServicePluginException; +import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -244,6 +251,75 @@ public class TestContainerLauncherManager { @SuppressWarnings("unchecked") @Test(timeout = 5000) + public void testReportFailureFromContainerLauncher() throws ServicePluginException, TezException { + final String dagName = DAG_NAME; + final int dagIndex = DAG_INDEX; + TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(0, 0), dagIndex); + DAG dag = mock(DAG.class); + doReturn(dagName).when(dag).getName(); + doReturn(dagId).when(dag).getID(); + EventHandler eventHandler = mock(EventHandler.class); + AppContext appContext = mock(AppContext.class); + doReturn(eventHandler).when(appContext).getEventHandler(); + doReturn(dag).when(appContext).getCurrentDAG(); + doReturn("testlauncher").when(appContext).getContainerLauncherName(0); + + NamedEntityDescriptor<TaskCommunicatorDescriptor> taskCommDescriptor = + new NamedEntityDescriptor<>("testlauncher", 
ContainerLauncherForTest.class.getName()); + List<NamedEntityDescriptor> list = new LinkedList<>(); + list.add(taskCommDescriptor); + ContainerLauncherManager containerLauncherManager = + new ContainerLauncherManager(appContext, mock(TaskCommunicatorManagerInterface.class), "", + list, false); + + try { + ContainerLaunchContext clc1 = mock(ContainerLaunchContext.class); + Container container1 = mock(Container.class); + ContainerLauncherLaunchRequestEvent launchRequestEvent = + new ContainerLauncherLaunchRequestEvent(clc1, container1, 0, 0, 0); + + + containerLauncherManager.handle(launchRequestEvent); + + ArgumentCaptor<Event> argumentCaptor = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(1)).handle(argumentCaptor.capture()); + + Event rawEvent = argumentCaptor.getValue(); + assertTrue(rawEvent instanceof DAGAppMasterEventUserServiceFatalError); + DAGAppMasterEventUserServiceFatalError event = + (DAGAppMasterEventUserServiceFatalError) rawEvent; + assertEquals(DAGAppMasterEventType.CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR, event.getType()); + assertTrue(event.getDiagnosticInfo().contains("ReportedFatalError")); + assertTrue( + event.getDiagnosticInfo().contains(ServicePluginErrorDefaults.INCONSISTENT_STATE.name())); + assertTrue(event.getDiagnosticInfo().contains("[0:testlauncher]")); + + reset(eventHandler); + // stop container + + ContainerId containerId2 = mock(ContainerId.class); + NodeId nodeId2 = mock(NodeId.class); + ContainerLauncherStopRequestEvent stopRequestEvent = + new ContainerLauncherStopRequestEvent(containerId2, nodeId2, null, 0, 0, 0); + + argumentCaptor = ArgumentCaptor.forClass(Event.class); + + containerLauncherManager.handle(stopRequestEvent); + verify(eventHandler, times(1)).handle(argumentCaptor.capture()); + rawEvent = argumentCaptor.getValue(); + assertTrue(rawEvent instanceof DAGEventTerminateDag); + DAGEventTerminateDag killEvent = (DAGEventTerminateDag) rawEvent; + 
assertTrue(killEvent.getDiagnosticInfo().contains("ReportError")); + assertTrue(killEvent.getDiagnosticInfo() + .contains(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE.name())); + assertTrue(killEvent.getDiagnosticInfo().contains("[0:testlauncher]")); + } finally { + containerLauncherManager.stop(); + } + } + + @SuppressWarnings("unchecked") + @Test(timeout = 5000) public void testContainerLauncherUserError() throws ServicePluginException { ContainerLauncher containerLauncher = mock(ContainerLauncher.class); @@ -256,7 +332,8 @@ public class TestContainerLauncherManager { Configuration conf = new Configuration(false); ContainerLauncherManager containerLauncherManager = - new ContainerLauncherManager(containerLauncher, appContext); + new ContainerLauncherManager(appContext); + containerLauncherManager.setContainerLauncher(containerLauncher); try { containerLauncherManager.init(conf); containerLauncherManager.start(); @@ -437,4 +514,26 @@ public class TestContainerLauncherManager { } } + private static final String DAG_NAME = "dagName"; + private static final int DAG_INDEX = 1; + public static class ContainerLauncherForTest extends ContainerLauncher { + + public ContainerLauncherForTest( + ContainerLauncherContext containerLauncherContext) { + super(containerLauncherContext); + } + + @Override + public void launchContainer(ContainerLaunchRequest launchRequest) throws + ServicePluginException { + getContext().reportError(ServicePluginErrorDefaults.INCONSISTENT_STATE, "ReportedFatalError", null); + } + + @Override + public void stopContainer(ContainerStopRequest stopRequest) throws ServicePluginException { + getContext() + .reportError(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE, "ReportError", new DagInfoImplForTest(DAG_INDEX, DAG_NAME)); + } + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java ---------------------------------------------------------------------- diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java index f69d8be..a3e5ff5 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java @@ -33,6 +33,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; @@ -49,6 +50,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.commons.io.IOExceptionWithCause; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -74,6 +76,7 @@ import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerContextDr import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableContext; import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher; import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.PreemptionMatcher; +import org.apache.tez.serviceplugins.api.DagInfo; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AppFinalStatus; import org.junit.After; @@ -503,10 +506,14 @@ public class TestTaskScheduler { drainableAppCallback.drain(); verify(mockApp).nodesUpdated(mockUpdatedNodes); - Exception mockException = mock(Exception.class); + ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class); + Exception mockException = new IOException("mockexception"); scheduler.onError(mockException); drainableAppCallback.drain(); - 
verify(mockApp).onError(mockException); + verify(mockApp) + .reportError(eq(YarnTaskSchedulerServiceError.RESOURCEMANAGER_ERROR), argumentCaptor.capture(), + any(DagInfo.class)); + assertTrue(argumentCaptor.getValue().contains("mockexception")); scheduler.onShutdownRequest(); drainableAppCallback.drain(); @@ -1220,10 +1227,14 @@ public class TestTaskScheduler { drainableAppCallback.drain(); verify(mockApp).nodesUpdated(mockUpdatedNodes); - Exception mockException = mock(Exception.class); + + ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class); + Exception mockException = new IOException("mockexception"); scheduler.onError(mockException); drainableAppCallback.drain(); - verify(mockApp).onError(mockException); + verify(mockApp).reportError(eq(YarnTaskSchedulerServiceError.RESOURCEMANAGER_ERROR), argumentCaptor.capture(), + any(DagInfo.class)); + assertTrue(argumentCaptor.getValue().contains("mockexception")); scheduler.onShutdownRequest(); drainableAppCallback.drain(); http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java index b54d024..ab85751 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java @@ -24,6 +24,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collection; @@ -68,6 +70,8 @@ import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.app.AppContext; import 
org.apache.tez.dag.app.ServicePluginLifecycleAbstractService; import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest; +import org.apache.tez.serviceplugins.api.DagInfo; +import org.apache.tez.serviceplugins.api.ServicePluginError; import org.apache.tez.serviceplugins.api.TaskScheduler; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; @@ -283,9 +287,10 @@ class TestTaskSchedulerHelpers { } @Override - public void onError(Throwable t) { + public void reportError(@Nonnull ServicePluginError servicePluginError, String message, + DagInfo dagInfo) { invocations++; - real.onError(t); + real.reportError(servicePluginError, message, dagInfo); } @Override @@ -327,6 +332,12 @@ class TestTaskSchedulerHelpers { return real.getApplicationAttemptId(); } + @Nullable + @Override + public DagInfo getCurrentDagInfo() { + return real.getCurrentDagInfo(); + } + @Override public String getAppHostName() { return real.getAppHostName(); http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java index 4d828e2..791bb7f 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java @@ -28,11 +28,13 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import javax.annotation.Nullable; import 
java.io.IOException; import java.lang.reflect.Method; import java.net.InetSocketAddress; @@ -71,9 +73,11 @@ import org.apache.tez.dag.api.client.DAGClientServer; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerContext; import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService; +import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.TaskAttempt; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError; +import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag; import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl; import org.apache.tez.dag.app.dag.impl.TaskImpl; import org.apache.tez.dag.app.dag.impl.VertexImpl; @@ -84,16 +88,19 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventType; import org.apache.tez.dag.app.rm.container.AMContainerMap; import org.apache.tez.dag.app.rm.container.AMContainerState; import org.apache.tez.dag.app.web.WebUIService; +import org.apache.tez.dag.helpers.DagInfoImplForTest; import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.api.impl.TaskSpec; +import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults; import org.apache.tez.serviceplugins.api.ServicePluginException; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.serviceplugins.api.TaskScheduler; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; +import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -539,6 +546,81 @@ public class TestTaskSchedulerManager { @SuppressWarnings("unchecked") @Test(timeout = 5000) + public void testReportFailureFromTaskScheduler() 
{ + String dagName = DAG_NAME; + Configuration conf = new TezConfiguration(); + String taskSchedulerName = "testTaskScheduler"; + String expIdentifier = "[0:" + taskSchedulerName + "]"; + EventHandler eventHandler = mock(EventHandler.class); + AppContext appContext = mock(AppContext.class, RETURNS_DEEP_STUBS); + doReturn(taskSchedulerName).when(appContext).getTaskSchedulerName(0); + doReturn(eventHandler).when(appContext).getEventHandler(); + doReturn(conf).when(appContext).getAMConf(); + InetSocketAddress address = new InetSocketAddress("host", 55000); + + DAGClientServer dagClientServer = mock(DAGClientServer.class); + doReturn(address).when(dagClientServer).getBindAddress(); + + DAG dag = mock(DAG.class); + TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(1, 0), DAG_INDEX); + doReturn(dagName).when(dag).getName(); + doReturn(dagId).when(dag).getID(); + doReturn(dag).when(appContext).getCurrentDAG(); + + NamedEntityDescriptor<TaskSchedulerDescriptor> namedEntityDescriptor = + new NamedEntityDescriptor<>(taskSchedulerName, TaskSchedulerForFailureTest.class.getName()); + List<NamedEntityDescriptor> list = new LinkedList<>(); + list.add(namedEntityDescriptor); + + TaskSchedulerManager taskSchedulerManager = + new TaskSchedulerManager(appContext, dagClientServer, eventHandler, + mock(ContainerSignatureMatcher.class), mock(WebUIService.class), list, false) { + @Override + TaskSchedulerContext wrapTaskSchedulerContext(TaskSchedulerContext rawContext) { + // Avoid wrapping in threads + return rawContext; + } + }; + try { + taskSchedulerManager.init(new TezConfiguration()); + taskSchedulerManager.start(); + + taskSchedulerManager.getTotalResources(0); + ArgumentCaptor<Event> argumentCaptor = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(1)).handle(argumentCaptor.capture()); + + Event rawEvent = argumentCaptor.getValue(); + assertTrue(rawEvent instanceof DAGEventTerminateDag); + DAGEventTerminateDag killEvent = (DAGEventTerminateDag) 
rawEvent; + assertTrue(killEvent.getDiagnosticInfo().contains("ReportError")); + assertTrue(killEvent.getDiagnosticInfo() + .contains(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE.name())); + assertTrue(killEvent.getDiagnosticInfo().contains(expIdentifier)); + + + reset(eventHandler); + taskSchedulerManager.getAvailableResources(0); + argumentCaptor = ArgumentCaptor.forClass(Event.class); + + verify(eventHandler, times(1)).handle(argumentCaptor.capture()); + rawEvent = argumentCaptor.getValue(); + + assertTrue(rawEvent instanceof DAGAppMasterEventUserServiceFatalError); + DAGAppMasterEventUserServiceFatalError event = + (DAGAppMasterEventUserServiceFatalError) rawEvent; + assertEquals(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, event.getType()); + assertTrue(event.getDiagnosticInfo().contains("ReportedFatalError")); + assertTrue( + event.getDiagnosticInfo().contains(ServicePluginErrorDefaults.INCONSISTENT_STATE.name())); + assertTrue(event.getDiagnosticInfo().contains(expIdentifier)); + + } finally { + taskSchedulerManager.stop(); + } + } + + @SuppressWarnings("unchecked") + @Test(timeout = 5000) public void testTaskSchedulerUserError() { TaskScheduler taskScheduler = mock(TaskScheduler.class, new ExceptionAnswer()); @@ -798,4 +880,83 @@ public class TestTaskSchedulerManager { return false; } } + + private static final String DAG_NAME = "dagName"; + private static final int DAG_INDEX = 1; + public static class TaskSchedulerForFailureTest extends TaskScheduler { + + public TaskSchedulerForFailureTest(TaskSchedulerContext taskSchedulerContext) { + super(taskSchedulerContext); + } + + @Override + public Resource getAvailableResources() throws ServicePluginException { + getContext().reportError(ServicePluginErrorDefaults.INCONSISTENT_STATE, "ReportedFatalError", null); + return Resource.newInstance(1024, 1); + } + + @Override + public Resource getTotalResources() throws ServicePluginException { + getContext() + 
.reportError(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE, "ReportError", new DagInfoImplForTest(DAG_INDEX, DAG_NAME)); + return Resource.newInstance(1024, 1); + } + + @Override + public int getClusterNodeCount() throws ServicePluginException { + return 0; + } + + @Override + public void blacklistNode(NodeId nodeId) throws ServicePluginException { + + } + + @Override + public void unblacklistNode(NodeId nodeId) throws ServicePluginException { + + } + + @Override + public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks, + Priority priority, Object containerSignature, + Object clientCookie) throws + ServicePluginException { + + } + + @Override + public void allocateTask(Object task, Resource capability, ContainerId containerId, + Priority priority, Object containerSignature, + Object clientCookie) throws + ServicePluginException { + + } + + @Override + public boolean deallocateTask(Object task, boolean taskSucceeded, + TaskAttemptEndReason endReason, + @Nullable String diagnostics) throws ServicePluginException { + return false; + } + + @Override + public Object deallocateContainer(ContainerId containerId) throws ServicePluginException { + return null; + } + + @Override + public void setShouldUnregister() throws ServicePluginException { + + } + + @Override + public boolean hasUnregistered() throws ServicePluginException { + return false; + } + + @Override + public void dagComplete() throws ServicePluginException { + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/helpers/DagInfoImplForTest.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/helpers/DagInfoImplForTest.java b/tez-dag/src/test/java/org/apache/tez/dag/helpers/DagInfoImplForTest.java new file mode 100644 index 0000000..f92513f --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/helpers/DagInfoImplForTest.java @@ -0,0 +1,38 @@ 
+/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.helpers; + +import org.apache.tez.serviceplugins.api.DagInfo; + +public class DagInfoImplForTest implements DagInfo { + + private final int index; + private final String name; + + public DagInfoImplForTest(int index, String name) { + this.index = index; + this.name = name; + } + + @Override + public int getIndex() { + return index; + } + + @Override + public String getName() { + return name; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/ErrorPluginConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/ErrorPluginConfiguration.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/ErrorPluginConfiguration.java new file mode 100644 index 0000000..32d1fb6 --- /dev/null +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/ErrorPluginConfiguration.java @@ -0,0 +1,134 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.nio.ByteBuffer; +import java.util.HashMap; + +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.serviceplugins.api.ServicePluginContextBase; +import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults; + +public class ErrorPluginConfiguration { + + public static final String REPORT_FATAL_ERROR_MESSAGE = "ReportedFatalError"; + public static final String REPORT_NONFATAL_ERROR_MESSAGE = "ReportedError"; + public static final String THROW_ERROR_EXCEPTION_STRING = "Simulated Error"; + + private static final String CONF_THROW_ERROR = "throw.error"; + private static final String CONF_REPORT_ERROR = "report.error"; + private static final String CONF_REPORT_ERROR_FATAL = "report.error.fatal"; + private static final String CONF_REPORT_ERROR_DAG_NAME = "report.error.dag.name"; + + private final HashMap<String, String> kv; + + private ErrorPluginConfiguration() { + this.kv = new HashMap<>(); + } + + private ErrorPluginConfiguration(HashMap<String, String> map) { + this.kv = map; + } + + public static ErrorPluginConfiguration createThrowErrorConf() { + ErrorPluginConfiguration conf = new ErrorPluginConfiguration(); + conf.kv.put(CONF_THROW_ERROR, String.valueOf(true)); + return conf; + } + + public static ErrorPluginConfiguration createReportFatalErrorConf(String dagName) { + ErrorPluginConfiguration conf = 
new ErrorPluginConfiguration(); + conf.kv.put(CONF_REPORT_ERROR, String.valueOf(true)); + conf.kv.put(CONF_REPORT_ERROR_FATAL, String.valueOf(true)); + conf.kv.put(CONF_REPORT_ERROR_DAG_NAME, dagName); + return conf; + } + + public static ErrorPluginConfiguration createReportNonFatalErrorConf(String dagName) { + ErrorPluginConfiguration conf = new ErrorPluginConfiguration(); + conf.kv.put(CONF_REPORT_ERROR, String.valueOf(true)); + conf.kv.put(CONF_REPORT_ERROR_FATAL, String.valueOf(false)); + conf.kv.put(CONF_REPORT_ERROR_DAG_NAME, dagName); + return conf; + } + + public static UserPayload toUserPayload(ErrorPluginConfiguration conf) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(conf.kv); + oos.close(); + UserPayload userPayload = UserPayload.create(ByteBuffer.wrap(baos.toByteArray())); + return userPayload; + } + + @SuppressWarnings("unchecked") + public static ErrorPluginConfiguration toErrorPluginConfiguration(UserPayload userPayload) throws + IOException, ClassNotFoundException { + + byte[] b = new byte[userPayload.getPayload().remaining()]; + userPayload.getPayload().get(b); + ByteArrayInputStream bais = new ByteArrayInputStream(b); + ObjectInputStream ois = new ObjectInputStream(bais); + + HashMap<String, String> map = (HashMap) ois.readObject(); + ErrorPluginConfiguration conf = new ErrorPluginConfiguration(map); + return conf; + } + + public boolean shouldThrowError() { + return (kv.containsKey(CONF_THROW_ERROR) && Boolean.parseBoolean(kv.get(CONF_THROW_ERROR))); + } + + public boolean shouldReportFatalError(String dagName) { + if (kv.containsKey(CONF_REPORT_ERROR) && Boolean.parseBoolean(kv.get(CONF_REPORT_ERROR)) && + Boolean.parseBoolean(kv.get(CONF_REPORT_ERROR_FATAL))) { + if (dagName == null || dagName.isEmpty() || kv.get(CONF_REPORT_ERROR_DAG_NAME).equals("*") || + kv.get(CONF_REPORT_ERROR_DAG_NAME).equals(dagName)) { + return true; + } + } 
+ return false; + } + + public boolean shouldReportNonFatalError(String dagName) { + if (kv.containsKey(CONF_REPORT_ERROR) && Boolean.parseBoolean(kv.get(CONF_REPORT_ERROR)) && + Boolean.parseBoolean(kv.get(CONF_REPORT_ERROR_FATAL)) == false) { + if (dagName == null || dagName.isEmpty() || kv.get(CONF_REPORT_ERROR_DAG_NAME).equals("*") || + kv.get(CONF_REPORT_ERROR_DAG_NAME).equals(dagName)) { + return true; + } + } + return false; + } + + public static void processError(ErrorPluginConfiguration conf, ServicePluginContextBase context) { + if (conf.shouldThrowError()) { + throw new RuntimeException(ErrorPluginConfiguration.THROW_ERROR_EXCEPTION_STRING); + } else if (conf.shouldReportFatalError(null)) { + context.reportError(ServicePluginErrorDefaults.INCONSISTENT_STATE, + ErrorPluginConfiguration.REPORT_FATAL_ERROR_MESSAGE, + context.getCurrentDagInfo()); + } else if (conf.shouldReportNonFatalError(null)) { + context.reportError(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE, + ErrorPluginConfiguration.REPORT_NONFATAL_ERROR_MESSAGE, + context.getCurrentDagInfo()); + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncherWithErrors.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncherWithErrors.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncherWithErrors.java index d489cca..b4ea176 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncherWithErrors.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncherWithErrors.java @@ -14,24 +14,33 @@ package org.apache.tez.dag.app.launcher; +import java.io.IOException; + +import org.apache.tez.dag.app.ErrorPluginConfiguration; 
import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; import org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerLauncherContext; import org.apache.tez.serviceplugins.api.ContainerStopRequest; +import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults; public class TezTestServiceContainerLauncherWithErrors extends ContainerLauncher { + + private final ErrorPluginConfiguration conf; + public TezTestServiceContainerLauncherWithErrors( - ContainerLauncherContext containerLauncherContext) { + ContainerLauncherContext containerLauncherContext) throws IOException, + ClassNotFoundException { super(containerLauncherContext); + conf = ErrorPluginConfiguration.toErrorPluginConfiguration(containerLauncherContext.getInitialUserPayload()); } @Override public void launchContainer(ContainerLaunchRequest launchRequest) { - throw new RuntimeException("Simulated Error"); + ErrorPluginConfiguration.processError(conf, getContext()); } @Override public void stopContainer(ContainerStopRequest stopRequest) { - throw new RuntimeException("Simulated Error"); + ErrorPluginConfiguration.processError(conf, getContext()); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerServiceWithErrors.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerServiceWithErrors.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerServiceWithErrors.java index 1705eac..13d4815 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerServiceWithErrors.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerServiceWithErrors.java @@ -16,18 +16,25 @@ package 
org.apache.tez.dag.app.rm; import javax.annotation.Nullable; +import java.io.IOException; + import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.tez.dag.app.ErrorPluginConfiguration; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.serviceplugins.api.TaskScheduler; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; public class TezTestServiceTaskSchedulerServiceWithErrors extends TaskScheduler { + + private final ErrorPluginConfiguration conf; + public TezTestServiceTaskSchedulerServiceWithErrors( - TaskSchedulerContext taskSchedulerContext) { + TaskSchedulerContext taskSchedulerContext) throws IOException, ClassNotFoundException { super(taskSchedulerContext); + conf = ErrorPluginConfiguration.toErrorPluginConfiguration(taskSchedulerContext.getInitialUserPayload()); } @Override @@ -47,35 +54,37 @@ public class TezTestServiceTaskSchedulerServiceWithErrors extends TaskScheduler @Override public void blacklistNode(NodeId nodeId) { - throw new RuntimeException("Simulated Error"); + ErrorPluginConfiguration.processError(conf, getContext()); } @Override public void unblacklistNode(NodeId nodeId) { - throw new RuntimeException("Simulated Error"); + ErrorPluginConfiguration.processError(conf, getContext()); } @Override public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks, Priority priority, Object containerSignature, Object clientCookie) { - throw new RuntimeException("Simulated Error"); + ErrorPluginConfiguration.processError(conf, getContext()); } @Override public void allocateTask(Object task, Resource capability, ContainerId containerId, Priority priority, Object containerSignature, Object clientCookie) { - throw new RuntimeException("Simulated Error"); + ErrorPluginConfiguration.processError(conf, getContext()); 
} @Override public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason, @Nullable String diagnostics) { - throw new RuntimeException("Simulated Error"); + ErrorPluginConfiguration.processError(conf, getContext()); + return true; } @Override public Object deallocateContainer(ContainerId containerId) { - throw new RuntimeException("Simulated Error"); + ErrorPluginConfiguration.processError(conf, getContext()); + return null; } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java index 90313d4..8221957 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java @@ -15,6 +15,7 @@ package org.apache.tez.dag.app.taskcomm; import javax.annotation.Nullable; +import java.io.IOException; import java.net.InetSocketAddress; import java.util.Map; @@ -22,6 +23,8 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.tez.dag.app.ErrorPluginConfiguration; +import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults; import org.apache.tez.serviceplugins.api.TaskCommunicator; import org.apache.tez.serviceplugins.api.TaskCommunicatorContext; import org.apache.tez.dag.api.event.VertexStateUpdate; @@ -31,20 +34,24 @@ import 
org.apache.tez.serviceplugins.api.ContainerEndReason; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; public class TezTestServiceTaskCommunicatorWithErrors extends TaskCommunicator { + + private final ErrorPluginConfiguration conf; + public TezTestServiceTaskCommunicatorWithErrors( - TaskCommunicatorContext taskCommunicatorContext) { + TaskCommunicatorContext taskCommunicatorContext) throws IOException, ClassNotFoundException { super(taskCommunicatorContext); + conf = ErrorPluginConfiguration.toErrorPluginConfiguration(taskCommunicatorContext.getInitialUserPayload()); } @Override public void registerRunningContainer(ContainerId containerId, String hostname, int port) { - throw new RuntimeException("Simulated Error"); + ErrorPluginConfiguration.processError(conf, getContext()); } @Override public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason, @Nullable String diagnostics) { - throw new RuntimeException("Simulated Error"); + ErrorPluginConfiguration.processError(conf, getContext()); } @Override @@ -52,14 +59,14 @@ public class TezTestServiceTaskCommunicatorWithErrors extends TaskCommunicator { Map<String, LocalResource> additionalResources, Credentials credentials, boolean credentialsChanged, int priority) { - throw new RuntimeException("Simulated Error"); + ErrorPluginConfiguration.processError(conf, getContext()); } @Override public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason, @Nullable String diagnostics) { - throw new RuntimeException("Simulated Error"); + ErrorPluginConfiguration.processError(conf, getContext()); } @Override @@ -69,7 +76,7 @@ public class TezTestServiceTaskCommunicatorWithErrors extends TaskCommunicator { @Override public void onVertexStateUpdated(VertexStateUpdate stateUpdate) { - throw new RuntimeException("Simulated Error"); + ErrorPluginConfiguration.processError(conf, getContext()); } @Override @@ -78,6 +85,7 @@ public class 
TezTestServiceTaskCommunicatorWithErrors extends TaskCommunicator { @Override public Object getMetaInfo() { - throw new RuntimeException("Simulated Error"); + ErrorPluginConfiguration.processError(conf, getContext()); + return null; } } http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServicesErrors.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServicesErrors.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServicesErrors.java index bfd3ed2..ac6ebde 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServicesErrors.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServicesErrors.java @@ -19,7 +19,9 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.EnumSet; +import java.util.List; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -40,6 +42,7 @@ import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.client.StatusGetOpts; +import org.apache.tez.dag.app.ErrorPluginConfiguration; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType; import org.apache.tez.dag.app.launcher.TezTestServiceContainerLauncherWithErrors; import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher; @@ -49,6 +52,7 @@ import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl; import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorWithErrors; import org.apache.tez.examples.JoinValidateConfigured; import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor; +import 
org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults; import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor; import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor; @@ -63,7 +67,13 @@ public class TestExternalTezServicesErrors { private static final Logger LOG = LoggerFactory.getLogger(TestExternalTezServicesErrors.class); private static final String EXT_PUSH_ENTITY_NAME = "ExtServiceTestPush"; - private static final String EXT_FAIL_ENTITY_NAME = "ExtServiceTestFail"; + private static final String EXT_THROW_ERROR_ENTITY_NAME = "ExtServiceTestThrowErrors"; + private static final String EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME = "ExtServiceTestReportNonFatalErrors"; + private static final String EXT_REPORT_FATAL_ERROR_ENTITY_NAME = "ExtServiceTestReportFatalErrors"; + + private static final String SUFFIX_LAUNCHER = "ContainerLauncher"; + private static final String SUFFIX_TASKCOMM = "TaskCommunicator"; + private static final String SUFFIX_SCHEDULER = "TaskScheduler"; private static ExternalTezServiceTestHelper extServiceTestHelper; @@ -76,12 +86,32 @@ public class TestExternalTezServicesErrors { private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_EXT_SERVICE_PUSH = Vertex.VertexExecutionContext.create( EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME); - private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_LAUNCHER_FAIL = - Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_FAIL_ENTITY_NAME, EXT_PUSH_ENTITY_NAME); - private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_TASKCOMM_FAIL = - Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_FAIL_ENTITY_NAME); - private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_SCHEDULER_FAIL = - Vertex.VertexExecutionContext.create(EXT_FAIL_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME); + // Throw error
contexts + private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_LAUNCHER_THROW = + Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_THROW_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME); + private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_TASKCOMM_THROW = + Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, + EXT_THROW_ERROR_ENTITY_NAME); + private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_SCHEDULER_THROW = + Vertex.VertexExecutionContext.create(EXT_THROW_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME); + + // Report-non-fatal contexts + private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_LAUNCHER_REPORT_NON_FATAL = + Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME); + private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_TASKCOMM_REPORT_NON_FATAL = + Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, + EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME); + private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_SCHEDULER_REPORT_NON_FATAL = + Vertex.VertexExecutionContext.create(EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME); + + // Report fatal contexts + private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_LAUNCHER_REPORT_FATAL = + Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_REPORT_FATAL_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME); + private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_TASKCOMM_REPORT_FATAL = + Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, + EXT_REPORT_FATAL_ERROR_ENTITY_NAME); + private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_SCHEDULER_REPORT_FATAL = + Vertex.VertexExecutionContext.create(EXT_REPORT_FATAL_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_DEFAULT = EXECUTION_CONTEXT_EXT_SERVICE_PUSH; @@ -93,29 +123,63 @@ public class TestExternalTezServicesErrors { public static void setup() throws Exception { extServiceTestHelper = new ExternalTezServiceTestHelper(TEST_ROOT_DIR); - UserPayload userPayload = TezUtils.createUserPayloadFromConf(extServiceTestHelper.getConfForJobs()); + UserPayload userPayload = + TezUtils.createUserPayloadFromConf(extServiceTestHelper.getConfForJobs()); + UserPayload userPayloadThrowError = + ErrorPluginConfiguration.toUserPayload(ErrorPluginConfiguration.createThrowErrorConf()); + + UserPayload userPayloadReportFatalErrorLauncher = ErrorPluginConfiguration + .toUserPayload(ErrorPluginConfiguration.createReportFatalErrorConf(SUFFIX_LAUNCHER)); + UserPayload userPayloadReportFatalErrorTaskComm = ErrorPluginConfiguration + .toUserPayload(ErrorPluginConfiguration.createReportFatalErrorConf(SUFFIX_TASKCOMM)); + UserPayload userPayloadReportFatalErrorScheduler = ErrorPluginConfiguration + .toUserPayload(ErrorPluginConfiguration.createReportFatalErrorConf(SUFFIX_SCHEDULER)); + + UserPayload userPayloadReportNonFatalErrorLauncher = ErrorPluginConfiguration + .toUserPayload(ErrorPluginConfiguration.createReportNonFatalErrorConf(SUFFIX_LAUNCHER)); + UserPayload userPayloadReportNonFatalErrorTaskComm = ErrorPluginConfiguration + .toUserPayload(ErrorPluginConfiguration.createReportNonFatalErrorConf(SUFFIX_TASKCOMM)); + UserPayload userPayloadReportNonFatalErrorScheduler = ErrorPluginConfiguration + .toUserPayload(ErrorPluginConfiguration.createReportNonFatalErrorConf(SUFFIX_SCHEDULER)); TaskSchedulerDescriptor[] taskSchedulerDescriptors = new TaskSchedulerDescriptor[]{ TaskSchedulerDescriptor .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskSchedulerService.class.getName()) .setUserPayload(userPayload), - TaskSchedulerDescriptor.create(EXT_FAIL_ENTITY_NAME, + TaskSchedulerDescriptor.create(EXT_THROW_ERROR_ENTITY_NAME, +
TezTestServiceTaskSchedulerServiceWithErrors.class.getName()).setUserPayload( + userPayloadThrowError), + TaskSchedulerDescriptor.create(EXT_REPORT_FATAL_ERROR_ENTITY_NAME, + TezTestServiceTaskSchedulerServiceWithErrors.class.getName()).setUserPayload( + userPayloadReportFatalErrorScheduler), + TaskSchedulerDescriptor.create(EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME, TezTestServiceTaskSchedulerServiceWithErrors.class.getName()).setUserPayload( - userPayload)}; + userPayloadReportNonFatalErrorScheduler), + }; ContainerLauncherDescriptor[] containerLauncherDescriptors = new ContainerLauncherDescriptor[]{ ContainerLauncherDescriptor .create(EXT_PUSH_ENTITY_NAME, TezTestServiceNoOpContainerLauncher.class.getName()) .setUserPayload(userPayload), - ContainerLauncherDescriptor.create(EXT_FAIL_ENTITY_NAME, - TezTestServiceContainerLauncherWithErrors.class.getName()).setUserPayload(userPayload)}; + ContainerLauncherDescriptor.create(EXT_THROW_ERROR_ENTITY_NAME, + TezTestServiceContainerLauncherWithErrors.class.getName()).setUserPayload(userPayloadThrowError), + ContainerLauncherDescriptor.create(EXT_REPORT_FATAL_ERROR_ENTITY_NAME, + TezTestServiceContainerLauncherWithErrors.class.getName()).setUserPayload(userPayloadReportFatalErrorLauncher), + ContainerLauncherDescriptor.create(EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME, + TezTestServiceContainerLauncherWithErrors.class.getName()).setUserPayload(userPayloadReportNonFatalErrorLauncher) + }; TaskCommunicatorDescriptor[] taskCommunicatorDescriptors = new TaskCommunicatorDescriptor[]{ TaskCommunicatorDescriptor .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskCommunicatorImpl.class.getName()) .setUserPayload(userPayload), - TaskCommunicatorDescriptor.create(EXT_FAIL_ENTITY_NAME, - TezTestServiceTaskCommunicatorWithErrors.class.getName()).setUserPayload(userPayload)}; + TaskCommunicatorDescriptor.create(EXT_THROW_ERROR_ENTITY_NAME, + TezTestServiceTaskCommunicatorWithErrors.class.getName()).setUserPayload(userPayloadThrowError), +
TaskCommunicatorDescriptor.create(EXT_REPORT_FATAL_ERROR_ENTITY_NAME, + TezTestServiceTaskCommunicatorWithErrors.class.getName()).setUserPayload(userPayloadReportFatalErrorTaskComm), + TaskCommunicatorDescriptor.create(EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME, + TezTestServiceTaskCommunicatorWithErrors.class.getName()).setUserPayload(userPayloadReportNonFatalErrorTaskComm) + }; servicePluginsDescriptor = ServicePluginsDescriptor.create(true, true, taskSchedulerDescriptors, containerLauncherDescriptors, taskCommunicatorDescriptors); @@ -137,35 +201,86 @@ public class TestExternalTezServicesErrors { extServiceTestHelper.tearDownAll(); } - @Test (timeout = 90000) - public void testContainerLauncherError() throws Exception { - testServiceError("_testContainerLauncherError_", EXECUTION_CONTEXT_LAUNCHER_FAIL, - DAGAppMasterEventType.CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR); + @Test(timeout = 90000) + public void testContainerLauncherThrowError() throws Exception { + testFatalError("_testContainerLauncherThrowError_", EXECUTION_CONTEXT_LAUNCHER_THROW, + SUFFIX_LAUNCHER, Lists.newArrayList("Service Error", + DAGAppMasterEventType.CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR.name())); + } + + @Test(timeout = 90000) + public void testTaskCommunicatorThrowError() throws Exception { + testFatalError("_testTaskCommunicatorThrowError_", EXECUTION_CONTEXT_TASKCOMM_THROW, + SUFFIX_TASKCOMM, Lists.newArrayList("Service Error", + DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR.name())); + } + + @Test(timeout = 90000) + public void testTaskSchedulerThrowError() throws Exception { + testFatalError("_testTaskSchedulerThrowError_", EXECUTION_CONTEXT_SCHEDULER_THROW, + SUFFIX_SCHEDULER, Lists.newArrayList("Service Error", + DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR.name())); + } + + @Test (timeout = 150000) + public void testNonFatalErrors() throws IOException, TezException, InterruptedException { + String methodName = "testNonFatalErrors"; + TezConfiguration tezClientConf = new
TezConfiguration(extServiceTestHelper.getConfForJobs()); + TezClient tezClient = TezClient + .newBuilder(TestExternalTezServicesErrors.class.getSimpleName() + methodName + "_session", + tezClientConf) + .setIsSession(true).setServicePluginDescriptor(servicePluginsDescriptor).build(); + try { + tezClient.start(); + LOG.info("TezSessionStarted for " + methodName); + tezClient.waitTillReady(); + LOG.info("TezSession ready for submission for " + methodName); + + + runAndVerifyForNonFatalErrors(tezClient, SUFFIX_LAUNCHER, EXECUTION_CONTEXT_LAUNCHER_REPORT_NON_FATAL); + runAndVerifyForNonFatalErrors(tezClient, SUFFIX_TASKCOMM, EXECUTION_CONTEXT_TASKCOMM_REPORT_NON_FATAL); + runAndVerifyForNonFatalErrors(tezClient, SUFFIX_SCHEDULER, EXECUTION_CONTEXT_SCHEDULER_REPORT_NON_FATAL); + + } finally { + tezClient.stop(); + } + } + + @Test(timeout = 90000) + public void testContainerLauncherReportFatalError() throws Exception { + testFatalError("_testContainerLauncherReportFatalError_", + EXECUTION_CONTEXT_LAUNCHER_REPORT_FATAL, SUFFIX_LAUNCHER, Lists + .newArrayList(ErrorPluginConfiguration.REPORT_FATAL_ERROR_MESSAGE, + ServicePluginErrorDefaults.INCONSISTENT_STATE.name())); } - @Test (timeout = 90000) - public void testTaskCommunicatorError() throws Exception { - testServiceError("_testTaskCommunicatorError_", EXECUTION_CONTEXT_TASKCOMM_FAIL, - DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR); + @Test(timeout = 90000) + public void testTaskCommReportFatalError() throws Exception { + testFatalError("_testTaskCommReportFatalError_", EXECUTION_CONTEXT_TASKCOMM_REPORT_FATAL, + SUFFIX_TASKCOMM, Lists.newArrayList(ErrorPluginConfiguration.REPORT_FATAL_ERROR_MESSAGE, + ServicePluginErrorDefaults.INCONSISTENT_STATE.name())); } - @Test (timeout = 90000) - public void testTaskSchedulerError() throws Exception { - testServiceError("_testTaskSchedulerError_", EXECUTION_CONTEXT_SCHEDULER_FAIL, - DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR); + @Test(timeout = 90000)
+ public void testTaskSchedulerReportFatalError() throws Exception { + testFatalError("_testTaskSchedulerReportFatalError_", + EXECUTION_CONTEXT_SCHEDULER_REPORT_FATAL, SUFFIX_SCHEDULER, + Lists.newArrayList(ErrorPluginConfiguration.REPORT_FATAL_ERROR_MESSAGE, + ServicePluginErrorDefaults.INCONSISTENT_STATE.name())); } - private void testServiceError(String methodName, - Vertex.VertexExecutionContext lhsExecutionContext, - DAGAppMasterEventType expectedEventType) throws - IOException, TezException, InterruptedException, YarnException { + + private void testFatalError(String methodName, + Vertex.VertexExecutionContext lhsExecutionContext, + String dagNameSuffix, List<String> expectedDiagMessages) throws + IOException, TezException, YarnException, InterruptedException { TezConfiguration tezClientConf = new TezConfiguration(extServiceTestHelper.getConfForJobs()); TezClient tezClient = TezClient .newBuilder(TestExternalTezServicesErrors.class.getSimpleName() + methodName + "_session", tezClientConf) .setIsSession(true).setServicePluginDescriptor(servicePluginsDescriptor).build(); ApplicationId appId; try { tezClient.start(); LOG.info("TezSessionStarted for " + methodName); @@ -175,10 +290,11 @@ public class TestExternalTezServicesErrors { JoinValidateConfigured joinValidate = new JoinValidateConfigured(EXECUTION_CONTEXT_DEFAULT, lhsExecutionContext, EXECUTION_CONTEXT_EXT_SERVICE_PUSH, - EXECUTION_CONTEXT_EXT_SERVICE_PUSH, "LauncherFailTest"); + EXECUTION_CONTEXT_EXT_SERVICE_PUSH, dagNameSuffix); DAG dag = joinValidate - .createDag(new TezConfiguration(extServiceTestHelper.getConfForJobs()), HASH_JOIN_EXPECTED_RESULT_PATH, + .createDag(new TezConfiguration(extServiceTestHelper.getConfForJobs()), + HASH_JOIN_EXPECTED_RESULT_PATH, HASH_JOIN_OUTPUT_PATH, 3); DAGClient dagClient = tezClient.submitDAG(dag); @@ -188,14 +304,13 @@ public class TestExternalTezServicesErrors { assertEquals(DAGStatus.State.ERROR, dagStatus.getState()); boolean
foundDiag = false; for (String diag : dagStatus.getDiagnostics()) { - if (diag.contains("Service Error") && diag.contains( - expectedEventType.toString()) && - diag.contains("Simulated Error")) { - foundDiag = true; + foundDiag = checkDiag(diag, expectedDiagMessages); + if (foundDiag) { + break; } } appId = tezClient.getAppMasterApplicationId(); assertTrue(foundDiag); } finally { tezClient.stop(); } @@ -222,14 +337,59 @@ public class TestExternalTezServicesErrors { String diag = appAttemptReport.getDiagnostics(); assertEquals(FinalApplicationStatus.FAILED, appReport.getFinalApplicationStatus()); assertEquals(YarnApplicationState.FINISHED, appReport.getYarnApplicationState()); - assertTrue(diag.contains("Service Error") && diag.contains( - expectedEventType.toString()) && - diag.contains("Simulated Error")); - + assertTrue(checkDiag(diag, expectedDiagMessages)); } finally { yarnClient.stop(); } } } + // Returns true only if diag contains every message in expected (vacuously true for an empty list). + private boolean checkDiag(String diag, List<String> expected) { + boolean found = true; + for (String exp : expected) { + if (diag.contains(exp)) { + found = true; + continue; + } else { + found = false; + break; + } + } + return found; + } + + + private void runAndVerifyForNonFatalErrors(TezClient tezClient, String componentName, + Vertex.VertexExecutionContext lhsContext) throws + TezException, + InterruptedException, IOException { + LOG.info("Running JoinValidate for non-fatal error reporting with componentName " + componentName); + JoinValidateConfigured joinValidate = + new JoinValidateConfigured(EXECUTION_CONTEXT_DEFAULT, lhsContext, + EXECUTION_CONTEXT_EXT_SERVICE_PUSH, + EXECUTION_CONTEXT_EXT_SERVICE_PUSH, componentName); + + DAG dag = joinValidate + .createDag(new TezConfiguration(extServiceTestHelper.getConfForJobs()), + HASH_JOIN_EXPECTED_RESULT_PATH, + HASH_JOIN_OUTPUT_PATH, 3); + + DAGClient dagClient = tezClient.submitDAG(dag); + + DAGStatus dagStatus = +
dagClient.waitForCompletionWithStatusUpdates(Sets.newHashSet(StatusGetOpts.GET_COUNTERS)); + assertEquals(DAGStatus.State.FAILED, dagStatus.getState()); + + boolean foundDiag = false; + for (String diag : dagStatus.getDiagnostics()) { + if (diag.contains(ErrorPluginConfiguration.REPORT_NONFATAL_ERROR_MESSAGE) && + diag.contains(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE.name())) { + foundDiag = true; + break; + } + } + assertTrue(foundDiag); + } + }
