Repository: tez
Updated Branches: refs/heads/master 24a8a95bd -> 4cbc99d04
TEZ-3417. Reduce sleep time on AM shutdown to reduce test runtimes (Hitesh Shah via jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4cbc99d0 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4cbc99d0 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4cbc99d0 Branch: refs/heads/master Commit: 4cbc99d04bc6300b2e31b6673319deb534f51f82 Parents: 24a8a95 Author: Jonathan Eagles <[email protected]> Authored: Wed Jan 25 18:01:31 2017 -0600 Committer: Jonathan Eagles <[email protected]> Committed: Wed Jan 25 18:01:31 2017 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/dag/api/TezConfiguration.java | 11 ++++++++++ .../org/apache/tez/dag/app/DAGAppMaster.java | 22 ++++++++++++++++---- .../org/apache/tez/mapreduce/TestMRRJobs.java | 2 ++ .../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 1 + .../org/apache/tez/test/TestAMRecovery.java | 1 + .../org/apache/tez/test/TestDAGRecovery.java | 1 + .../org/apache/tez/test/TestDAGRecovery2.java | 1 + .../tez/test/TestExceptionPropagation.java | 1 + .../org/apache/tez/test/TestFaultTolerance.java | 7 +++++++ .../apache/tez/test/TestPipelinedShuffle.java | 2 ++ .../java/org/apache/tez/test/TestRecovery.java | 1 + .../org/apache/tez/test/TestSecureShuffle.java | 3 +++ .../tez/test/TestTaskErrorsUsingLocalMode.java | 3 ++- .../java/org/apache/tez/test/TestTezJobs.java | 1 + 15 files changed, 53 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/4cbc99d0/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d6798ff..ee31155 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3417. Reduce sleep time on AM shutdown to reduce test runtimes TEZ-3582. 
Exception swallowed in PipelinedSorter causing incorrect results. TEZ-3584. amKeepAliveService in TezClient should shutdown in case of AM failure. TEZ-3462. Task attempt failure during container shutdown loses useful container diagnostics http://git-wip-us.apache.org/repos/asf/tez/blob/4cbc99d0/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 8747616..df0605c 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -1684,6 +1684,17 @@ public class TezConfiguration extends Configuration { TEZ_PREFIX + "am.ats.v15.override.summary-types"; public static final boolean TEZ_AM_ATS_V15_OVERRIDE_SUMMARY_TYPES_DEFAULT = true; + /** + * Integer value in milliseconds. Default value is 5000 milliseconds. + * The time for which the AM waits after the final DAG completes or when shutdown is invoked + * before completing shutdown. This allows a client to retrieve any required info directly from + * the AM on completion of a DAG. + */ + @Private + @ConfigurationScope(Scope.AM) + public static final String TEZ_AM_SLEEP_TIME_BEFORE_EXIT_MILLIS = + TEZ_AM_PREFIX + "sleep.time.before.exit.millis"; + /** * String value. Determines what JVM properties will be logged for debugging purposes * in the AM and Task runtime logs. 
http://git-wip-us.apache.org/repos/asf/tez/blob/4cbc99d0/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 7bda424..eaaf18b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -432,6 +432,12 @@ public class DAGAppMaster extends AbstractService { initResourceCalculatorPlugins(); this.hadoopShim = new HadoopShimsLoader(this.amConf).getHadoopShim(); + long sleepTimeBeforeSecs = this.amConf.getLong( + TezConfiguration.TEZ_AM_SLEEP_TIME_BEFORE_EXIT_MILLIS, + TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT); + if (sleepTimeBeforeSecs >= 0) { + this.shutdownHandler.setSleepTimeBeforeExit(sleepTimeBeforeSecs); + } this.isLocal = conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT); @@ -924,6 +930,11 @@ public class DAGAppMaster extends AbstractService { protected class DAGAppMasterShutdownHandler { private AtomicBoolean shutdownHandled = new AtomicBoolean(false); + private long sleepTimeBeforeExit = TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT; + + void setSleepTimeBeforeExit(long sleepTimeBeforeExit) { + this.sleepTimeBeforeExit = sleepTimeBeforeExit; + } public void shutdown() { shutdown(false); @@ -941,16 +952,19 @@ public class DAGAppMaster extends AbstractService { } LOG.info("Handling DAGAppMaster shutdown"); - AMShutdownRunnable r = new AMShutdownRunnable(now); + AMShutdownRunnable r = new AMShutdownRunnable(now, sleepTimeBeforeExit); Thread t = new Thread(r, "AMShutdownThread"); t.start(); } private class AMShutdownRunnable implements Runnable { private final boolean immediateShutdown; + private final long sleepTimeBeforeExit; - public AMShutdownRunnable(boolean immediateShutdown) { + public 
AMShutdownRunnable(boolean immediateShutdown, + long sleepTimeBeforeExit) { this.immediateShutdown = immediateShutdown; + this.sleepTimeBeforeExit = sleepTimeBeforeExit; } @Override @@ -959,8 +973,8 @@ public class DAGAppMaster extends AbstractService { // final states. Will be removed once RM come on. TEZ-160. if (!immediateShutdown) { try { - LOG.info("Sleeping for 5 seconds before shutting down"); - Thread.sleep(TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT); + LOG.info("Sleeping for {} ms before shutting down", sleepTimeBeforeExit); + Thread.sleep(sleepTimeBeforeExit); } catch (InterruptedException e) { e.printStackTrace(); } http://git-wip-us.apache.org/repos/asf/tez/blob/4cbc99d0/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java index a90011c..c00ea36 100644 --- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java +++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java @@ -21,6 +21,7 @@ package org.apache.tez.mapreduce; import java.io.File; import java.io.IOException; +import org.apache.tez.dag.api.TezConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.RandomTextWriterJob; @@ -86,6 +87,7 @@ public class TestMRRJobs { conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir"); conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0l); + conf.setLong(TezConfiguration.TEZ_AM_SLEEP_TIME_BEFORE_EXIT_MILLIS, 500); mrrTezCluster.init(conf); mrrTezCluster.start(); } http://git-wip-us.apache.org/repos/asf/tez/blob/4cbc99d0/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java ---------------------------------------------------------------------- diff --git 
a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java index 92f70c5..5ce9c5d 100644 --- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java +++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java @@ -163,6 +163,7 @@ public class TestMRRJobsDAGApi { Configuration conf = new Configuration(); conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS conf.setInt("yarn.nodemanager.delete.debug-delay-sec", 20000); + conf.setLong(TezConfiguration.TEZ_AM_SLEEP_TIME_BEFORE_EXIT_MILLIS, 500); mrrTezCluster.init(conf); mrrTezCluster.start(); } http://git-wip-us.apache.org/repos/asf/tez/blob/4cbc99d0/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java index 04b0a03..f00ae5c 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java @@ -116,6 +116,7 @@ public class TestAMRecovery { Configuration miniTezconf = new Configuration(conf); miniTezconf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, MAX_AM_ATTEMPT); miniTezconf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS + conf.setLong(TezConfiguration.TEZ_AM_SLEEP_TIME_BEFORE_EXIT_MILLIS, 500); miniTezCluster.init(miniTezconf); miniTezCluster.start(); } http://git-wip-us.apache.org/repos/asf/tez/blob/4cbc99d0/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java index 8e41b7e..b0c9ccc 100644 --- 
a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java @@ -81,6 +81,7 @@ public class TestDAGRecovery { Configuration miniTezconf = new Configuration(conf); miniTezconf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 4); miniTezconf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS + miniTezconf.setLong(TezConfiguration.TEZ_AM_SLEEP_TIME_BEFORE_EXIT_MILLIS, 500); miniTezCluster.init(miniTezconf); miniTezCluster.start(); } http://git-wip-us.apache.org/repos/asf/tez/blob/4cbc99d0/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java index 02f93c9..90b6dd0 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java @@ -78,6 +78,7 @@ public class TestDAGRecovery2 { Configuration miniTezconf = new Configuration(conf); miniTezconf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 4); miniTezconf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS + miniTezconf.setLong(TezConfiguration.TEZ_AM_SLEEP_TIME_BEFORE_EXIT_MILLIS, 500); miniTezCluster.init(miniTezconf); miniTezCluster.start(); } http://git-wip-us.apache.org/repos/asf/tez/blob/4cbc99d0/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java index 438a5aa..fc1dab7 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java @@ -125,6 +125,7 @@ public 
class TestExceptionPropagation { Configuration miniTezconf = new Configuration(conf); miniTezconf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 4); miniTezconf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS + miniTezconf.setLong(TezConfiguration.TEZ_AM_SLEEP_TIME_BEFORE_EXIT_MILLIS, 500); miniTezCluster.init(miniTezconf); miniTezCluster.start(); } http://git-wip-us.apache.org/repos/asf/tez/blob/4cbc99d0/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java index 08bac0d..af1cb6f 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java @@ -59,6 +59,7 @@ import org.apache.tez.test.dag.TwoLevelsFailingDAG; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Ignore; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -105,6 +106,7 @@ public class TestFaultTolerance { tezConf.setDouble(TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION, 0.4); tezConf.setInt(TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC, 3); tezConf.setInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS, 100); + tezConf.setLong(TezConfiguration.TEZ_AM_SLEEP_TIME_BEFORE_EXIT_MILLIS, 500); tezSession = TezClient.create("TestFaultTolerance", tezConf, true); tezSession.start(); @@ -127,6 +129,11 @@ public class TestFaultTolerance { } } + @Before + public void checkSessionStatus() { + // TODO restart session if it crashed due to some test error + } + void runDAGAndVerify(DAG dag, DAGStatus.State finalState) throws Exception { runDAGAndVerify(dag, finalState, -1); } 
http://git-wip-us.apache.org/repos/asf/tez/blob/4cbc99d0/tez-tests/src/test/java/org/apache/tez/test/TestPipelinedShuffle.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestPipelinedShuffle.java b/tez-tests/src/test/java/org/apache/tez/test/TestPipelinedShuffle.java index 52342a2..36ac488 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestPipelinedShuffle.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestPipelinedShuffle.java @@ -112,6 +112,8 @@ public class TestPipelinedShuffle { //set to low value so that it can detect failures quickly conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT, 2); + conf.setLong(TezConfiguration.TEZ_AM_SLEEP_TIME_BEFORE_EXIT_MILLIS, 500); + miniTezCluster = new MiniTezCluster(TestPipelinedShuffle.class.getName(), 1, 1, 1); miniTezCluster.init(conf); http://git-wip-us.apache.org/repos/asf/tez/blob/4cbc99d0/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java index b9229e2..93fd972 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java @@ -113,6 +113,7 @@ public class TestRecovery { Configuration miniTezconf = new Configuration(conf); miniTezconf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 4); miniTezconf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS + miniTezconf.setLong(TezConfiguration.TEZ_AM_SLEEP_TIME_BEFORE_EXIT_MILLIS, 500); miniTezCluster.init(miniTezconf); miniTezCluster.start(); } http://git-wip-us.apache.org/repos/asf/tez/blob/4cbc99d0/tez-tests/src/test/java/org/apache/tez/test/TestSecureShuffle.java ---------------------------------------------------------------------- diff 
--git a/tez-tests/src/test/java/org/apache/tez/test/TestSecureShuffle.java b/tez-tests/src/test/java/org/apache/tez/test/TestSecureShuffle.java index e3e42d3..0fb07fc 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestSecureShuffle.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestSecureShuffle.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.security.ssl.KeyStoreTestUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.mapreduce.examples.TestOrderedWordCount; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.junit.After; @@ -130,6 +131,8 @@ public class TestSecureShuffle { //set to low value so that it can detect failures quickly conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT, 2); + conf.setLong(TezConfiguration.TEZ_AM_SLEEP_TIME_BEFORE_EXIT_MILLIS, 500); + miniTezCluster = new MiniTezCluster(TestSecureShuffle.class.getName() + "-" + (enableSSLInCluster ? 
"withssl" : "withoutssl"), 1, 1, 1); http://git-wip-us.apache.org/repos/asf/tez/blob/4cbc99d0/tez-tests/src/test/java/org/apache/tez/test/TestTaskErrorsUsingLocalMode.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTaskErrorsUsingLocalMode.java b/tez-tests/src/test/java/org/apache/tez/test/TestTaskErrorsUsingLocalMode.java index f413bdd..d622698 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestTaskErrorsUsingLocalMode.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestTaskErrorsUsingLocalMode.java @@ -124,7 +124,8 @@ public class TestTaskErrorsUsingLocalMode { tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); tezConf1.set("fs.defaultFS", "file:///"); tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true); - TezClient tezClient1 = TezClient.create("commonName", tezConf1, true); + tezConf1.setLong(TezConfiguration.TEZ_AM_SLEEP_TIME_BEFORE_EXIT_MILLIS, 500); + TezClient tezClient1 = TezClient.create(name, tezConf1, true); tezClient1.start(); return tezClient1; } http://git-wip-us.apache.org/repos/asf/tez/blob/4cbc99d0/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java index 5c50a34..be9b0bf 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java @@ -138,6 +138,7 @@ public class TestTezJobs { mrrTezCluster = new MiniTezCluster(TestTezJobs.class.getName(), 1, 1, 1); Configuration conf = new Configuration(); conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS + conf.setLong(TezConfiguration.TEZ_AM_SLEEP_TIME_BEFORE_EXIT_MILLIS, 500); mrrTezCluster.init(conf); mrrTezCluster.start(); }
