Repository: tez Updated Branches: refs/heads/master a5ffdea62 -> e84231ebc
TEZ-3253. Remove special handling for last app attempt. Contributed by Akira Ajisaka. Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e84231eb Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e84231eb Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e84231eb Branch: refs/heads/master Commit: e84231ebc9f984b9ecfc2fd8ff489ddfc627092b Parents: a5ffdea Author: Siddharth Seth <[email protected]> Authored: Mon Mar 6 18:17:04 2017 -0800 Committer: Siddharth Seth <[email protected]> Committed: Mon Mar 6 18:17:04 2017 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../java/org/apache/tez/client/LocalClient.java | 2 +- .../org/apache/tez/dag/app/DAGAppMaster.java | 23 ++------------------ .../apache/tez/dag/app/MockDAGAppMaster.java | 2 +- .../apache/tez/dag/app/TestDAGAppMaster.java | 8 +++---- 5 files changed, 9 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/e84231eb/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4c28405..a3a74bc 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3253. Remove special handling for last app attempt. TEZ-3648. IFile.Write#close has an extra output stream flush TEZ-3649. AsyncHttpConnection should add StopWatch start. TEZ-3647. Add a setting which lets Tez determine Xmx. http://git-wip-us.apache.org/repos/asf/tez/blob/e84231eb/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java index db7fc2c..6baea48 100644 --- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java +++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java @@ -362,7 +362,7 @@ public class LocalClient extends FrameworkClient { return new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort, new SystemClock(), appSubmitTime, isSession, userDir, localDirs, logDirs, - versionInfo.getVersion(), 1, credentials, jobUserName, amPluginDescriptorProto); + versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto); } private AMPluginDescriptorProto getPluginDescriptorInfo(Configuration conf, http://git-wip-us.apache.org/repos/asf/tez/blob/e84231eb/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index de02c18..fc24f04 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -268,7 +268,6 @@ public class DAGAppMaster extends AbstractService { private HistoryEventHandler historyEventHandler; private final Map<String, LocalResource> amResources = new HashMap<String, LocalResource>(); private final Map<String, LocalResource> cumulativeAdditionalResources = new HashMap<String, LocalResource>(); - private final int maxAppAttempts; private final List<String> diagnostics = new ArrayList<String>(); private String containerLogs; @@ -346,7 +345,7 @@ public class DAGAppMaster extends AbstractService { public DAGAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId, String nmHost, int nmPort, int nmHttpPort, Clock clock, long appSubmitTime, boolean isSession, String workingDirectory, - String [] localDirs, String[] logDirs, String clientVersion, int maxAppAttempts, + String [] localDirs, String[] logDirs, String clientVersion, Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto) { super(DAGAppMaster.class.getName()); this.clock = clock; @@ -365,7 +364,6 @@ public class DAGAppMaster extends AbstractService { this.shutdownHandler = createShutdownHandler(); this.dagVersionInfo = new TezDagVersionInfo(); this.clientVersion = clientVersion; - this.maxAppAttempts = maxAppAttempts; this.amCredentials = credentials; this.amPluginDescriptorProto = pluginDescriptorProto; this.appMasterUgi = UserGroupInformation @@ -461,8 +459,6 @@ public class DAGAppMaster extends AbstractService { TezConfiguration.TEZ_AM_DISABLE_CLIENT_VERSION_CHECK, TezConfiguration.TEZ_AM_DISABLE_CLIENT_VERSION_CHECK_DEFAULT); - isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts; - // Check client - AM version compatibility LOG.info("Comparing client version with AM version" + ", clientVersion=" + clientVersion @@ -591,13 +587,6 @@ public class DAGAppMaster extends AbstractService { addIfServiceDependency(taskSchedulerManager, webUIService); } - if (isLastAMRetry) { - LOG.info("AM will unregister as this is the last attempt" - + ", currentAttempt=" + appAttemptID.getAttemptId() - + ", maxAttempts=" + maxAppAttempts); - this.taskSchedulerManager.setShouldUnregisterFlag(); - } - dispatcher.register(AMSchedulerEventType.class, taskSchedulerManager); addIfServiceDependency(taskSchedulerManager, clientRpcServer); @@ -2401,14 +2390,6 @@ public class DAGAppMaster extends AbstractService { clientVersion = VersionInfo.UNKNOWN; } - // TODO Should this be defaulting to 1. Was there a version of YARN where this was not setup ? - int maxAppAttempts = 1; - String maxAppAttemptsEnv = System.getenv( - ApplicationConstants.MAX_APP_ATTEMPTS_ENV); - if (maxAppAttemptsEnv != null) { - maxAppAttempts = Integer.parseInt(maxAppAttemptsEnv); - } - validateInputParam(appSubmitTimeStr, ApplicationConstants.APP_SUBMIT_TIME_ENV); @@ -2465,7 +2446,7 @@ public class DAGAppMaster extends AbstractService { System.getenv(Environment.PWD.name()), TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name())), TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOG_DIRS.name())), - clientVersion, maxAppAttempts, credentials, jobUserName, amPluginDescriptorProto); + clientVersion, credentials, jobUserName, amPluginDescriptorProto); ShutdownHookManager.get().addShutdownHook( new DAGAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY); http://git-wip-us.apache.org/repos/asf/tez/blob/e84231eb/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java index b021a36..893e03d 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java @@ -497,7 +497,7 @@ public class MockDAGAppMaster extends DAGAppMaster { AtomicBoolean launcherGoFlag, boolean initFailFlag, boolean startFailFlag, Credentials credentials, String jobUserName, int handlerConcurrency, int numConcurrentContainers) { super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, clock, appSubmitTime, - isSession, workingDirectory, localDirs, logDirs, new TezApiVersionInfo().getVersion(), 1, + isSession, workingDirectory, localDirs, logDirs, new TezApiVersionInfo().getVersion(), credentials, jobUserName, null); shutdownHandler = new MockDAGAppMasterShutdownHandler(); this.launcherGoFlag = launcherGoFlag; http://git-wip-us.apache.org/repos/asf/tez/blob/e84231eb/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java index 56d1f96..570c6dc 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java @@ -109,7 +109,7 @@ public class TestDAGAppMaster { // finishing an in-progress DAG. ApplicationId appId = ApplicationId.newInstance(1, 1); ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 2); - DAGAppMasterForTest dam = new DAGAppMasterForTest(attemptId, true, 3); + DAGAppMasterForTest dam = new DAGAppMasterForTest(attemptId, true); TezConfiguration conf = new TezConfiguration(false); conf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false); dam.init(conf); @@ -427,7 +427,7 @@ public class TestDAGAppMaster { "127.0.0.1", 0, 0, new SystemClock(), 1, true, TEST_DIR.toString(), new String[] {TEST_DIR.toString()}, new String[] {TEST_DIR.toString()}, - new TezApiVersionInfo().getVersion(), 1, amCreds, + new TezApiVersionInfo().getVersion(), amCreds, "someuser", null); am.init(conf); am.start(); @@ -544,11 +544,11 @@ public class TestDAGAppMaster { private DAGAppMasterShutdownHandler mockShutdown; private TaskSchedulerManager mockScheduler = mock(TaskSchedulerManager.class); - public DAGAppMasterForTest(ApplicationAttemptId attemptId, boolean isSession, int maxAttempts) { + public DAGAppMasterForTest(ApplicationAttemptId attemptId, boolean isSession) { super(attemptId, ContainerId.newContainerId(attemptId, 1), "hostname", 12345, 12346, new SystemClock(), 0, isSession, TEST_DIR.getAbsolutePath(), new String[] { TEST_DIR.getAbsolutePath() }, new String[] { TEST_DIR.getAbsolutePath() }, - new TezDagVersionInfo().getVersion(), maxAttempts, createCredentials(), "jobname", null); + new TezDagVersionInfo().getVersion(), createCredentials(), "jobname", null); } private static Credentials createCredentials() {
