Repository: tez Updated Branches: refs/heads/branch-0.8 ceadfc677 -> d9a698e49
TEZ-3493. DAG submit timeout cannot be set to a month (Hitesh Shah via jeagles) (cherry picked from commit ad68f73583681f79d825381d3bed8b80867d87e1) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d9a698e4 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d9a698e4 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d9a698e4 Branch: refs/heads/branch-0.8 Commit: d9a698e4969b9eac9147ff9f2e337bd0ff9b1a73 Parents: ceadfc6 Author: Jonathan Eagles <[email protected]> Authored: Thu Nov 3 20:50:10 2016 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Thu Nov 3 21:45:03 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../org/apache/tez/common/TezCommonUtils.java | 15 ++++++++++++ .../apache/tez/dag/api/TezConfiguration.java | 3 ++- .../apache/tez/common/TestTezCommonUtils.java | 24 ++++++++++++++++++++ .../org/apache/tez/dag/app/DAGAppMaster.java | 8 +++---- 5 files changed, 46 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/d9a698e4/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index cc9d2ef..884b1cf 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3493. DAG submit timeout cannot be set to a month TEZ-3505. Move license to the file header for TezBytesWritableSerialization TEZ-3486. COMBINE_OUTPUT_RECORDS/COMBINE_INPUT_RECORDS are not correct TEZ-3097. Flaky test: TestCommit.testDAGCommitStartedEventFail_OnDAGSuccess. @@ -524,6 +525,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3493. DAG submit timeout cannot be set to a month TEZ-3505. Move license to the file header for TezBytesWritableSerialization TEZ-3486. COMBINE_OUTPUT_RECORDS/COMBINE_INPUT_RECORDS are not correct TEZ-3437. Improve synchronization and the progress report behavior for Inputs from TEZ-3317. http://git-wip-us.apache.org/repos/asf/tez/blob/d9a698e4/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java index e4cf028..707a918 100644 --- a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java +++ b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java @@ -459,4 +459,19 @@ public class TezCommonUtils { jobToken.write(jobToken_dob); return ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength()); } + + public static long getDAGSessionTimeout(Configuration conf) { + int timeoutSecs = conf.getInt( + TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS, + TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS_DEFAULT); + if (timeoutSecs < 0) { + return -1; + } + // Handle badly configured value to minimize impact of a spinning thread + if (timeoutSecs == 0) { + timeoutSecs = 1; + } + return 1000l * timeoutSecs; + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/d9a698e4/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 5aa8f7e..62d9c9a 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -1128,7 +1128,8 @@ public class TezConfiguration extends Configuration { /** * Int value. Time (in seconds) for which the Tez AM should wait for a DAG to be submitted before - * shutting down. Only relevant in session mode. + * shutting down. Only relevant in session mode. Any negative value will disable this check and + * allow the AM to hang around forever in idle mode. */ @ConfigurationScope(Scope.AM) @ConfigurationProperty(type="integer") http://git-wip-us.apache.org/repos/asf/tez/blob/d9a698e4/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java b/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java index a7e6069..971c6ed 100644 --- a/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java +++ b/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java @@ -314,4 +314,28 @@ public class TestTezCommonUtils { } + @Test(timeout=5000) + public void testGetDAGSessionTimeout() { + Configuration conf = new Configuration(false); + Assert.assertEquals(TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS_DEFAULT*1000, + TezCommonUtils.getDAGSessionTimeout(conf)); + + // set to 1 month - * 1000 guaranteed to cross positive integer boundary + conf.setInt(TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS, + 24 * 60 * 60 * 30); + Assert.assertEquals(86400l*1000*30, + TezCommonUtils.getDAGSessionTimeout(conf)); + + // set to negative val + conf.setInt(TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS, + -24 * 60 * 60 * 30); + Assert.assertEquals(-1, + TezCommonUtils.getDAGSessionTimeout(conf)); + + conf.setInt(TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS, 0); + Assert.assertEquals(1000, + TezCommonUtils.getDAGSessionTimeout(conf)); + + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/d9a698e4/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index de19fa3..61c6f76 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -596,9 +596,7 @@ public class DAGAppMaster extends AbstractService { historyEventHandler = createHistoryEventHandler(context); addIfService(historyEventHandler, true); - this.sessionTimeoutInterval = 1000 * amConf.getInt( - TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS, - TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS_DEFAULT); + this.sessionTimeoutInterval = TezCommonUtils.getDAGSessionTimeout(amConf); if (!versionMismatch) { if (isSession) { @@ -2108,8 +2106,8 @@ public class DAGAppMaster extends AbstractService { } } - if (isSession) { - this.dagSubmissionTimer = new Timer(true); + if (isSession && sessionTimeoutInterval >= 0) { + this.dagSubmissionTimer = new Timer("DAGSubmissionTimer", true); this.dagSubmissionTimer.scheduleAtFixedRate(new TimerTask() { @Override public void run() {
