Repository: tez Updated Branches: refs/heads/branch-0.7 cbd4eacb0 -> ae24f9905
TEZ-3426. Second AM attempt launched for session mode and recovery disabled for certain cases (jlowe) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ae24f990 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ae24f990 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ae24f990 Branch: refs/heads/branch-0.7 Commit: ae24f9905d67539b9ce1a0b939a795a962ecb3d7 Parents: cbd4eac Author: Jason Lowe <[email protected]> Authored: Tue Sep 6 22:46:00 2016 +0000 Committer: Jason Lowe <[email protected]> Committed: Tue Sep 6 22:46:00 2016 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/dag/app/DAGAppMaster.java | 32 ++++++-- .../apache/tez/dag/app/TestDAGAppMaster.java | 82 ++++++++++++++++++++ 3 files changed, 110 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/ae24f990/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 65da496..4dc1c93 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3426. Second AM attempt launched for session mode and recovery disabled for certain cases TEZ-2852. TestVertexImpl fails due to race in AsyncDispatcher. TEZ-3326. Display JVM system properties in AM and task logs. TEZ-3009. Errors that occur during container task acquisition are not logged. http://git-wip-us.apache.org/repos/asf/tez/blob/ae24f990/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 673f2fe..98e9355 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -211,6 +211,10 @@ public class DAGAppMaster extends AbstractService { private static Pattern sanitizeLabelPattern = Pattern.compile("[:\\-\\W]+"); + @VisibleForTesting + static final String INVALID_SESSION_ERR_MSG = "Initial application attempt in session mode failed. " + + "Application cannot recover and continue properly as DAG recovery has been disabled"; + private Clock clock; private final boolean isSession; private long appsStartTime; @@ -328,7 +332,7 @@ public class DAGAppMaster extends AbstractService { this.workingDirectory = workingDirectory; this.localDirs = localDirs; this.logDirs = logDirs; - this.shutdownHandler = new DAGAppMasterShutdownHandler(); + this.shutdownHandler = createShutdownHandler(); this.dagVersionInfo = new TezDagVersionInfo(); this.clientVersion = clientVersion; this.maxAppAttempts = maxAppAttempts; @@ -510,8 +514,7 @@ public class DAGAppMaster extends AbstractService { } } - this.taskSchedulerEventHandler = new TaskSchedulerEventHandler(context, - clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService); + this.taskSchedulerEventHandler = createTaskSchedulerManager(); addIfService(taskSchedulerEventHandler, true); if (enableWebUIService()) { @@ -596,6 +599,17 @@ public class DAGAppMaster extends AbstractService { } @VisibleForTesting + protected DAGAppMasterShutdownHandler createShutdownHandler() { + return new DAGAppMasterShutdownHandler(); + } + + @VisibleForTesting + protected TaskSchedulerEventHandler createTaskSchedulerManager() { + return new TaskSchedulerEventHandler(context, clientRpcServer, + dispatcher.getEventHandler(), containerSignatureMatcher, webUIService); + } + + @VisibleForTesting protected ContainerSignatureMatcher createContainerSignatureMatcher() { return new ContainerContextMatcher(); } @@ -1808,8 +1822,16 @@ public class DAGAppMaster extends AbstractService { startServices(); super.serviceStart(); - if (versionMismatch) { - // Short-circuit and return as no DAG should not be run + boolean invalidSession = false; + if (isSession && !recoveryEnabled && appAttemptID.getAttemptId() > 1) { + String err = INVALID_SESSION_ERR_MSG; + LOG.error(err); + addDiagnostic(err); + this.state = DAGAppMasterState.ERROR; + invalidSession = true; + } + if (versionMismatch || invalidSession) { + // Short-circuit and return as no DAG should be run this.taskSchedulerEventHandler.setShouldUnregisterFlag(); shutdownHandler.shutdown(); return; http://git-wip-us.apache.org/repos/asf/tez/blob/ae24f990/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java index 2390c79..8ee477e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java @@ -17,13 +17,20 @@ package org.apache.tez.dag.app; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import java.io.ByteArrayInputStream; import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutput; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; @@ -48,7 +55,9 @@ import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; +import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto; import org.apache.tez.dag.app.dag.impl.DAGImpl; +import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler; import org.apache.tez.dag.records.TezDAGID; import org.junit.After; import org.junit.Before; @@ -72,6 +81,32 @@ public class TestDAGAppMaster { FileUtil.fullyDelete(TEST_DIR); } + @Test(timeout = 20000) + public void testInvalidSession() throws Exception { + // AM should fail if not the first attempt and in session mode and + // DAG recovery is disabled, otherwise the app can succeed without + // finishing an in-progress DAG. + ApplicationId appId = ApplicationId.newInstance(1, 1); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 2); + DAGAppMasterForTest dam = new DAGAppMasterForTest(attemptId, true, 3); + TezConfiguration conf = new TezConfiguration(false); + conf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false); + dam.init(conf); + dam.start(); + verify(dam.mockScheduler).setShouldUnregisterFlag(); + verify(dam.mockShutdown).shutdown(); + List<String> diags = dam.getDiagnostics(); + boolean found = false; + for (String diag : diags) { + if (diag.contains(DAGAppMaster.INVALID_SESSION_ERR_MSG)) { + found = true; + break; + } + } + assertTrue("Missing invalid session diagnostics", found); + dam.stop(); + } + @Test public void testDagCredentialsWithoutMerge() throws Exception { testDagCredentials(false); @@ -233,4 +268,51 @@ public class TestDAGAppMaster { return new TestTokenIdentifier(); } } + + private static class DAGAppMasterForTest extends DAGAppMaster { + private DAGAppMasterShutdownHandler mockShutdown; + private TaskSchedulerEventHandler mockScheduler = mock(TaskSchedulerEventHandler.class); + + public DAGAppMasterForTest(ApplicationAttemptId attemptId, boolean isSession, int maxAttempts) { + super(attemptId, ContainerId.newContainerId(attemptId, 1), "hostname", 12345, 12346, + new SystemClock(), 0, isSession, TEST_DIR.getAbsolutePath(), + new String[] { TEST_DIR.getAbsolutePath() }, new String[] { TEST_DIR.getAbsolutePath() }, + new TezDagVersionInfo().getVersion(), maxAttempts, createCredentials(), "jobname"); + } + + private static Credentials createCredentials() { + Credentials creds = new Credentials(); + JobTokenSecretManager jtsm = new JobTokenSecretManager(); + JobTokenIdentifier jtid = new JobTokenIdentifier(new Text()); + Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(jtid, jtsm); + TokenCache.setSessionToken(token, creds); + return creds; + } + + private static void stubSessionResources() throws IOException { + FileOutputStream out = new FileOutputStream( + new File(TEST_DIR, TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME)); + PlanLocalResourcesProto planProto = PlanLocalResourcesProto.getDefaultInstance(); + planProto.writeDelimitedTo(out); + out.close(); + } + + @Override + public synchronized void serviceInit(Configuration conf) throws Exception { + stubSessionResources(); + conf.setBoolean(TezConfiguration.TEZ_AM_WEBSERVICE_ENABLE, false); + super.serviceInit(conf); + } + + @Override + protected DAGAppMasterShutdownHandler createShutdownHandler() { + mockShutdown = mock(DAGAppMasterShutdownHandler.class); + return mockShutdown; + } + + @Override + protected TaskSchedulerEventHandler createTaskSchedulerManager() { + return mockScheduler; + } + } }
