Repository: tez Updated Branches: refs/heads/master d91eb2835 -> fe11c5e67
TEZ-2261. Should add diagnostics in DAGAppMaster when recovery error happens (zjffdu) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fe11c5e6 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fe11c5e6 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fe11c5e6 Branch: refs/heads/master Commit: fe11c5e67334f1a08e2b586b68ee4840d8b62763 Parents: d91eb28 Author: Jeff Zhang <[email protected]> Authored: Thu Apr 23 09:46:54 2015 +0800 Committer: Jeff Zhang <[email protected]> Committed: Thu Apr 23 09:46:54 2015 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/dag/app/DAGAppMaster.java | 35 +++++++++++++++----- .../apache/tez/dag/app/MockDAGAppMaster.java | 20 ++++++++++- .../tez/dag/app/TestMockDAGAppMaster.java | 28 ++++++++++++++++ 4 files changed, 75 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/fe11c5e6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c32830e..af39092 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly ALL CHANGES: + TEZ-2261. Should add diagnostics in DAGAppMaster when recovery error happens TEZ-2340. TestRecoveryParser fails TEZ-2345. Tez UI: Enable cell level loading in all DAGs table TEZ-2330. Create reconfigureVertex() API for input based initialization http://git-wip-us.apache.org/repos/asf/tez/blob/fe11c5e6/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 3dd9e4c..8a914f6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -236,6 +236,7 @@ public class DAGAppMaster extends AbstractService { private final Map<String, LocalResource> amResources = new HashMap<String, LocalResource>(); private final Map<String, LocalResource> cumulativeAdditionalResources = new HashMap<String, LocalResource>(); private final int maxAppAttempts; + private final List<String> diagnostics = new ArrayList<String>(); private boolean isLocal = false; //Local mode flag @@ -383,6 +384,7 @@ public class DAGAppMaster extends AbstractService { versionMismatchDiagnostics = "Incompatible versions found" + ", clientVersion=" + clientVersion + ", AMVersion=" + dagVersionInfo.getVersion(); + addDiagnostic(versionMismatchDiagnostics); if (disableVersionCheck) { LOG.warn("Ignoring client-AM version mismatch as check disabled. " + versionMismatchDiagnostics); @@ -491,7 +493,7 @@ public class DAGAppMaster extends AbstractService { addIfService(containerLauncher, true); dispatcher.register(NMCommunicatorEventType.class, containerLauncher); - historyEventHandler = new HistoryEventHandler(context); + historyEventHandler = createHistoryEventHandler(context); addIfService(historyEventHandler, true); this.sessionTimeoutInterval = 1000 * amConf.getInt( @@ -563,6 +565,11 @@ public class DAGAppMaster extends AbstractService { return new AsyncDispatcher("Central"); } + @VisibleForTesting + protected HistoryEventHandler createHistoryEventHandler(AppContext appContext) { + return new HistoryEventHandler(appContext); + } + /** * Exit call. Just in a function call to enable testing. */ @@ -624,8 +631,10 @@ public class DAGAppMaster extends AbstractService { lastDAGCompletionTime = clock.getTime(); _updateLoggers(currentDAG, "_post"); if (this.historyEventHandler.hasRecoveryFailed()) { - LOG.warn("Recovery had a fatal error, shutting down session after" + - " DAG completion"); + String recoveryErrorMsg = "Recovery had a fatal error, shutting down session after" + + " DAG completion"; + LOG.warn(recoveryErrorMsg); + addDiagnostic(recoveryErrorMsg); sessionStopped.set(true); } switch(finishEvt.getDAGState()) { @@ -1071,22 +1080,32 @@ public class DAGAppMaster extends AbstractService { return state; } + private void addDiagnostic(String diag) { + synchronized (diagnostics) { + diagnostics.add(diag); + } + } + public List<String> getDiagnostics() { - if (versionMismatch) { - return Collections.singletonList(versionMismatchDiagnostics); + // always create new diagnostics to return + // This is to avoid the case that this method is called multiple times and diagnostics is accumulated. + List<String> diagResult = new ArrayList<String>(); + synchronized (diagnostics) { + diagResult.addAll(this.diagnostics); } + if (!isSession) { if(currentDAG != null) { - return currentDAG.getDiagnostics(); + diagResult.addAll(currentDAG.getDiagnostics()); } } else { - return Collections.singletonList("Session stats:" + diagResult.add("Session stats:" + "submittedDAGs=" + submittedDAGs.get() + ", successfulDAGs=" + successfulDAGs.get() + ", failedDAGs=" + failedDAGs.get() + ", killedDAGs=" + killedDAGs.get()); } - return null; + return diagResult; } public float getProgress() { http://git-wip-us.apache.org/repos/asf/tez/blob/fe11c5e6/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java index 92dfdb5..18286b5 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java @@ -59,6 +59,7 @@ import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent; import org.apache.tez.dag.app.rm.container.AMContainerEvent; import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched; import org.apache.tez.dag.app.rm.container.AMContainerEventType; +import org.apache.tez.dag.history.HistoryEventHandler; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; @@ -92,6 +93,7 @@ public class MockDAGAppMaster extends DAGAppMaster { boolean initFailFlag; boolean startFailFlag; boolean sendDMEvents; + boolean recoveryFatalError = false; CountersDelegate countersDelegate; StatisticsDelegate statsDelegate; long launcherSleepTime = 1; @@ -453,6 +455,17 @@ public class MockDAGAppMaster extends DAGAppMaster { } } + public class MockHistoryEventHandler extends HistoryEventHandler { + + public MockHistoryEventHandler(AppContext context) { + super(context); + } + + @Override + public boolean hasRecoveryFailed() { + return recoveryFatalError; + } + } public class MockDAGAppMasterShutdownHandler extends DAGAppMasterShutdownHandler { public AtomicInteger shutdownInvoked = new AtomicInteger(0); @@ -498,7 +511,12 @@ public class MockDAGAppMaster extends DAGAppMaster { throws UnknownHostException { return containerLauncher; } - + + @Override + protected HistoryEventHandler createHistoryEventHandler(AppContext appContext) { + return new MockHistoryEventHandler(appContext); + } + public MockContainerLauncher getContainerLauncher() { return containerLauncher; } http://git-wip-us.apache.org/repos/asf/tez/blob/fe11c5e6/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java index db1d632..33dd18d 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java @@ -887,4 +887,32 @@ public class TestMockDAGAppMaster { } } } + + @Test(timeout = 5000) + public void testDAGFinishedRecoveryError() throws Exception { + TezConfiguration tezconf = new TezConfiguration(defaultConf); + + MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null); + tezClient.start(); + + MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp(); + mockApp.recoveryFatalError = true; + MockContainerLauncher mockLauncher = mockApp.getContainerLauncher(); + mockLauncher.startScheduling(true); + + DAG dag = DAG.create("test"); + Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5); + dag.addVertex(vA); + + DAGClient dagClient = tezClient.submitDAG(dag); + dagClient.waitForCompletion(); + while(!mockApp.getShutdownHandler().wasShutdownInvoked()) { + Thread.sleep(100); + } + Assert.assertEquals(DAGState.SUCCEEDED, mockApp.getContext().getCurrentDAG().getState()); + Assert.assertEquals(DAGAppMasterState.FAILED, mockApp.getState()); + Assert.assertTrue(StringUtils.join(mockApp.getDiagnostics(),",") + .contains("Recovery had a fatal error, shutting down session after" + + " DAG completion")); + } }
