TEZ-1827. MiniTezCluster takes 10 minutes to shut down. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/63e985df Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/63e985df Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/63e985df Branch: refs/heads/TEZ-2003 Commit: 63e985dfcf8bda29c3a988e97256d61a6bb9478c Parents: 19df277 Author: Hitesh Shah <[email protected]> Authored: Thu Mar 12 07:51:08 2015 -0700 Committer: Hitesh Shah <[email protected]> Committed: Thu Mar 12 07:51:08 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../main/java/org/apache/tez/dag/api/Scope.java | 2 + .../apache/tez/dag/api/TezConfiguration.java | 10 +++ .../tez/tests/MiniTezClusterWithTimeline.java | 74 ++++++++++++++++--- .../org/apache/tez/test/MiniTezCluster.java | 78 +++++++++++++++++--- 5 files changed, 146 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/63e985df/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d29cb5d..207502b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ Release 0.7.0: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-1827. MiniTezCluster takes 10 minutes to shut down. TEZ-2178. YARN-3122 breaks tez compilation with hadoop 2.7.0. TEZ-2174. Make task priority available to TaskAttemptListener. TEZ-2169. Add NDC context to various threads and pools. http://git-wip-us.apache.org/repos/asf/tez/blob/63e985df/tez-api/src/main/java/org/apache/tez/dag/api/Scope.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Scope.java b/tez-api/src/main/java/org/apache/tez/dag/api/Scope.java index d862e8f..f638e09 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/Scope.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/Scope.java @@ -25,4 +25,6 @@ public enum Scope { AM, // can only been set at AM level DAG, // can been set at AM/DAG level VERTEX, // can been set at AM/DAG/VERTEX level + CLIENT, // Client scope - only applicable on client + TEST, // Test scope - only applicable for testing, for example, MiniTezCluster } http://git-wip-us.apache.org/repos/asf/tez/blob/63e985df/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 8186f2a..c97999f 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -1164,4 +1164,14 @@ public class TezConfiguration extends Configuration { public static final String TEZ_DAG_STATUS_POLLINTERVAL_MS = TEZ_PREFIX + "dag.status.pollinterval-ms"; public static final long TEZ_DAG_STATUS_POLLINTERVAL_MS_DEFAULT = 500; + + /** + * Long value. + * Time to wait (in seconds) for apps to complete on MiniTezCluster shutdown. + */ + @ConfigurationScope(Scope.TEST) + public static final String TEZ_TEST_MINI_CLUSTER_APP_WAIT_ON_SHUTDOWN_SECS = + TEZ_PREFIX + "test.minicluster.app.wait.on.shutdown.secs"; + public static final long TEZ_TEST_MINI_CLUSTER_APP_WAIT_ON_SHUTDOWN_SECS_DEFAULT = 30; + } http://git-wip-us.apache.org/repos/asf/tez/blob/63e985df/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java index d48948b..b6e39fa 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java @@ -23,7 +23,10 @@ import java.io.FileOutputStream; import java.io.IOException; import java.util.Collection; import java.util.EnumSet; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -38,6 +41,7 @@ import org.apache.hadoop.mapred.ShuffleHandler; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.service.Service; import org.apache.hadoop.util.JarFinder; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; @@ -57,7 +61,6 @@ import com.google.common.collect.Collections2; /** * Configures and starts the Tez-specific components in the YARN cluster. * - * When using this mini cluster, the user is expected to */ public class MiniTezClusterWithTimeline extends MiniYARNCluster { @@ -69,6 +72,8 @@ public class MiniTezClusterWithTimeline extends MiniYARNCluster { private Path confFilePath; + private long maxTimeToWaitForAppsOnShutdown; + public MiniTezClusterWithTimeline(String testName) { this(testName, 1); } @@ -103,7 +108,11 @@ public class MiniTezClusterWithTimeline extends MiniYARNCluster { // nothing defined. set quick delete value conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0l); } - + + maxTimeToWaitForAppsOnShutdown = conf.getLong( + TezConfiguration.TEZ_TEST_MINI_CLUSTER_APP_WAIT_ON_SHUTDOWN_SECS, + TezConfiguration.TEZ_TEST_MINI_CLUSTER_APP_WAIT_ON_SHUTDOWN_SECS_DEFAULT); + File appJarLocalFile = new File(MiniTezClusterWithTimeline.APPJAR); if (!appJarLocalFile.exists()) { @@ -219,25 +228,72 @@ public class MiniTezClusterWithTimeline extends MiniYARNCluster { } private void waitForAppsToFinish() { - YarnClient yarnClient = YarnClient.createYarnClient(); + long waitStartTime = System.currentTimeMillis(); + long waitEndTime = maxTimeToWaitForAppsOnShutdown == -1 ? + -1 : (waitStartTime + (1000 * maxTimeToWaitForAppsOnShutdown)); + + YarnClient yarnClient = YarnClient.createYarnClient(); yarnClient.init(getConfig()); yarnClient.start(); + Collection<ApplicationReport> unCompletedApps = null; try { - while(true) { + do { List<ApplicationReport> appReports = yarnClient.getApplications(); - Collection<ApplicationReport> unCompletedApps = Collections2.filter(appReports, new Predicate<ApplicationReport>(){ + unCompletedApps = Collections2.filter(appReports, new Predicate<ApplicationReport>() { @Override public boolean apply(ApplicationReport appReport) { return EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, - YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING) - .contains(appReport.getYarnApplicationState()); + YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING) + .contains(appReport.getYarnApplicationState()); } }); - if (unCompletedApps.size()==0){ + if (unCompletedApps.isEmpty()) { break; } - LOG.info("wait for applications to finish in MiniTezClusterWithTimeline"); + LOG.info("Waiting for applications to finish in MiniTezClusterWithTimeline" + + ", incompleteAppsCount=" + unCompletedApps.size()); Thread.sleep(1000); + } while (waitEndTime != -1 && waitEndTime > System.currentTimeMillis()); + + if (unCompletedApps != null && !unCompletedApps.isEmpty()) { + LOG.info("Killing incomplete applications in MiniTezCluster" + + ", incompleteAppsCount=" + unCompletedApps.size()); + Set<ApplicationId> incompleteAppIds = + new HashSet<ApplicationId>(); + for (ApplicationReport appReport : unCompletedApps) { + try { + LOG.info("Killing application, id=" + appReport.getApplicationId() + + ", appName=" + appReport.getName()); + yarnClient.killApplication(appReport.getApplicationId()); + incompleteAppIds.add(appReport.getApplicationId()); + } catch (Exception e) { + LOG.warn("Failed to kill app on MiniTezCluster shutdown" + + ", appId=" + appReport.getApplicationId() + + ", appName=" + appReport.getName()); + } + } + + // Wait for RM to report back that incomplete apps are killed + waitStartTime = System.currentTimeMillis(); + waitEndTime = maxTimeToWaitForAppsOnShutdown == -1 ? + -1 : (waitStartTime + (1000 * maxTimeToWaitForAppsOnShutdown)); + do { + Iterator<ApplicationId> iter = incompleteAppIds.iterator(); + while (iter.hasNext()) { + ApplicationId applicationId = iter.next(); + ApplicationReport report = yarnClient.getApplicationReport(applicationId); + if (EnumSet.of(YarnApplicationState.FINISHED, YarnApplicationState.FAILED, + YarnApplicationState.KILLED).contains(report.getYarnApplicationState())) { + iter.remove(); + LOG.info("Application completed, id=" + report.getApplicationId() + + ", yarnState=" + report.getYarnApplicationState()); + } + } + if (incompleteAppIds.isEmpty()) { + break; + } + } while (waitEndTime != -1 && waitEndTime > System.currentTimeMillis()); + } } catch (Exception e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/tez/blob/63e985df/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java index 39101eb..1f747b9 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java +++ b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java @@ -23,7 +23,10 @@ import java.io.FileOutputStream; import java.io.IOException; import java.util.Collection; import java.util.EnumSet; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -38,6 +41,7 @@ import org.apache.hadoop.mapred.ShuffleHandler; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.service.Service; import org.apache.hadoop.util.JarFinder; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; @@ -68,6 +72,8 @@ public class MiniTezCluster extends MiniYARNCluster { private Path confFilePath; + private long maxTimeToWaitForAppsOnShutdown; + public MiniTezCluster(String testName) { this(testName, 1); } @@ -92,12 +98,16 @@ public class MiniTezCluster extends MiniYARNCluster { conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(), "apps_staging_dir" + Path.SEPARATOR).getAbsolutePath()); } - + if (conf.get(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC) == null) { // nothing defined. set quick delete value conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0l); } - + + maxTimeToWaitForAppsOnShutdown = conf.getLong( + TezConfiguration.TEZ_TEST_MINI_CLUSTER_APP_WAIT_ON_SHUTDOWN_SECS, + TezConfiguration.TEZ_TEST_MINI_CLUSTER_APP_WAIT_ON_SHUTDOWN_SECS_DEFAULT); + File appJarLocalFile = new File(MiniTezCluster.APPJAR); if (!appJarLocalFile.exists()) { @@ -108,7 +118,7 @@ public class MiniTezCluster extends MiniYARNCluster { } else { LOG.info("Using Tez AppJar: " + appJarLocalFile.getAbsolutePath()); } - + FileSystem fs = FileSystem.get(conf); Path testRootDir = fs.makeQualified(new Path("target", getName() + "-tmpDir")); Path appRemoteJar = new Path(testRootDir, "TezAppJar.jar"); @@ -213,25 +223,73 @@ public class MiniTezCluster extends MiniYARNCluster { } private void waitForAppsToFinish() { - YarnClient yarnClient = YarnClient.createYarnClient(); + long waitStartTime = System.currentTimeMillis(); + long waitEndTime = maxTimeToWaitForAppsOnShutdown == -1 ? + -1 : (waitStartTime + (1000 * maxTimeToWaitForAppsOnShutdown)); + + YarnClient yarnClient = YarnClient.createYarnClient(); yarnClient.init(getConfig()); yarnClient.start(); + Collection<ApplicationReport> unCompletedApps = null; try { - while(true) { + do { List<ApplicationReport> appReports = yarnClient.getApplications(); - Collection<ApplicationReport> unCompletedApps = Collections2.filter(appReports, new Predicate<ApplicationReport>(){ + unCompletedApps = Collections2.filter(appReports, new Predicate<ApplicationReport>() { @Override public boolean apply(ApplicationReport appReport) { return EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, - YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING) - .contains(appReport.getYarnApplicationState()); + YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING) + .contains(appReport.getYarnApplicationState()); } }); - if (unCompletedApps.size()==0){ + if (unCompletedApps.isEmpty()) { break; } - LOG.info("wait for applications to finish in MiniTezCluster"); + LOG.info("Waiting for applications to finish in MiniTezCluster" + + ", incompleteAppsCount=" + unCompletedApps.size()); Thread.sleep(1000); + } while (waitEndTime != -1 && waitEndTime > System.currentTimeMillis()); + + + if (unCompletedApps != null && !unCompletedApps.isEmpty()) { + LOG.info("Killing incomplete applications in MiniTezCluster" + + ", incompleteAppsCount=" + unCompletedApps.size()); + Set<ApplicationId> incompleteAppIds = + new HashSet<ApplicationId>(); + for (ApplicationReport appReport : unCompletedApps) { + try { + LOG.info("Killing application, id=" + appReport.getApplicationId() + + ", appName=" + appReport.getName()); + yarnClient.killApplication(appReport.getApplicationId()); + incompleteAppIds.add(appReport.getApplicationId()); + } catch (Exception e) { + LOG.warn("Failed to kill app on MiniTezCluster shutdown" + + ", appId=" + appReport.getApplicationId() + + ", appName=" + appReport.getName()); + } + } + + // Wait for RM to report back that incomplete apps are killed + waitStartTime = System.currentTimeMillis(); + waitEndTime = maxTimeToWaitForAppsOnShutdown == -1 ? + -1 : (waitStartTime + (1000 * maxTimeToWaitForAppsOnShutdown)); + do { + Iterator<ApplicationId> iter = incompleteAppIds.iterator(); + while (iter.hasNext()) { + ApplicationId applicationId = iter.next(); + ApplicationReport report = yarnClient.getApplicationReport(applicationId); + if (EnumSet.of(YarnApplicationState.FINISHED, YarnApplicationState.FAILED, + YarnApplicationState.KILLED).contains(report.getYarnApplicationState())) { + iter.remove(); + LOG.info("Application completed, id=" + report.getApplicationId() + + ", yarnState=" + report.getYarnApplicationState()); + } + } + if (incompleteAppIds.isEmpty()) { + break; + } + } while (waitEndTime != -1 && waitEndTime > System.currentTimeMillis()); + } } catch (Exception e) { e.printStackTrace();
