Repository: tez Updated Branches: refs/heads/master 005e8fb58 -> 91a397b0b
TEZ-3426. Second AM attempt launched for session mode and recovery disabled for certain cases (jlowe) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/91a397b0 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/91a397b0 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/91a397b0 Branch: refs/heads/master Commit: 91a397b0baf57d4a09f64233a4dd7df7f8019c2c Parents: 005e8fb Author: Jason Lowe <[email protected]> Authored: Tue Sep 6 22:42:32 2016 +0000 Committer: Jason Lowe <[email protected]> Committed: Tue Sep 6 22:42:32 2016 +0000 ---------------------------------------------------------------------- CHANGES.txt | 3 + .../org/apache/tez/dag/app/DAGAppMaster.java | 37 +++++++-- .../apache/tez/dag/app/TestDAGAppMaster.java | 81 +++++++++++++++++++- 3 files changed, 111 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/91a397b0/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 65cd05c..3f6281f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3426. Second AM attempt launched for session mode and recovery disabled for certain cases TEZ-3230. Implement vertex manager and edge manager of cartesian product edge. TEZ-3326. Display JVM system properties in AM and task logs. TEZ-3009. Errors that occur during container task acquisition are not logged. @@ -103,6 +104,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3426. Second AM attempt launched for session mode and recovery disabled for certain cases TEZ-3326. Display JVM system properties in AM and task logs. TEZ-3009. Errors that occur during container task acquisition are not logged. TEZ-2852. TestVertexImpl fails due to race in AsyncDispatcher. @@ -589,6 +591,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3426. Second AM attempt launched for session mode and recovery disabled for certain cases TEZ-2852. TestVertexImpl fails due to race in AsyncDispatcher. TEZ-3326. Display JVM system properties in AM and task logs. TEZ-3009. Errors that occur during container task acquisition are not logged. http://git-wip-us.apache.org/repos/asf/tez/blob/91a397b0/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index cc07fb7..de19fa3 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -226,6 +226,10 @@ public class DAGAppMaster extends AbstractService { private static Pattern sanitizeLabelPattern = Pattern.compile("[:\\-\\W]+"); + @VisibleForTesting + static final String INVALID_SESSION_ERR_MSG = "Initial application attempt in session mode failed. " + + "Application cannot recover and continue properly as DAG recovery has been disabled"; + private Clock clock; private final boolean isSession; private long appsStartTime; @@ -350,7 +354,7 @@ public class DAGAppMaster extends AbstractService { this.workingDirectory = workingDirectory; this.localDirs = localDirs; this.logDirs = logDirs; - this.shutdownHandler = new DAGAppMasterShutdownHandler(); + this.shutdownHandler = createShutdownHandler(); this.dagVersionInfo = new TezDagVersionInfo(); this.clientVersion = clientVersion; this.maxAppAttempts = maxAppAttempts; @@ -566,11 +570,7 @@ public class DAGAppMaster extends AbstractService { } } - - - this.taskSchedulerManager = new TaskSchedulerManager(context, - clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService, - taskSchedulerDescriptors, isLocal); + this.taskSchedulerManager = createTaskSchedulerManager(taskSchedulerDescriptors); addIfService(taskSchedulerManager, true); if (enableWebUIService()) { @@ -644,6 +644,19 @@ public class DAGAppMaster extends AbstractService { } @VisibleForTesting + protected DAGAppMasterShutdownHandler createShutdownHandler() { + return new DAGAppMasterShutdownHandler(); + } + + @VisibleForTesting + protected TaskSchedulerManager createTaskSchedulerManager( + List<NamedEntityDescriptor> taskSchedulerDescriptors) { + return new TaskSchedulerManager(context, + clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService, + taskSchedulerDescriptors, isLocal); + } + + @VisibleForTesting protected ContainerSignatureMatcher createContainerSignatureMatcher() { return new ContainerContextMatcher(); } @@ -1974,8 +1987,16 @@ public class DAGAppMaster extends AbstractService { startServices(); super.serviceStart(); - if (versionMismatch) { - // Short-circuit and return as no DAG should not be run + boolean invalidSession = false; + if (isSession && !recoveryEnabled && appAttemptID.getAttemptId() > 1) { + String err = INVALID_SESSION_ERR_MSG; + LOG.error(err); + addDiagnostic(err); + this.state = DAGAppMasterState.ERROR; + invalidSession = true; + } + if (versionMismatch || invalidSession) { + // Short-circuit and return as no DAG should be run this.taskSchedulerManager.setShouldUnregisterFlag(); shutdownHandler.shutdown(); return; http://git-wip-us.apache.org/repos/asf/tez/blob/91a397b0/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java index 3ea5ba4..56d1f96 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java @@ -18,15 +18,17 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import java.io.ByteArrayInputStream; import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutput; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Collections; import java.util.LinkedList; import java.util.List; @@ -50,7 +52,6 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.tez.client.TezApiVersionInfo; import org.apache.tez.common.TezCommonUtils; @@ -66,9 +67,11 @@ import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; +import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto; import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.TezUserPayloadProto; import org.apache.tez.dag.app.dag.impl.DAGImpl; +import org.apache.tez.dag.app.rm.TaskSchedulerManager; import org.apache.tez.dag.records.TezDAGID; import org.junit.After; import org.junit.Assert; @@ -99,6 +102,32 @@ public class TestDAGAppMaster { FileUtil.fullyDelete(TEST_DIR); } + @Test(timeout = 20000) + public void testInvalidSession() throws Exception { + // AM should fail if not the first attempt and in session mode and + // DAG recovery is disabled, otherwise the app can succeed without + // finishing an in-progress DAG. + ApplicationId appId = ApplicationId.newInstance(1, 1); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 2); + DAGAppMasterForTest dam = new DAGAppMasterForTest(attemptId, true, 3); + TezConfiguration conf = new TezConfiguration(false); + conf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false); + dam.init(conf); + dam.start(); + verify(dam.mockScheduler).setShouldUnregisterFlag(); + verify(dam.mockShutdown).shutdown(); + List<String> diags = dam.getDiagnostics(); + boolean found = false; + for (String diag : diags) { + if (diag.contains(DAGAppMaster.INVALID_SESSION_ERR_MSG)) { + found = true; + break; + } + } + Assert.assertTrue("Missing invalid session diagnostics", found); + dam.stop(); + } + @Test(timeout = 5000) public void testPluginParsing() throws IOException { BiMap<String, Integer> pluginMap = HashBiMap.create(); @@ -510,4 +539,52 @@ public class TestDAGAppMaster { return new TestTokenIdentifier(); } } + + private static class DAGAppMasterForTest extends DAGAppMaster { + private DAGAppMasterShutdownHandler mockShutdown; + private TaskSchedulerManager mockScheduler = mock(TaskSchedulerManager.class); + + public DAGAppMasterForTest(ApplicationAttemptId attemptId, boolean isSession, int maxAttempts) { + super(attemptId, ContainerId.newContainerId(attemptId, 1), "hostname", 12345, 12346, + new SystemClock(), 0, isSession, TEST_DIR.getAbsolutePath(), + new String[] { TEST_DIR.getAbsolutePath() }, new String[] { TEST_DIR.getAbsolutePath() }, + new TezDagVersionInfo().getVersion(), maxAttempts, createCredentials(), "jobname", null); + } + + private static Credentials createCredentials() { + Credentials creds = new Credentials(); + JobTokenSecretManager jtsm = new JobTokenSecretManager(); + JobTokenIdentifier jtid = new JobTokenIdentifier(new Text()); + Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(jtid, jtsm); + TokenCache.setSessionToken(token, creds); + return creds; + } + + private static void stubSessionResources() throws IOException { + FileOutputStream out = new FileOutputStream( + new File(TEST_DIR, TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME)); + PlanLocalResourcesProto planProto = PlanLocalResourcesProto.getDefaultInstance(); + planProto.writeDelimitedTo(out); + out.close(); + } + + @Override + public synchronized void serviceInit(Configuration conf) throws Exception { + stubSessionResources(); + conf.setBoolean(TezConfiguration.TEZ_AM_WEBSERVICE_ENABLE, false); + super.serviceInit(conf); + } + + @Override + protected DAGAppMasterShutdownHandler createShutdownHandler() { + mockShutdown = mock(DAGAppMasterShutdownHandler.class); + return mockShutdown; + } + + @Override + protected TaskSchedulerManager createTaskSchedulerManager( + List<NamedEntityDescriptor> taskSchedulerDescriptors) { + return mockScheduler; + } + } }
