http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java index be67cb2..9a45859 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java @@ -19,8 +19,11 @@ package org.apache.tez.dag.app; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Random; @@ -29,11 +32,21 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.tez.dag.api.oldrecords.TaskAttemptState; +import org.apache.tez.dag.api.oldrecords.TaskState; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; +import org.apache.tez.dag.app.RecoveryParser.DAGRecoveryData; import org.apache.tez.dag.app.RecoveryParser.DAGSummaryData; -import org.apache.tez.dag.app.RecoveryParser.RecoveredDAGData; +import org.apache.tez.dag.app.RecoveryParser.TaskAttemptRecoveryData; +import org.apache.tez.dag.app.RecoveryParser.TaskRecoveryData; +import org.apache.tez.dag.app.RecoveryParser.VertexRecoveryData; import org.apache.tez.dag.app.dag.DAGState; +import org.apache.tez.dag.app.dag.VertexState; import org.apache.tez.dag.app.dag.impl.DAGImpl; import org.apache.tez.dag.app.dag.impl.TestDAGImpl; import org.apache.tez.dag.history.DAGHistoryEvent; @@ -42,10 +55,27 @@ import org.apache.tez.dag.history.events.DAGFinishedEvent; import org.apache.tez.dag.history.events.DAGInitializedEvent; import org.apache.tez.dag.history.events.DAGStartedEvent; import org.apache.tez.dag.history.events.DAGSubmittedEvent; +import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; +import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; +import org.apache.tez.dag.history.events.TaskFinishedEvent; +import org.apache.tez.dag.history.events.TaskStartedEvent; +import org.apache.tez.dag.history.events.VertexCommitStartedEvent; +import org.apache.tez.dag.history.events.VertexFinishedEvent; +import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent; +import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent; +import org.apache.tez.dag.history.events.VertexInitializedEvent; +import org.apache.tez.dag.history.events.VertexStartedEvent; import org.apache.tez.dag.history.recovery.RecoveryService; import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.api.impl.TezEvent; import org.junit.*; +import com.google.common.collect.Lists; + import static org.junit.Assert.*; import static org.mockito.Mockito.*; @@ -75,6 +105,7 @@ public class TestRecoveryParser { mockDAGImpl = mock(DAGImpl.class); when(mockAppMaster.createDAG(any(DAGPlan.class), any(TezDAGID.class))).thenReturn(mockDAGImpl); parser = new RecoveryParser(mockAppMaster, localFS, recoveryPath, 3); + LogManager.getRootLogger().setLevel(Level.DEBUG); } private DAGSummaryData createDAGSummaryData(TezDAGID dagId, boolean completed) { @@ -155,14 +186,14 @@ public class TestRecoveryParser { new DAGStartedEvent(dagID, 1L, "user", "dag1"))); rService.stop(); - RecoveredDAGData dagData = parser.parseRecoveryData(); + DAGRecoveryData dagData = parser.parseRecoveryData(); assertEquals(true, dagData.nonRecoverable); assertTrue(dagData.reason.contains("DAG Commit was in progress, not recoverable,")); // DAGSubmittedEvent is handled but DAGInitializedEvent and DAGStartedEvent in the next attempt are both skipped // due to the dag is not recoerable. verify(mockAppMaster).createDAG(any(DAGPlan.class),any(TezDAGID.class)); - verify(dagData.recoveredDAG, never()).restoreFromEvent(isA(DAGInitializedEvent.class)); - verify(dagData.recoveredDAG, never()).restoreFromEvent(isA(DAGStartedEvent.class)); + assertNull(dagData.getDAGInitializedEvent()); + assertNull(dagData.getDAGStartedEvent()); } // skipAllOtherEvents due to dag finished @@ -202,7 +233,7 @@ public class TestRecoveryParser { new DAGStartedEvent(dagID, 1L, "user", "dag1"))); rService.stop(); - RecoveredDAGData dagData = parser.parseRecoveryData(); + DAGRecoveryData dagData = parser.parseRecoveryData(); assertEquals(false, dagData.nonRecoverable); assertEquals(DAGState.FAILED, dagData.dagState); assertEquals(true, dagData.isCompleted); @@ -210,9 +241,8 @@ public class TestRecoveryParser { verify(mockAppMaster).createDAG(any(DAGPlan.class),any(TezDAGID.class)); // DAGInitializedEvent may not been handled before DAGFinishedEvent, // because DAGFinishedEvent's writeToRecoveryImmediately is true - verify(dagData.recoveredDAG).restoreFromEvent(isA(DAGFinishedEvent.class)); - // DAGStartedEvent is skipped due to it is after DAGFinishedEvent - verify(dagData.recoveredDAG, never()).restoreFromEvent(isA(DAGStartedEvent.class)); + assertNotNull(dagData.getDAGFinishedEvent()); + assertNull(dagData.getDAGStartedEvent()); } @Test(timeout = 5000) @@ -250,13 +280,13 @@ public class TestRecoveryParser { rService.stop(); // corrupted last records will be skipped but the whole recovery logs will be read - RecoveredDAGData dagData = parser.parseRecoveryData(); + DAGRecoveryData dagData = parser.parseRecoveryData(); assertEquals(false, dagData.isCompleted); assertEquals(null, dagData.reason); assertEquals(false, dagData.nonRecoverable); // verify DAGSubmitedEvent & DAGInititlizedEvent is handled. verify(mockAppMaster).createDAG(any(DAGPlan.class),any(TezDAGID.class)); - verify(dagData.recoveredDAG).restoreFromEvent(isA(DAGInitializedEvent.class)); + assertNotNull(dagData.getDAGInitializedEvent()); } @Test(timeout = 5000) @@ -293,4 +323,434 @@ public class TestRecoveryParser { } } + @Test(timeout=5000) + public void testRecoverableSummary_DAGInCommitting() throws IOException { + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + TezDAGID dagID = TezDAGID.getInstance(appId, 1); + AppContext appContext = mock(AppContext.class); + when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1")); + when(appContext.getClock()).thenReturn(new SystemClock()); + when(mockDAGImpl.getID()).thenReturn(dagID); + + RecoveryService rService = new RecoveryService(appContext); + Configuration conf = new Configuration(); + conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); + rService.init(conf); + rService.start(); + + DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan(); + // write a DAGSubmittedEvent first to initialize summaryStream + rService.handle(new DAGHistoryEvent(dagID, + new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), + null, "user", new Configuration(), null))); + // It should be fine to skip other events, just for testing. + rService.handle(new DAGHistoryEvent(dagID, + new DAGCommitStartedEvent(dagID, 0L))); + rService.stop(); + + DAGRecoveryData dagData = parser.parseRecoveryData(); + assertEquals(dagID, dagData.recoveredDagID); + assertTrue(dagData.nonRecoverable); + assertTrue(dagData.reason.contains("DAG Commit was in progress")); + } + + @Test(timeout=5000) + public void testRecoverableSummary_DAGFinishCommitting() throws IOException { + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); + TezDAGID dagID = TezDAGID.getInstance(appId, 1); + AppContext appContext = mock(AppContext.class); + when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1")); + when(appContext.getClock()).thenReturn(new SystemClock()); + when(mockDAGImpl.getID()).thenReturn(dagID); + + RecoveryService rService = new RecoveryService(appContext); + Configuration conf = new Configuration(); + conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); + rService.init(conf); + rService.start(); + + DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan(); + // write a DAGSubmittedEvent first to initialize summaryStream + rService.handle(new DAGHistoryEvent(dagID, + new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), + null, "user", new Configuration(), null))); + // It should be fine to skip other events, just for testing. + rService.handle(new DAGHistoryEvent(dagID, + new DAGCommitStartedEvent(dagID, 0L))); + rService.handle(new DAGHistoryEvent(dagID, + new DAGFinishedEvent(dagID, 1L, 2L, DAGState.FAILED, "diag", null, "user", "dag1", null, + appAttemptId))); + rService.stop(); + + DAGRecoveryData dagData = parser.parseRecoveryData(); + assertEquals(dagID, dagData.recoveredDagID); + assertEquals(DAGState.FAILED, dagData.dagState); + assertFalse(dagData.nonRecoverable); + assertNull(dagData.reason); + assertTrue(dagData.isCompleted); + } + + @Test(timeout=5000) + public void testRecoverableSummary_VertexInCommitting() throws IOException { + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + TezDAGID dagID = TezDAGID.getInstance(appId, 1); + AppContext appContext = mock(AppContext.class); + when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1")); + when(appContext.getClock()).thenReturn(new SystemClock()); + when(mockDAGImpl.getID()).thenReturn(dagID); + + RecoveryService rService = new RecoveryService(appContext); + Configuration conf = new Configuration(); + conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); + rService.init(conf); + rService.start(); + + DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan(); + // write a DAGSubmittedEvent first to initialize summaryStream + rService.handle(new DAGHistoryEvent(dagID, + new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), + null, "user", new Configuration(), null))); + // It should be fine to skip other events, just for testing. + rService.handle(new DAGHistoryEvent(dagID, + new VertexCommitStartedEvent(TezVertexID.getInstance(dagID, 0), 0L))); + rService.stop(); + + DAGRecoveryData dagData = parser.parseRecoveryData(); + assertEquals(dagID, dagData.recoveredDagID); + assertTrue(dagData.nonRecoverable); + assertTrue(dagData.reason.contains("Vertex Commit was in progress")); + } + + @Test(timeout=5000) + public void testRecoverableSummary_VertexFinishCommitting() throws IOException { + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + TezDAGID dagID = TezDAGID.getInstance(appId, 1); + AppContext appContext = mock(AppContext.class); + when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1")); + when(appContext.getClock()).thenReturn(new SystemClock()); + when(mockDAGImpl.getID()).thenReturn(dagID); + + RecoveryService rService = new RecoveryService(appContext); + Configuration conf = new Configuration(); + conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); + rService.init(conf); + rService.start(); + + DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan(); + // write a DAGSubmittedEvent first to initialize summaryStream + rService.handle(new DAGHistoryEvent(dagID, + new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), + null, "user", new Configuration(), null))); + // It should be fine to skip other events, just for testing. + TezVertexID vertexId = TezVertexID.getInstance(dagID, 0); + rService.handle(new DAGHistoryEvent(dagID, + new VertexCommitStartedEvent(vertexId, 0L))); + rService.handle(new DAGHistoryEvent(dagID, + new VertexFinishedEvent(vertexId, "v1", 10, 0L, 0L, + 0L, 0L, 0L, VertexState.SUCCEEDED, + "", null, null, null))); + rService.stop(); + + DAGRecoveryData dagData = parser.parseRecoveryData(); + assertEquals(dagID, dagData.recoveredDagID); + assertFalse(dagData.nonRecoverable); + } + + @Test(timeout=5000) + public void testRecoverableSummary_VertexGroupInCommitting() throws IOException { + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + TezDAGID dagID = TezDAGID.getInstance(appId, 1); + AppContext appContext = mock(AppContext.class); + when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1")); + when(appContext.getClock()).thenReturn(new SystemClock()); + when(mockDAGImpl.getID()).thenReturn(dagID); + + RecoveryService rService = new RecoveryService(appContext); + Configuration conf = new Configuration(); + conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); + rService.init(conf); + rService.start(); + + DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan(); + // write a DAGSubmittedEvent first to initialize summaryStream + rService.handle(new DAGHistoryEvent(dagID, + new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), + null, "user", new Configuration(), null))); + // It should be fine to skip other events, just for testing. + rService.handle(new DAGHistoryEvent(dagID, + new VertexGroupCommitStartedEvent(dagID, "group_1", + Lists.newArrayList(TezVertexID.getInstance(dagID, 0), TezVertexID.getInstance(dagID, 1)), 0L))); + rService.stop(); + + DAGRecoveryData dagData = parser.parseRecoveryData(); + assertEquals(dagID, dagData.recoveredDagID); + assertTrue(dagData.nonRecoverable); + assertTrue(dagData.reason.contains("Vertex Group Commit was in progress")); + } + + @Test(timeout=5000) + public void testRecoverableSummary_VertexGroupFinishCommitting() throws IOException { + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + TezDAGID dagID = TezDAGID.getInstance(appId, 1); + AppContext appContext = mock(AppContext.class); + when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1")); + when(appContext.getClock()).thenReturn(new SystemClock()); + when(mockDAGImpl.getID()).thenReturn(dagID); + + RecoveryService rService = new RecoveryService(appContext); + Configuration conf = new Configuration(); + conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); + rService.init(conf); + rService.start(); + + DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan(); + // write a DAGSubmittedEvent first to initialize summaryStream + rService.handle(new DAGHistoryEvent(dagID, + new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), + null, "user", new Configuration(), null))); + // It should be fine to skip other events, just for testing. + TezVertexID v0 = TezVertexID.getInstance(dagID, 0); + TezVertexID v1 = TezVertexID.getInstance(dagID, 1); + rService.handle(new DAGHistoryEvent(dagID, + new VertexGroupCommitStartedEvent(dagID, "group_1", + Lists.newArrayList(v0, v1), 0L))); + rService.handle(new DAGHistoryEvent(dagID, + new VertexGroupCommitFinishedEvent(dagID, "group_1", + Lists.newArrayList(v0, v1), 0L))); + // also write VertexFinishedEvent, otherwise it is still non-recoverable + // when checking with non-summary event + rService.handle(new DAGHistoryEvent(dagID, + new VertexFinishedEvent(v0, "v1", 10, 0L, 0L, + 0L, 0L, 0L, VertexState.SUCCEEDED, + "", null, null, null))); + rService.handle(new DAGHistoryEvent(dagID, + new VertexFinishedEvent(v1, "v1", 10, 0L, 0L, + 0L, 0L, 0L, VertexState.SUCCEEDED, + "", null, null, null))); + rService.stop(); + + DAGRecoveryData dagData = parser.parseRecoveryData(); + assertEquals(dagID, dagData.recoveredDagID); + assertFalse(dagData.nonRecoverable); + } + + @Test(timeout=5000) + public void testRecoverableNonSummary1() throws IOException { + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + TezDAGID dagID = TezDAGID.getInstance(appId, 1); + AppContext appContext = mock(AppContext.class); + when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1")); + when(appContext.getClock()).thenReturn(new SystemClock()); + when(mockDAGImpl.getID()).thenReturn(dagID); + + // MockRecoveryService will skip the non-summary event + MockRecoveryService rService = new MockRecoveryService(appContext); + Configuration conf = new Configuration(); + conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); + rService.init(conf); + rService.start(); + + DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan(); + // write a DAGSubmittedEvent first to initialize summaryStream + rService.handle(new DAGHistoryEvent(dagID, + new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), + null, "user", new Configuration(), null))); + // It should be fine to skip other events, just for testing. + TezVertexID vertexId = TezVertexID.getInstance(dagID, 0); + rService.handle(new DAGHistoryEvent(dagID, + new VertexCommitStartedEvent(vertexId, 0L))); + rService.handle(new DAGHistoryEvent(dagID, + new VertexFinishedEvent(vertexId, "v1", 10, 0L, 0L, + 0L, 0L, 0L, VertexState.SUCCEEDED, + "", null, null, null))); + rService.stop(); + + DAGRecoveryData dagData = parser.parseRecoveryData(); + assertTrue(dagData.nonRecoverable); + assertTrue(dagData.reason.contains("Vertex has been committed, but its full recovery events are not seen")); + } + + @Test(timeout=5000) + public void testRecoverableNonSummary2() throws IOException { + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + TezDAGID dagID = TezDAGID.getInstance(appId, 1); + AppContext appContext = mock(AppContext.class); + when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1")); + when(appContext.getClock()).thenReturn(new SystemClock()); + when(mockDAGImpl.getID()).thenReturn(dagID); + + // MockRecoveryService will skip the non-summary event + MockRecoveryService rService = new MockRecoveryService(appContext); + Configuration conf = new Configuration(); + conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); + rService.init(conf); + rService.start(); + + DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan(); + // write a DAGSubmittedEvent first to initialize summaryStream + rService.handle(new DAGHistoryEvent(dagID, + new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), + null, "user", new Configuration(), null))); + // It should be fine to skip other events, just for testing. + TezVertexID vertexId = TezVertexID.getInstance(dagID, 0); + rService.handle(new DAGHistoryEvent(dagID, + new VertexGroupCommitStartedEvent(dagID, "group_1", + Lists.newArrayList(TezVertexID.getInstance(dagID, 0), TezVertexID.getInstance(dagID, 1)), 0L))); + rService.handle(new DAGHistoryEvent(dagID, + new VertexGroupCommitFinishedEvent(dagID, "group_1", + Lists.newArrayList(TezVertexID.getInstance(dagID, 0), TezVertexID.getInstance(dagID, 1)), 0L))); + rService.stop(); + + DAGRecoveryData dagData = parser.parseRecoveryData(); + assertTrue(dagData.nonRecoverable); + assertTrue(dagData.reason.contains("Vertex has been committed as member of vertex group" + + ", but its full recovery events are not seen")); + } + + @Test(timeout=5000) + public void testRecoveryData() throws IOException { + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + TezDAGID dagID = TezDAGID.getInstance(appId, 1); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); + AppContext appContext = mock(AppContext.class); + when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1")); + when(appContext.getClock()).thenReturn(new SystemClock()); + when(mockDAGImpl.getID()).thenReturn(dagID); + + RecoveryService rService = new RecoveryService(appContext); + Configuration conf = new Configuration(); + conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); + rService.init(conf); + rService.start(); + + DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan(); + // DAG DAGSubmittedEvent -> DAGInitializedEvent -> DAGStartedEvent + rService.handle(new DAGHistoryEvent(dagID, + new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), + null, "user", new Configuration(), null))); + DAGInitializedEvent dagInitedEvent = new DAGInitializedEvent(dagID, 100L, + "user", "dagName", null); + DAGStartedEvent dagStartedEvent = new DAGStartedEvent(dagID, 0L, "user", "dagName"); + rService.handle(new DAGHistoryEvent(dagID, dagInitedEvent)); + rService.handle(new DAGHistoryEvent(dagID, dagStartedEvent)); + + // 3 vertices of this dag: v0, v1, v2 + TezVertexID v0Id = TezVertexID.getInstance(dagID, 0); + TezVertexID v1Id = TezVertexID.getInstance(dagID, 1); + TezVertexID v2Id = TezVertexID.getInstance(dagID, 2); + // v0 VertexInitializedEvent + VertexInitializedEvent v0InitedEvent = new VertexInitializedEvent(v0Id, "v0", 200L, 400L, 2, null, null, null); + rService.handle(new DAGHistoryEvent(dagID, v0InitedEvent)); + // v1 VertexFinishedEvent(KILLED) + VertexFinishedEvent v1FinishedEvent = new VertexFinishedEvent(v1Id, "v1", 2, 300L, 400L, + 500L, 600L, 700L, VertexState.KILLED, + "", null, null, null); + rService.handle(new DAGHistoryEvent(dagID, v1FinishedEvent)); + // v2 VertexInitializedEvent -> VertexStartedEvent + List<TezEvent> initGeneratedEvents = Lists.newArrayList( + new TezEvent(DataMovementEvent.create(ByteBuffer.wrap(new byte[0])), null)); + VertexInitializedEvent v2InitedEvent = new VertexInitializedEvent(v2Id, "v2", 200L, 300L, + 2, null, null, initGeneratedEvents); + VertexStartedEvent v2StartedEvent = new VertexStartedEvent(v2Id, 0L, 0L); + rService.handle(new DAGHistoryEvent(dagID, v2InitedEvent)); + rService.handle(new DAGHistoryEvent(dagID, v2StartedEvent)); + + // 3 tasks of v2 + TezTaskID t0v2Id = TezTaskID.getInstance(v2Id, 0); + TezTaskID t1v2Id = TezTaskID.getInstance(v2Id, 1); + TezTaskID t2v2Id = TezTaskID.getInstance(v2Id, 2); + // t0v2 TaskStartedEvent + TaskStartedEvent t0v2StartedEvent = new TaskStartedEvent(t0v2Id, "v2", 400L, 5000L); + rService.handle(new DAGHistoryEvent(dagID, t0v2StartedEvent)); + // t1v2 TaskFinishedEvent + TaskFinishedEvent t1v2FinishedEvent = new TaskFinishedEvent(t1v2Id, "v1", + 0L, 0L, null, TaskState.KILLED, "", null, 4); + rService.handle(new DAGHistoryEvent(dagID, t1v2FinishedEvent)); + // t2v2 TaskStartedEvent -> TaskFinishedEvent + TaskStartedEvent t2v2StartedEvent = new TaskStartedEvent(t2v2Id, "v2", 400L, 500L); + rService.handle(new DAGHistoryEvent(dagID, t2v2StartedEvent)); + TaskFinishedEvent t2v2FinishedEvent = new TaskFinishedEvent(t2v2Id, "v1", + 0L, 0L, null, TaskState.SUCCEEDED, "", null, 4); + rService.handle(new DAGHistoryEvent(dagID, t2v2FinishedEvent)); + + // attempts under t0v2 + ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1); + NodeId nodeId = NodeId.newInstance("localhost", 9999); + TezTaskAttemptID ta0t0v2Id = TezTaskAttemptID.getInstance(t0v2Id, 0); + TaskAttemptStartedEvent ta0t0v2StartedEvent = new TaskAttemptStartedEvent( + ta0t0v2Id, "v1", 0L, containerId, + nodeId, "", "", ""); + rService.handle(new DAGHistoryEvent(dagID, ta0t0v2StartedEvent)); + // attempts under t2v2 + TezTaskAttemptID ta0t2v2Id = TezTaskAttemptID.getInstance(t2v2Id, 0); + TaskAttemptStartedEvent ta0t2v2StartedEvent = new TaskAttemptStartedEvent( + ta0t2v2Id, "v1", 500L, containerId, + nodeId, "", "", ""); + rService.handle(new DAGHistoryEvent(dagID, ta0t2v2StartedEvent)); + TaskAttemptFinishedEvent ta0t2v2FinishedEvent = new TaskAttemptFinishedEvent( + ta0t2v2Id, "v1", 500L, 600L, + TaskAttemptState.SUCCEEDED, null, "", null, + null, null, 0L, null, 0L); + rService.handle(new DAGHistoryEvent(dagID, ta0t2v2FinishedEvent)); + + rService.stop(); + + DAGRecoveryData dagData = parser.parseRecoveryData(); + assertFalse(dagData.nonRecoverable); + // There's no equals method for the history event, so here only verify the init/start/finish time of each event for simplicity + assertEquals(dagInitedEvent.getInitTime(), dagData.getDAGInitializedEvent().getInitTime()); + assertEquals(dagStartedEvent.getStartTime(), dagData.getDAGStartedEvent().getStartTime()); + assertNull(dagData.getDAGFinishedEvent()); + + VertexRecoveryData v0Data = dagData.getVertexRecoveryData(v0Id); + VertexRecoveryData v1Data = dagData.getVertexRecoveryData(v1Id); + VertexRecoveryData v2Data = dagData.getVertexRecoveryData(v2Id); + assertNotNull(v0Data); + assertNotNull(v1Data); + assertNotNull(v2Data); + assertEquals(v0InitedEvent.getInitedTime(), v0Data.getVertexInitedEvent().getInitedTime()); + assertNull(v0Data.getVertexStartedEvent()); + assertNull(v1Data.getVertexInitedEvent()); + assertEquals(v1FinishedEvent.getFinishTime(), v1Data.getVertexFinishedEvent().getFinishTime()); + assertEquals(v2InitedEvent.getInitedTime(), v2Data.getVertexInitedEvent().getInitedTime()); + assertEquals(v2StartedEvent.getStartTime(), v2Data.getVertexStartedEvent().getStartTime()); + + TaskRecoveryData t0v2Data = dagData.getTaskRecoveryData(t0v2Id); + TaskRecoveryData t1v2Data = dagData.getTaskRecoveryData(t1v2Id); + TaskRecoveryData t2v2Data = dagData.getTaskRecoveryData(t2v2Id); + assertNotNull(t0v2Data); + assertNotNull(t1v2Data); + assertNotNull(t2v2Data); + assertEquals(t0v2StartedEvent.getStartTime(), t0v2Data.getTaskStartedEvent().getStartTime()); + assertNull(t0v2Data.getTaskFinishedEvent()); + assertEquals(t1v2FinishedEvent.getFinishTime(), t1v2Data.getTaskFinishedEvent().getFinishTime()); + assertNull(t1v2Data.getTaskStartedEvent()); + assertEquals(t2v2StartedEvent.getStartTime(), t2v2Data.getTaskStartedEvent().getStartTime()); + assertEquals(t2v2FinishedEvent.getFinishTime(), t2v2Data.getTaskFinishedEvent().getFinishTime()); + + TaskAttemptRecoveryData ta0t0v2Data = dagData.getTaskAttemptRecoveryData(ta0t0v2Id); + TaskAttemptRecoveryData ta0t2v2Data = dagData.getTaskAttemptRecoveryData(ta0t2v2Id); + assertNotNull(ta0t0v2Data); + assertNotNull(ta0t2v2Data); + assertEquals(ta0t0v2StartedEvent.getStartTime(), ta0t0v2Data.getTaskAttemptStartedEvent().getStartTime()); + assertNull(ta0t0v2Data.getTaskAttemptFinishedEvent()); + assertEquals(ta0t2v2StartedEvent.getStartTime(), ta0t2v2Data.getTaskAttemptStartedEvent().getStartTime()); + assertEquals(ta0t2v2FinishedEvent.getFinishTime(), ta0t2v2Data.getTaskAttemptFinishedEvent().getFinishTime()); + } + + // Simulate the behavior that summary event is written + // but non-summary is not written to hdfs + public static class MockRecoveryService extends RecoveryService{ + + public MockRecoveryService(AppContext appContext) { + super(appContext); + } + + @Override + protected void handleRecoveryEvent(DAGHistoryEvent event) + throws IOException { + // skip the non-summary events + } + } }
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java index 17fa4d9..3f80928 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java @@ -35,6 +35,7 @@ import java.util.Map; import java.util.Random; import com.google.common.collect.Lists; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; @@ -68,7 +69,9 @@ import org.apache.tez.common.TezTaskUmbilicalProtocol; import org.apache.tez.dag.api.TaskCommunicatorContext; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.Vertex; +import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventTezEventUpdate; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; import org.apache.tez.dag.app.dag.event.VertexEventType; @@ -83,6 +86,8 @@ import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.InputReadErrorEvent; import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent; import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; +import org.apache.tez.runtime.api.impl.EventMetaData; +import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; import org.apache.tez.runtime.api.impl.EventType; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezEvent; @@ -235,15 +240,18 @@ public class TestTaskCommunicatorManager1 { @Test (timeout = 5000) public void testTaskEventRouting() throws Exception { List<TezEvent> events = Arrays.asList( - new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null, false), null), - new TezEvent(DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0])), null), - new TezEvent(new TaskAttemptCompletedEvent(), null) + new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null, false), new EventMetaData(EventProducerConsumerType.PROCESSOR, + "v1", "v2", taskAttemptID)), + new TezEvent(DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0])), new EventMetaData(EventProducerConsumerType.OUTPUT, + "v1", "v2", taskAttemptID)), + new TezEvent(new TaskAttemptCompletedEvent(), new EventMetaData(EventProducerConsumerType.SYSTEM, + "v1", "v2", taskAttemptID)) ); generateHeartbeat(events, 0, 1, 0, new ArrayList<TezEvent>()); ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); - verify(eventHandler, times(2)).handle(arg.capture()); + verify(eventHandler, times(4)).handle(arg.capture()); final List<Event> argAllValues = arg.getAllValues(); final Event statusUpdateEvent = argAllValues.get(0); @@ -251,28 +259,33 @@ public class TestTaskCommunicatorManager1 { statusUpdateEvent.getType()); assertEquals(false, ((TaskAttemptEventStatusUpdate)statusUpdateEvent).getReadErrorReported()); - final Event vertexEvent = argAllValues.get(1); - final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)vertexEvent; - assertEquals("First event should be routed to vertex", VertexEventType.V_ROUTE_EVENT, - vertexEvent.getType()); + final TaskAttemptEventTezEventUpdate taEvent = (TaskAttemptEventTezEventUpdate)argAllValues.get(1); + assertEquals(1, taEvent.getTezEvents().size()); + assertEquals(EventType.DATA_MOVEMENT_EVENT, + taEvent.getTezEvents().get(0).getEventType()); + + final TaskAttemptEvent taCompleteEvent = (TaskAttemptEvent)argAllValues.get(2); + assertEquals(TaskAttemptEventType.TA_DONE, taCompleteEvent.getType()); + final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)argAllValues.get(3); + assertEquals(1, vertexRouteEvent.getEvents().size()); assertEquals(EventType.DATA_MOVEMENT_EVENT, vertexRouteEvent.getEvents().get(0).getEventType()); - assertEquals(EventType.TASK_ATTEMPT_COMPLETED_EVENT, - vertexRouteEvent.getEvents().get(1).getEventType()); } @Test (timeout = 5000) public void testTaskEventRoutingWithReadError() throws Exception { List<TezEvent> events = Arrays.asList( new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null, false), null), - new TezEvent(InputReadErrorEvent.create("", 0, 0), null), - new TezEvent(new TaskAttemptCompletedEvent(), null) + new TezEvent(InputReadErrorEvent.create("", 0, 0), new EventMetaData(EventProducerConsumerType.INPUT, + "v2", "v1", taskAttemptID)), + new TezEvent(new TaskAttemptCompletedEvent(), new EventMetaData(EventProducerConsumerType.SYSTEM, + "v1", "v2", taskAttemptID)) ); generateHeartbeat(events, 0, 1, 0, new ArrayList<TezEvent>()); ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); - verify(eventHandler, times(2)).handle(arg.capture()); + verify(eventHandler, times(3)).handle(arg.capture()); final List<Event> argAllValues = arg.getAllValues(); final Event statusUpdateEvent = argAllValues.get(0); @@ -280,22 +293,24 @@ public class TestTaskCommunicatorManager1 { statusUpdateEvent.getType()); assertEquals(true, ((TaskAttemptEventStatusUpdate)statusUpdateEvent).getReadErrorReported()); - final Event vertexEvent = argAllValues.get(1); + final Event taFinishedEvent = argAllValues.get(1); + assertEquals("Second event should be TA_DONE", TaskAttemptEventType.TA_DONE, + taFinishedEvent.getType()); + + final Event vertexEvent = argAllValues.get(2); final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)vertexEvent; - assertEquals("First event should be routed to vertex", VertexEventType.V_ROUTE_EVENT, + assertEquals("Third event should be routed to vertex", VertexEventType.V_ROUTE_EVENT, vertexEvent.getType()); assertEquals(EventType.INPUT_READ_ERROR_EVENT, vertexRouteEvent.getEvents().get(0).getEventType()); - assertEquals(EventType.TASK_ATTEMPT_COMPLETED_EVENT, - vertexRouteEvent.getEvents().get(1).getEventType()); - } @Test (timeout = 5000) public void testTaskEventRoutingTaskAttemptOnly() throws Exception { List<TezEvent> events = Arrays.asList( - new TezEvent(new TaskAttemptCompletedEvent(), null) + new TezEvent(new TaskAttemptCompletedEvent(), new EventMetaData(EventProducerConsumerType.SYSTEM, + "v1", "v2", taskAttemptID)) ); generateHeartbeat(events, 0, 1, 0, new ArrayList<TezEvent>()); @@ -304,7 +319,8 @@ public class TestTaskCommunicatorManager1 { final List<Event> argAllValues = arg.getAllValues(); final Event event = argAllValues.get(0); - assertEquals("only event should be route event", VertexEventType.V_ROUTE_EVENT, + // Route to TaskAttempt directly rather than through Vertex + assertEquals("only event should be route event", TaskAttemptEventType.TA_DONE, event.getType()); }
