Repository: tez Updated Branches: refs/heads/master a1563eff7 -> 1cd62dedc
TEZ-3405. Support ability for AM to kill itself if there is no client heartbeating to it. (hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1cd62ded Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1cd62ded Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1cd62ded Branch: refs/heads/master Commit: 1cd62dedcc1fcf468a7c8a38f9c9ffbd0b28aa77 Parents: a1563ef Author: Hitesh Shah <[email protected]> Authored: Fri Oct 21 15:33:03 2016 -0700 Committer: Hitesh Shah <[email protected]> Committed: Fri Oct 21 15:33:03 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + tez-api/findbugs-exclude.xml | 10 ++ .../java/org/apache/tez/client/TezClient.java | 89 +++++++++++++-- .../org/apache/tez/common/TezCommonUtils.java | 58 ++++++++++ .../java/org/apache/tez/common/TezUtils.java | 21 ---- .../apache/tez/dag/api/TezConfiguration.java | 26 ++++- .../org/apache/tez/dag/api/TezConstants.java | 12 ++ .../org/apache/tez/client/TestTezClient.java | 36 ++++++ .../apache/tez/common/TestTezCommonUtils.java | 71 ++++++++++++ .../org/apache/tez/common/TestTezUtils.java | 22 ---- tez-dag/findbugs-exclude.xml | 2 + .../java/org/apache/tez/client/LocalClient.java | 2 +- .../tez/dag/api/client/DAGClientHandler.java | 11 ++ .../tez/dag/api/client/DAGClientServer.java | 3 + ...DAGClientAMProtocolBlockingPBServerImpl.java | 7 ++ .../org/apache/tez/dag/app/DAGAppMaster.java | 73 +++++++++++- .../dag/api/client/TestDAGClientHandler.java | 2 + .../org/apache/tez/runtime/task/TezChild.java | 5 +- .../java/org/apache/tez/test/TestTezJobs.java | 114 +++++++++++++++++++ 19 files changed, 503 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 
dabc704..1afcacb 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3405. Support ability for AM to kill itself if there is no client heartbeating to it. TEZ-3483. Create basic travis yml file for Tez. TEZ-3452. Auto-reduce parallelism calculation can overflow with large inputs TEZ-3439. Tez joinvalidate fails when first input argument size is bigger than the second. http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-api/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/tez-api/findbugs-exclude.xml b/tez-api/findbugs-exclude.xml index 95b9207..10d27f7 100644 --- a/tez-api/findbugs-exclude.xml +++ b/tez-api/findbugs-exclude.xml @@ -121,4 +121,14 @@ <Bug pattern="EI_EXPOSE_REP" /> </Match> + <Match> + <Class name="org.apache.tez.client.TezClient"/> + <Or> + <Field name="clientTimeout"/> + <Field name="frameworkClient"/> + <Field name="sessionAppId"/> + </Or> + <Bug pattern="IS2_INCONSISTENT_SYNC" /> + </Match> + </FindBugsFilter> http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index 780fcb7..29e7a8b 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -22,6 +22,10 @@ import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.text.NumberFormat; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.TimeUnit; import java.util.HashMap; @@ -81,6 +85,7 @@ import 
org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.ServiceException; /** @@ -118,8 +123,8 @@ public class TezClient { private String diagnostics; @VisibleForTesting final boolean isSession; - private boolean sessionStarted = false; - private boolean sessionStopped = false; + private final AtomicBoolean sessionStarted = new AtomicBoolean(false); + private final AtomicBoolean sessionStopped = new AtomicBoolean(false); /** Tokens which will be required for all DAGs submitted to this session. */ private Credentials sessionCredentials = new Credentials(); private long clientTimeout; @@ -143,6 +148,8 @@ public class TezClient { private AtomicInteger serializedSubmitDAGPlanRequestCounter = new AtomicInteger(0); private FileSystem stagingFs = null; + private ScheduledExecutorService amKeepAliveService; + private TezClient(String name, TezConfiguration tezConf) { this(name, tezConf, tezConf.getBoolean( TezConfiguration.TEZ_AM_SESSION_MODE, TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT)); @@ -315,7 +322,7 @@ public class TezClient { */ public synchronized void addAppMasterLocalFiles(Map<String, LocalResource> localFiles) { Preconditions.checkNotNull(localFiles); - if (isSession && sessionStarted) { + if (isSession && sessionStarted.get()) { additionalLocalResources.putAll(localFiles); } amConfig.addAMLocalResources(localFiles); @@ -345,7 +352,7 @@ public class TezClient { */ public synchronized void setAppMasterCredentials(Credentials credentials) { Preconditions - .checkState(!sessionStarted, + .checkState(!sessionStarted.get(), "Credentials cannot be set after the session App Master has been started"); amConfig.setCredentials(credentials); } @@ -433,15 +440,66 @@ public class TezClient { frameworkClient.submitApplication(appContext); 
ApplicationReport appReport = frameworkClient.getApplicationReport(sessionAppId); LOG.info("The url to track the Tez Session: " + appReport.getTrackingUrl()); - sessionStarted = true; + sessionStarted.set(true); } catch (YarnException e) { throw new TezException(e); } + long amClientKeepAliveTimeoutIntervalMillis = + TezCommonUtils.getAMClientHeartBeatTimeoutMillis(amConfig.getTezConfiguration()); + // Poll at minimum of 1 second interval + long pollPeriod = TezCommonUtils. + getAMClientHeartBeatPollIntervalMillis(amConfig.getTezConfiguration(), + amClientKeepAliveTimeoutIntervalMillis, 10); + + boolean isLocal = amConfig.getTezConfiguration().getBoolean( + TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT); + if (!isLocal && amClientKeepAliveTimeoutIntervalMillis > 0) { + amKeepAliveService = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("AMKeepAliveThread #%d").build()); + amKeepAliveService.scheduleWithFixedDelay(new Runnable() { + + private DAGClientAMProtocolBlockingPB proxy; + + @Override + public void run() { + proxy = sendAMHeartbeat(proxy); + } + }, pollPeriod, pollPeriod, TimeUnit.MILLISECONDS); + } + this.stagingFs = FileSystem.get(amConfig.getTezConfiguration()); } } - + + public DAGClientAMProtocolBlockingPB sendAMHeartbeat(DAGClientAMProtocolBlockingPB proxy) { + if (sessionStopped.get()) { + // Ignore sending heartbeat as session being stopped + return null; + } + try { + if (proxy == null) { + try { + proxy = waitForProxy(); + } catch (InterruptedException e) { + LOG.debug("Interrupted while trying to create a connection to the AM", e); + } + } + if (proxy != null) { + LOG.debug("Sending heartbeat to AM"); + proxy.getAMStatus(null, GetAMStatusRequestProto.newBuilder().build()); + } + return proxy; + } catch (Exception e) { + LOG.info("Exception when sending heartbeat to AM for app {}: {}", sessionAppId, + e.getMessage()); + LOG.debug("Error when sending heartbeat ping 
to AM. Resetting AM proxy for app: {}" + + " due to exception :", sessionAppId, e); + return null; + } + } + /** * Submit a DAG. <br>In non-session mode, it submits a new App Master to the * cluster.<br>In session mode, it submits the DAG to the session App Master. It @@ -566,11 +624,14 @@ public class TezClient { */ public synchronized void stop() throws TezException, IOException { try { - if (sessionStarted) { + if (amKeepAliveService != null) { + amKeepAliveService.shutdownNow(); + } + if (sessionStarted.get()) { LOG.info("Shutting down Tez Session" + ", sessionName=" + clientName + ", applicationId=" + sessionAppId); - sessionStopped = true; + sessionStopped.set(true); boolean sessionShutdownSuccessful = false; try { DAGClientAMProtocolBlockingPB proxy = getAMProxy(sessionAppId); @@ -914,9 +975,9 @@ public class TezClient { private void verifySessionStateForSubmission() throws SessionNotRunning { Preconditions.checkState(isSession, "Invalid without session mode"); - if (!sessionStarted) { + if (!sessionStarted.get()) { throw new SessionNotRunning("Session not started"); - } else if (sessionStopped) { + } else if (sessionStopped.get()) { throw new SessionNotRunning("Session stopped by user"); } } @@ -1031,6 +1092,14 @@ public class TezClient { append(tezDagIdFormat.get().format(1)).toString(); } + @VisibleForTesting + @Private + public synchronized void cancelAMKeepAlive() { + if (amKeepAliveService != null) { + amKeepAliveService.shutdownNow(); + } + } + /** * A builder for setting up an instance of {@link org.apache.tez.client.TezClient} */ http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java index afdce39..69e48b2 100644 --- 
a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java +++ b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java @@ -487,4 +487,62 @@ public class TezCommonUtils { jobToken.write(jobToken_dob); return ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength()); } + + public static String getSystemPropertiesToLog(Configuration conf) { + Collection <String> keys = conf.getTrimmedStringCollection( + TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG); + if (keys.isEmpty()) { + keys = TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG_DEFAULT; + } + StringBuilder sb = new StringBuilder(); + sb.append("\n/************************************************************\n"); + sb.append("[system properties]\n"); + for (String key : keys) { + sb.append(key).append(": ").append(System.getProperty(key)).append('\n'); + } + sb.append("************************************************************/"); + return sb.toString(); + } + + /** + * Helper function to get the heartbeat interval for client-AM heartbeats + * See {@link TezConfiguration#TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS} for more details. + * @param conf Configuration object + * @return heartbeat interval in milliseconds. -1 implies disabled. + */ + public static long getAMClientHeartBeatTimeoutMillis(Configuration conf) { + int val = conf.getInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS, + TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS_DEFAULT); + if (val < 0) { + return -1; + } + if (val > 0 && val < TezConstants.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS_MINIMUM) { + return TezConstants.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS_MINIMUM * 1000; + } + return val * 1000; + } + + /** + * Helper function to get the poll interval for client-AM heartbeats. 
+ * @param conf Configuration object + * @param heartbeatIntervalMillis Heartbeat interval in milliseconds + * @param buckets How many times to poll within the provided heartbeat interval + * @return poll interval in milliseconds + */ + public static long getAMClientHeartBeatPollIntervalMillis(Configuration conf, + long heartbeatIntervalMillis, + int buckets) { + if (heartbeatIntervalMillis <= 0) { + return -1; + } + int pollInterval = conf.getInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS, + TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS_DEFAULT); + if (pollInterval > 0) { + return Math.max(TezConstants.TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS_MINIMUM, + pollInterval); + } + return Math.max(TezConstants.TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS_MINIMUM, + heartbeatIntervalMillis/buckets); + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-api/src/main/java/org/apache/tez/common/TezUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java index 3aa1914..dfdf9fa 100644 --- a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java +++ b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java @@ -20,7 +20,6 @@ package org.apache.tez.common; import java.io.IOException; import java.io.OutputStream; -import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -37,14 +36,12 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.tez.client.TezClientUtils; -import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.records.DAGProtos; import org.codehaus.jettison.json.JSONException; import 
org.codehaus.jettison.json.JSONObject; - /** * Utility methods for setting up a DAG. Has helpers for setting up log4j configuration, converting * {@link org.apache.hadoop.conf.Configuration} to {@link org.apache.tez.dag.api.UserPayload} etc. @@ -68,23 +65,6 @@ public class TezUtils { TezClientUtils.addLog4jSystemProperties(logLevel, vargs); } - public static String getSystemPropertiesToLog(Configuration conf) { - Collection <String> keys = conf.getTrimmedStringCollection( - TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG); - if (keys.isEmpty()) { - keys = TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG_DEFAULT; - } - StringBuilder sb = new StringBuilder(); - sb.append("\n/************************************************************\n"); - sb.append("[system properties]\n"); - for (String key : keys) { - sb.append(key).append(": ").append(System.getProperty(key)).append('\n'); - } - sb.append("************************************************************/"); - return sb.toString(); - } - - /** * Convert a Configuration to compressed ByteString using Protocol buffer * @@ -201,5 +181,4 @@ public class TezUtils { return convertToHistoryText(null, conf); } - } http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 77ea4ff..c4272b7 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -606,7 +606,7 @@ public class TezConfiguration extends Configuration { @ConfigurationProperty(type="integer") public static final String TEZ_AM_CLIENT_THREAD_COUNT = TEZ_AM_PREFIX + "client.am.thread-count"; - public static final int TEZ_AM_CLIENT_THREAD_COUNT_DEFAULT = 1; + public static final int 
TEZ_AM_CLIENT_THREAD_COUNT_DEFAULT = 2; /** * String value. Range of ports that the AM can use when binding for client connections. Leave blank @@ -1677,4 +1677,28 @@ public class TezConfiguration extends Configuration { "java.vendor","java.version","java.vm.name","java.class.path", "java.io.tmpdir","user.dir","user.name")); + /** + * Int value. Time interval (in seconds). If the Tez AM does not receive a heartbeat from the + * client within this time interval, it will kill any running DAG and shut down. Required to + * recycle orphaned Tez applications where the client is no longer alive. A negative value + * can be set to disable this check. For a positive value, the minimum value is 10 seconds. + * Values from 1 to 9 seconds are reset to the minimum value; 0 also disables this check. + * Only relevant in session mode. + * This is disabled by default, i.e. the Tez AM will go on to + * complete the DAG and only kill itself after hitting the DAG submission timeout defined by + * {@link #TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS} + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty(type="integer") + public static final String TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS = + TEZ_PREFIX + "am.client.heartbeat.timeout.secs"; + public static final int TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS_DEFAULT = -1; + + + @Private + @ConfigurationScope(Scope.AM) + public static final String TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS = + TEZ_PREFIX + "am.client.heartbeat.poll.interval.millis"; + public static final int TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS_DEFAULT = -1; + } http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java index 6e1cb2d..06b9cb7 100644 --- 
a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java @@ -112,4 +112,16 @@ public class TezConstants { public static String getTezUberServicePluginName() { return TEZ_AM_SERVICE_PLUGIN_NAME_IN_AM; } + + /** + * Minimum heartbeat timeout value for the Client to AM heartbeat. + */ + public static final int TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS_MINIMUM = 10; + + /** + * Minimum polling interval used for the client-AM heartbeat. + */ + public static final long TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS_MINIMUM = 1000; + + } http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java index 48dfff4..dbbd619 100644 --- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java +++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java @@ -794,4 +794,40 @@ public class TestTezClient { } catch (ApplicationNotFoundException e) { } } + + @Test(timeout = 30000) + public void testAMClientHeartbeat() throws Exception { + TezConfiguration conf = new TezConfiguration(); + conf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS, 10); + final TezClientForTest client = configureAndCreateTezClient(conf); + client.start(); + long start = System.currentTimeMillis(); + while (true) { + if (System.currentTimeMillis() > (start + 5000)) { + break; + } + Thread.sleep(1000); + } + client.stop(); + verify(client.sessionAmProxy, atLeast(3)).getAMStatus(any(RpcController.class), + any(GetAMStatusRequestProto.class)); + + conf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS, -1); + final TezClientForTest client2 = configureAndCreateTezClient(conf); + client2.start(); + start = System.currentTimeMillis(); + while 
(true) { + if (System.currentTimeMillis() > (start + 5000)) { + break; + } + Thread.sleep(1000); + } + client2.stop(); + verify(client2.sessionAmProxy, times(0)).getAMStatus(any(RpcController.class), + any(GetAMStatusRequestProto.class)); + + + } + + } http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java b/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java index a7e6069..5f0b33b 100644 --- a/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java +++ b/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java @@ -314,4 +314,75 @@ public class TestTezCommonUtils { } + @Test (timeout = 5000) + public void testAMClientHeartBeatTimeout() { + TezConfiguration conf = new TezConfiguration(false); + + // -1 for any negative value + Assert.assertEquals(-1, + TezCommonUtils.getAMClientHeartBeatTimeoutMillis(conf)); + conf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS, -2); + Assert.assertEquals(-1, + TezCommonUtils.getAMClientHeartBeatTimeoutMillis(conf)); + + // For any value > 0 but less than min, revert to min + conf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS, + TezConstants.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS_MINIMUM - 1); + Assert.assertEquals(TezConstants.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS_MINIMUM * 1000, + TezCommonUtils.getAMClientHeartBeatTimeoutMillis(conf)); + + // For val > min, should remain val as configured + conf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS, + TezConstants.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS_MINIMUM * 2); + Assert.assertEquals(TezConstants.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS_MINIMUM * 2000, + TezCommonUtils.getAMClientHeartBeatTimeoutMillis(conf)); + + conf = new TezConfiguration(false); + Assert.assertEquals(-1, 
TezCommonUtils.getAMClientHeartBeatPollIntervalMillis(conf, -1, 10)); + Assert.assertEquals(-1, TezCommonUtils.getAMClientHeartBeatPollIntervalMillis(conf, -123, 10)); + Assert.assertEquals(-1, TezCommonUtils.getAMClientHeartBeatPollIntervalMillis(conf, 0, 10)); + + // min poll interval is 1000 + Assert.assertEquals(1000, TezCommonUtils.getAMClientHeartBeatPollIntervalMillis(conf, 600, 10)); + + // Poll interval is heartbeat interval/10 + Assert.assertEquals(2000, + TezCommonUtils.getAMClientHeartBeatPollIntervalMillis(conf, 20000, 10)); + + // Configured poll interval ignored + conf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS, -1); + Assert.assertEquals(4000, + TezCommonUtils.getAMClientHeartBeatPollIntervalMillis(conf, 20000, 5)); + + // Positive poll interval is allowed + conf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS, 2000); + Assert.assertEquals(2000, + TezCommonUtils.getAMClientHeartBeatPollIntervalMillis(conf, 20000, 5)); + + + } + + @Test + public void testLogSystemProperties() throws Exception { + Configuration conf = new Configuration(); + // test default logging + conf.set(TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG, " "); + String value = TezCommonUtils.getSystemPropertiesToLog(conf); + for(String key: TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG_DEFAULT) { + Assert.assertTrue(value.contains(key)); + } + + // test logging of selected keys + String classpath = "java.class.path"; + String os = "os.name"; + String version = "java.version"; + conf.set(TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG, classpath + ", " + os); + value = TezCommonUtils.getSystemPropertiesToLog(conf); + Assert.assertNotNull(value); + Assert.assertTrue(value.contains(classpath)); + Assert.assertTrue(value.contains(os)); + Assert.assertFalse(value.contains(version)); + } + + } http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java 
---------------------------------------------------------------------- diff --git a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java index 2eab776..61bb9a7 100644 --- a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java +++ b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java @@ -230,26 +230,4 @@ public class TestTezUtils { } - @Test - public void testLogSystemProperties() throws Exception { - Configuration conf = new Configuration(); - // test default logging - conf.set(TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG, " "); - String value = TezUtils.getSystemPropertiesToLog(conf); - for(String key: TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG_DEFAULT) { - assertTrue(value.contains(key)); - } - - // test logging of selected keys - String classpath = "java.class.path"; - String os = "os.name"; - String version = "java.version"; - conf.set(TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG, classpath + ", " + os); - value = TezUtils.getSystemPropertiesToLog(conf); - assertNotNull(value); - assertTrue(value.contains(classpath)); - assertTrue(value.contains(os)); - assertFalse(value.contains(version)); - } - } http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-dag/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml index e8adbb3..c3e099e 100644 --- a/tez-dag/findbugs-exclude.xml +++ b/tez-dag/findbugs-exclude.xml @@ -150,6 +150,8 @@ <Class name="org.apache.tez.dag.app.DAGAppMaster"/> <Or> <Field name="context"/> + <Field name="clientAMHeartbeatTimeoutIntervalMillis"/> + <Field name="clientHandler"/> <Field name="currentDAG"/> <Field name="state"/> <Field name="taskSchedulerManager"/> http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java 
---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java index 474f4ca..7c65c07 100644 --- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java +++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java @@ -319,8 +319,8 @@ public class LocalClient extends FrameworkClient { new SystemClock(), appSubmitTime, isSession, userDir.toUri().getPath(), new String[] {localDir.toUri().getPath()}, new String[] {logDir.toUri().getPath()}, amCredentials, UserGroupInformation.getCurrentUser().getShortUserName()); - clientHandler = new DAGClientHandler(dagAppMaster); DAGAppMaster.initAndStartAppMaster(dagAppMaster, conf); + clientHandler = new DAGClientHandler(dagAppMaster); } catch (Throwable t) { LOG.error("Error starting DAGAppMaster", t); http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java index 0f51eff..618676d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,9 +44,11 @@ public class DAGClientHandler { private Logger LOG = LoggerFactory.getLogger(DAGClientHandler.class); private DAGAppMaster dagAppMaster; + private final AtomicLong lastHeartbeatTime; public DAGClientHandler(DAGAppMaster dagAppMaster) { this.dagAppMaster = dagAppMaster; + this.lastHeartbeatTime = 
new AtomicLong(dagAppMaster.getContext().getClock().getTime()); } private DAG getCurrentDAG() { @@ -177,4 +180,12 @@ public class DAGClientHandler { return dag.getACLManager(); } + public void updateLastHeartbeatTime() { + lastHeartbeatTime.set(dagAppMaster.getContext().getClock().getTime()); + } + + public long getLastHeartbeatTime() { + return lastHeartbeatTime.get(); + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java index 38f6740..14de870 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java @@ -75,6 +75,9 @@ public class DAGClientServer extends AbstractService { int numHandlers = conf.getInt(TezConfiguration.TEZ_AM_CLIENT_THREAD_COUNT, TezConfiguration.TEZ_AM_CLIENT_THREAD_COUNT_DEFAULT); + if (numHandlers < 2) { + numHandlers = 2; + } server = createServer(DAGClientAMProtocolBlockingPB.class, addr, conf, numHandlers, blockingService, TezConfiguration.TEZ_AM_CLIENT_AM_PORT_RANGE); http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java index 32124b9..baac186 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java +++ 
b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java @@ -80,6 +80,7 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto if (!real.getACLManager().checkAMViewAccess(user)) { throw new AccessControlException("User " + user + " cannot perform AM view operation"); } + real.updateLastHeartbeatTime(); try{ List<String> dagIds = real.getAllDAGs(); return GetAllDAGsResponseProto.newBuilder().addAllDagId(dagIds).build(); @@ -98,6 +99,7 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto if (!real.getACLManager(dagId).checkDAGViewAccess(user)) { throw new AccessControlException("User " + user + " cannot perform DAG view operation"); } + real.updateLastHeartbeatTime(); DAGStatus status; status = real.getDAGStatus(dagId, DagTypeConverters.convertStatusGetOptsFromProto( @@ -120,6 +122,7 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto if (!real.getACLManager(dagId).checkDAGViewAccess(user)) { throw new AccessControlException("User " + user + " cannot perform DAG view operation"); } + real.updateLastHeartbeatTime(); String vertexName = request.getVertexName(); VertexStatus status = real.getVertexStatus(dagId, vertexName, DagTypeConverters.convertStatusGetOptsFromProto( @@ -142,6 +145,7 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto if (!real.getACLManager(dagId).checkDAGModifyAccess(user)) { throw new AccessControlException("User " + user + " cannot perform DAG modify operation"); } + real.updateLastHeartbeatTime(); real.tryKillDAG(dagId); return TryKillDAGResponseProto.newBuilder().build(); } catch (TezException e) { @@ -156,6 +160,7 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto if (!real.getACLManager().checkAMModifyAccess(user)) { throw new AccessControlException("User " + user + " cannot perform AM modify operation"); } + real.updateLastHeartbeatTime(); 
try{ if (request.hasSerializedRequestPath()) { // need to deserialize large request from hdfs @@ -190,6 +195,7 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto if (!real.getACLManager().checkAMModifyAccess(user)) { throw new AccessControlException("User " + user + " cannot perform AM modify operation"); } + real.updateLastHeartbeatTime(); try { real.shutdownAM(); return ShutdownSessionResponseProto.newBuilder().build(); @@ -205,6 +211,7 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto if (!real.getACLManager().checkAMViewAccess(user)) { throw new AccessControlException("User " + user + " cannot perform AM view operation"); } + real.updateLastHeartbeatTime(); try { TezAppMasterStatus sessionStatus = real.getTezAppMasterStatus(); return GetAMStatusResponseProto.newBuilder().setStatus( http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index de19fa3..062f29d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -49,6 +49,8 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; @@ -272,6 +274,11 @@ public class DAGAppMaster extends AbstractService { private boolean isLocal = false; //Local mode flag + // Timeout interval which if set will cause a running + // DAG to be killed and AM shutdown if the client has not + // 
pinged/heartbeated to the AM in the given time period. + private long clientAMHeartbeatTimeoutIntervalMillis = -1; + @VisibleForTesting protected DAGAppMasterShutdownHandler shutdownHandler; private final AtomicBoolean shutdownHandlerRunning = new AtomicBoolean(false); @@ -290,6 +297,7 @@ public class DAGAppMaster extends AbstractService { private long sessionTimeoutInterval; private long lastDAGCompletionTime; private Timer dagSubmissionTimer; + private ScheduledExecutorService clientAMHeartBeatTimeoutService; private boolean recoveryEnabled; private Path recoveryDataDir; private Path currentRecoveryDataDir; @@ -599,6 +607,8 @@ public class DAGAppMaster extends AbstractService { this.sessionTimeoutInterval = 1000 * amConf.getInt( TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS, TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS_DEFAULT); + this.clientAMHeartbeatTimeoutIntervalMillis = + TezCommonUtils.getAMClientHeartBeatTimeoutMillis(amConf); if (!versionMismatch) { if (isSession) { @@ -2109,18 +2119,44 @@ public class DAGAppMaster extends AbstractService { } if (isSession) { - this.dagSubmissionTimer = new Timer(true); + this.dagSubmissionTimer = new Timer("DAGSubmissionTimer", true); this.dagSubmissionTimer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { checkAndHandleSessionTimeout(); } catch (TezException e) { - LOG.error("Error when check AM session timeout", e); + LOG.error("Error when checking AM session timeout", e); } } }, sessionTimeoutInterval, sessionTimeoutInterval / 10); } + + // Ignore client heartbeat timeout in local mode or non-session mode + if (!isLocal && isSession && clientAMHeartbeatTimeoutIntervalMillis > 0) { + // reset heartbeat time + clientHandler.updateLastHeartbeatTime(); + this.clientAMHeartBeatTimeoutService = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("ClientAMHeartBeatKeepAliveCheck #%d").build() + ); + 
this.clientAMHeartBeatTimeoutService.schedule(new Runnable() { + @Override + public void run() { + try { + long nextExpiry = checkAndHandleDAGClientTimeout(); + if (nextExpiry > 0) { + clientAMHeartBeatTimeoutService.schedule(this, nextExpiry, TimeUnit.MILLISECONDS); + } + } catch (TezException e) { + // Cannot be thrown unless the AM is being shut down, so no need to + // reschedule the timer task + LOG.error("Error when checking Client AM heartbeat timeout", e); + } + } + }, clientAMHeartbeatTimeoutIntervalMillis, TimeUnit.MILLISECONDS); + } + } @@ -2137,6 +2173,9 @@ public class DAGAppMaster extends AbstractService { if (this.dagSubmissionTimer != null) { this.dagSubmissionTimer.cancel(); } + if (this.clientAMHeartBeatTimeoutService != null) { + this.clientAMHeartBeatTimeoutService.shutdownNow(); + } // release all the held containers before stop services TEZ-2687 initiateStop(); stopServices(); @@ -2273,6 +2312,33 @@ public class DAGAppMaster extends AbstractService { } } + private long checkAndHandleDAGClientTimeout() throws TezException { + if (EnumSet.of(DAGAppMasterState.NEW, DAGAppMasterState.RECOVERING).contains(this.state) + || sessionStopped.get()) { + // AM new or recovering so do not kill session at this time + // if session already completed or shutting down, this should be a no-op + return -1; + } + + long currentTime = clock.getTime(); + long nextExpiry = clientHandler.getLastHeartbeatTime() + + clientAMHeartbeatTimeoutIntervalMillis; + if (currentTime < nextExpiry) { + // reschedule timer to 1 sec after the next expiry window + // to ensure that we time out as intended if there are no heartbeats + return ((nextExpiry+1000) - currentTime); + } + + String message = "Client-to-AM Heartbeat timeout interval expired, shutting down AM as client" + + " stopped heartbeating to it" + + ", lastClientAMHeartbeatTime=" + clientHandler.getLastHeartbeatTime() + + ", clientAMHeartbeatTimeoutIntervalMillis=" + + clientAMHeartbeatTimeoutIntervalMillis + "
ms"; + addDiagnostic(message); + shutdownTezAM(message); + return -1; + } + private synchronized void checkAndHandleSessionTimeout() throws TezException { if (EnumSet.of(DAGAppMasterState.RUNNING, DAGAppMasterState.RECOVERING).contains(this.state) @@ -2287,6 +2353,7 @@ public class DAGAppMaster extends AbstractService { String message = "Session timed out" + ", lastDAGCompletionTime=" + lastDAGCompletionTime + " ms" + ", sessionTimeoutInterval=" + sessionTimeoutInterval + " ms"; + addDiagnostic(message); shutdownTezAM(message); } @@ -2379,7 +2446,7 @@ public class DAGAppMaster extends AbstractService { // log the system properties if (LOG.isInfoEnabled()) { - String systemPropsToLog = TezUtils.getSystemPropertiesToLog(conf); + String systemPropsToLog = TezCommonUtils.getSystemPropertiesToLog(conf); if (systemPropsToLog != null) { LOG.info(systemPropsToLog); } http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java index 8a8b776..bf07838 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.util.SystemClock; import org.apache.tez.client.TezAppMasterStatus; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; @@ -62,6 +63,7 @@ public class TestDAGClientHandler { AppContext mockAppContext = mock(AppContext.class); when(mockDagAM.getContext()).thenReturn(mockAppContext); when(mockDagAM.getContext().getCurrentDAG()).thenReturn(mockDAG); + 
when(mockAppContext.getClock()).thenReturn(new SystemClock()); DAGClientHandler dagClientHandler = new DAGClientHandler(mockDagAM); http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index 2255ed7..022fea3 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -54,7 +54,6 @@ import org.apache.tez.common.ContainerTask; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.TezLocalResource; import org.apache.tez.common.TezTaskUmbilicalProtocol; -import org.apache.tez.common.TezUtils; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.counters.Limits; import org.apache.tez.common.security.JobTokenIdentifier; @@ -66,11 +65,9 @@ import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.utils.RelocalizationUtils; import org.apache.tez.hadoop.shim.HadoopShim; -import org.apache.tez.hadoop.shim.HadoopShimProvider; import org.apache.tez.hadoop.shim.HadoopShimsLoader; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; -import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; import org.apache.tez.runtime.internals.api.TaskReporterInterface; import org.slf4j.Logger; @@ -505,7 +502,7 @@ public class TezChild { // log the system properties if (LOG.isInfoEnabled()) { - String systemPropsToLog = TezUtils.getSystemPropertiesToLog(defaultConf); + String systemPropsToLog = 
TezCommonUtils.getSystemPropertiesToLog(defaultConf); if (systemPropsToLog != null) { LOG.info(systemPropsToLog); } http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java index c3e8487..241c6e9 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java @@ -43,6 +43,8 @@ import java.util.concurrent.locks.ReentrantLock; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.tez.common.counters.CounterGroup; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounter; @@ -1168,4 +1170,116 @@ public class TestTezJobs { assertEquals(0, expectedResult.size()); } + @Test(timeout = 60000) + public void testAMClientHeartbeatTimeout() throws Exception { + Path stagingDirPath = new Path("/tmp/timeout-staging-dir"); + remoteFs.mkdirs(stagingDirPath); + + YarnClient yarnClient = YarnClient.createYarnClient(); + + try { + + yarnClient.init(mrrTezCluster.getConfig()); + yarnClient.start(); + + List<ApplicationReport> apps = yarnClient.getApplications(); + int appsBeforeCount = apps != null ? 
apps.size() : 0; + + TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig()); + tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString()); + tezConf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS, 5); + TezClient tezClient = TezClient.create("testAMClientHeartbeatTimeout", tezConf, true); + tezClient.start(); + tezClient.cancelAMKeepAlive(); + + ApplicationId appId = tezClient.getAppMasterApplicationId(); + + apps = yarnClient.getApplications(); + int appsAfterCount = apps != null ? apps.size() : 0; + + // Running in session mode, so this should create only one more app. + Assert.assertEquals(appsBeforeCount + 1, appsAfterCount); + + ApplicationReport report; + while (true) { + report = yarnClient.getApplicationReport(appId); + if (report.getYarnApplicationState() == YarnApplicationState.FINISHED + || report.getYarnApplicationState() == YarnApplicationState.FAILED + || report.getYarnApplicationState() == YarnApplicationState.KILLED) { + break; + } + Thread.sleep(1000); + } + // Add a sleep because YARN is not consistent in terms of reporting up-to-date diagnostics + Thread.sleep(2000); + report = yarnClient.getApplicationReport(appId); + LOG.info("App Report for appId=" + appId + + ", report=" + report); + Assert.assertTrue("Actual diagnostics: " + report.getDiagnostics(), + report.getDiagnostics().contains("Client-to-AM Heartbeat timeout interval expired")); + + } finally { + remoteFs.delete(stagingDirPath, true); + if (yarnClient != null) { + yarnClient.stop(); + } + } + } + + @Test(timeout = 60000) + public void testSessionTimeout() throws Exception { + Path stagingDirPath = new Path("/tmp/sessiontimeout-staging-dir"); + remoteFs.mkdirs(stagingDirPath); + + YarnClient yarnClient = YarnClient.createYarnClient(); + + try { + + yarnClient.init(mrrTezCluster.getConfig()); + yarnClient.start(); + + List<ApplicationReport> apps = yarnClient.getApplications(); + int appsBeforeCount = apps != null ?
apps.size() : 0; + + TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig()); + tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString()); + tezConf.setInt(TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS, 5); + TezClient tezClient = TezClient.create("testSessionTimeout", tezConf, true); + tezClient.start(); + + ApplicationId appId = tezClient.getAppMasterApplicationId(); + + apps = yarnClient.getApplications(); + int appsAfterCount = apps != null ? apps.size() : 0; + + // Running in session mode, so this should create only one more app. + Assert.assertEquals(appsBeforeCount + 1, appsAfterCount); + + ApplicationReport report; + while (true) { + report = yarnClient.getApplicationReport(appId); + if (report.getYarnApplicationState() == YarnApplicationState.FINISHED + || report.getYarnApplicationState() == YarnApplicationState.FAILED + || report.getYarnApplicationState() == YarnApplicationState.KILLED) { + break; + } + Thread.sleep(1000); + } + // Add a sleep because YARN is not consistent in terms of reporting up-to-date diagnostics + Thread.sleep(2000); + report = yarnClient.getApplicationReport(appId); + LOG.info("App Report for appId=" + appId + + ", report=" + report); + Assert.assertTrue("Actual diagnostics: " + report.getDiagnostics(), + report.getDiagnostics().contains("Session timed out")); + + } finally { + remoteFs.delete(stagingDirPath, true); + if (yarnClient != null) { + yarnClient.stop(); + } + } + } + + }
