Repository: tez
Updated Branches:
refs/heads/branch-0.8 b6b04de46 -> f86ed0d49
TEZ-3426. Second AM attempt launched for session mode and recovery disabled for
certain cases (jlowe)
(cherry picked from commit 91a397b0baf57d4a09f64233a4dd7df7f8019c2c)
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f86ed0d4
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f86ed0d4
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f86ed0d4
Branch: refs/heads/branch-0.8
Commit: f86ed0d495ae8b9f1d13c153a11ed06c61d8d4b6
Parents: b6b04de
Author: Jason Lowe <[email protected]>
Authored: Tue Sep 6 22:44:36 2016 +0000
Committer: Jason Lowe <[email protected]>
Committed: Tue Sep 6 22:44:36 2016 +0000
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../org/apache/tez/dag/app/DAGAppMaster.java | 37 +++++++--
.../apache/tez/dag/app/TestDAGAppMaster.java | 81 +++++++++++++++++++-
3 files changed, 110 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/f86ed0d4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 20b2870..fbb305d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3426. Second AM attempt launched for session mode and recovery disabled for certain cases
TEZ-3326. Display JVM system properties in AM and task logs.
TEZ-3009. Errors that occur during container task acquisition are not logged.
TEZ-2852. TestVertexImpl fails due to race in AsyncDispatcher.
@@ -503,6 +504,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3426. Second AM attempt launched for session mode and recovery disabled for certain cases
TEZ-3009. Errors that occur during container task acquisition are not logged.
TEZ-3413. ConcurrentModificationException in HistoryEventTimelineConversion for AppLaunchedEvent.
TEZ-3286. Allow clients to set processor reserved memory per vertex (instead of per container).
http://git-wip-us.apache.org/repos/asf/tez/blob/f86ed0d4/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index cc07fb7..de19fa3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -226,6 +226,10 @@ public class DAGAppMaster extends AbstractService {
private static Pattern sanitizeLabelPattern = Pattern.compile("[:\\-\\W]+");
+ @VisibleForTesting
+ static final String INVALID_SESSION_ERR_MSG = "Initial application attempt in session mode failed. "
+ + "Application cannot recover and continue properly as DAG recovery has been disabled";
+
private Clock clock;
private final boolean isSession;
private long appsStartTime;
@@ -350,7 +354,7 @@ public class DAGAppMaster extends AbstractService {
this.workingDirectory = workingDirectory;
this.localDirs = localDirs;
this.logDirs = logDirs;
- this.shutdownHandler = new DAGAppMasterShutdownHandler();
+ this.shutdownHandler = createShutdownHandler();
this.dagVersionInfo = new TezDagVersionInfo();
this.clientVersion = clientVersion;
this.maxAppAttempts = maxAppAttempts;
@@ -566,11 +570,7 @@ public class DAGAppMaster extends AbstractService {
}
}
-
-
- this.taskSchedulerManager = new TaskSchedulerManager(context,
- clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService,
- taskSchedulerDescriptors, isLocal);
+ this.taskSchedulerManager = createTaskSchedulerManager(taskSchedulerDescriptors);
addIfService(taskSchedulerManager, true);
if (enableWebUIService()) {
@@ -644,6 +644,19 @@ public class DAGAppMaster extends AbstractService {
}
@VisibleForTesting
+ protected DAGAppMasterShutdownHandler createShutdownHandler() {
+ return new DAGAppMasterShutdownHandler();
+ }
+
+ @VisibleForTesting
+ protected TaskSchedulerManager createTaskSchedulerManager(
+ List<NamedEntityDescriptor> taskSchedulerDescriptors) {
+ return new TaskSchedulerManager(context,
+ clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService,
+ taskSchedulerDescriptors, isLocal);
+ }
+
+ @VisibleForTesting
protected ContainerSignatureMatcher createContainerSignatureMatcher() {
return new ContainerContextMatcher();
}
@@ -1974,8 +1987,16 @@ public class DAGAppMaster extends AbstractService {
startServices();
super.serviceStart();
- if (versionMismatch) {
- // Short-circuit and return as no DAG should not be run
+ boolean invalidSession = false;
+ if (isSession && !recoveryEnabled && appAttemptID.getAttemptId() > 1) {
+ String err = INVALID_SESSION_ERR_MSG;
+ LOG.error(err);
+ addDiagnostic(err);
+ this.state = DAGAppMasterState.ERROR;
+ invalidSession = true;
+ }
+ if (versionMismatch || invalidSession) {
+ // Short-circuit and return as no DAG should be run
this.taskSchedulerManager.setShouldUnregisterFlag();
shutdownHandler.shutdown();
return;
http://git-wip-us.apache.org/repos/asf/tez/blob/f86ed0d4/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
index 3ea5ba4..56d1f96 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
@@ -18,15 +18,17 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -50,7 +52,6 @@ import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.client.TezApiVersionInfo;
import org.apache.tez.common.TezCommonUtils;
@@ -66,9 +67,11 @@ import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto;
import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.TezUserPayloadProto;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.app.rm.TaskSchedulerManager;
import org.apache.tez.dag.records.TezDAGID;
import org.junit.After;
import org.junit.Assert;
@@ -99,6 +102,32 @@ public class TestDAGAppMaster {
FileUtil.fullyDelete(TEST_DIR);
}
+ @Test(timeout = 20000)
+ public void testInvalidSession() throws Exception {
+ // AM should fail if not the first attempt and in session mode and
+ // DAG recovery is disabled, otherwise the app can succeed without
+ // finishing an in-progress DAG.
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
+ ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 2);
+ DAGAppMasterForTest dam = new DAGAppMasterForTest(attemptId, true, 3);
+ TezConfiguration conf = new TezConfiguration(false);
+ conf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false);
+ dam.init(conf);
+ dam.start();
+ verify(dam.mockScheduler).setShouldUnregisterFlag();
+ verify(dam.mockShutdown).shutdown();
+ List<String> diags = dam.getDiagnostics();
+ boolean found = false;
+ for (String diag : diags) {
+ if (diag.contains(DAGAppMaster.INVALID_SESSION_ERR_MSG)) {
+ found = true;
+ break;
+ }
+ }
+ Assert.assertTrue("Missing invalid session diagnostics", found);
+ dam.stop();
+ }
+
@Test(timeout = 5000)
public void testPluginParsing() throws IOException {
BiMap<String, Integer> pluginMap = HashBiMap.create();
@@ -510,4 +539,52 @@ public class TestDAGAppMaster {
return new TestTokenIdentifier();
}
}
+
+ private static class DAGAppMasterForTest extends DAGAppMaster {
+ private DAGAppMasterShutdownHandler mockShutdown;
+ private TaskSchedulerManager mockScheduler = mock(TaskSchedulerManager.class);
+
+ public DAGAppMasterForTest(ApplicationAttemptId attemptId, boolean isSession, int maxAttempts) {
+ super(attemptId, ContainerId.newContainerId(attemptId, 1), "hostname", 12345, 12346,
+ new SystemClock(), 0, isSession, TEST_DIR.getAbsolutePath(),
+ new String[] { TEST_DIR.getAbsolutePath() }, new String[] { TEST_DIR.getAbsolutePath() },
+ new TezDagVersionInfo().getVersion(), maxAttempts, createCredentials(), "jobname", null);
+ }
+
+ private static Credentials createCredentials() {
+ Credentials creds = new Credentials();
+ JobTokenSecretManager jtsm = new JobTokenSecretManager();
+ JobTokenIdentifier jtid = new JobTokenIdentifier(new Text());
+ Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(jtid, jtsm);
+ TokenCache.setSessionToken(token, creds);
+ return creds;
+ }
+
+ private static void stubSessionResources() throws IOException {
+ FileOutputStream out = new FileOutputStream(
+ new File(TEST_DIR, TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME));
+ PlanLocalResourcesProto planProto = PlanLocalResourcesProto.getDefaultInstance();
+ planProto.writeDelimitedTo(out);
+ out.close();
+ }
+
+ @Override
+ public synchronized void serviceInit(Configuration conf) throws Exception {
+ stubSessionResources();
+ conf.setBoolean(TezConfiguration.TEZ_AM_WEBSERVICE_ENABLE, false);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected DAGAppMasterShutdownHandler createShutdownHandler() {
+ mockShutdown = mock(DAGAppMasterShutdownHandler.class);
+ return mockShutdown;
+ }
+
+ @Override
+ protected TaskSchedulerManager createTaskSchedulerManager(
+ List<NamedEntityDescriptor> taskSchedulerDescriptors) {
+ return mockScheduler;
+ }
+ }
}