YARN-4356. Ensure the timeline service v.2 is disabled cleanly and has no impact when it's turned off. Contributed by Sangjin Lee.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ef71c1fc Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ef71c1fc Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ef71c1fc Branch: refs/heads/YARN-2928 Commit: ef71c1fc4c37b355e895cea23a868ba57555d9b5 Parents: e987c7e Author: Li Lu <[email protected]> Authored: Fri Dec 11 11:17:34 2015 -0800 Committer: Li Lu <[email protected]> Committed: Wed May 4 16:01:41 2016 -0700 ---------------------------------------------------------------------- .../jobhistory/JobHistoryEventHandler.java | 63 ++++---- .../hadoop/mapreduce/v2/app/MRAppMaster.java | 11 +- .../apache/hadoop/mapreduce/MRJobConfig.java | 5 - .../src/main/resources/mapred-default.xml | 7 - .../mapred/TestMRTimelineEventHandling.java | 5 +- .../hadoop/mapreduce/v2/MiniMRYarnCluster.java | 2 +- .../hadoop/yarn/conf/YarnConfiguration.java | 58 ++++++- .../distributedshell/ApplicationMaster.java | 153 ++++++++----------- .../applications/distributedshell/Client.java | 16 -- .../distributedshell/TestDistributedShell.java | 10 +- .../hadoop/yarn/client/api/TimelineClient.java | 18 ++- .../client/api/impl/TimelineClientImpl.java | 3 + .../src/main/resources/yarn-default.xml | 5 +- .../impl/pb/NodeHeartbeatRequestPBImpl.java | 10 +- .../impl/pb/NodeHeartbeatResponsePBImpl.java | 10 +- .../hadoop/yarn/server/nodemanager/Context.java | 3 +- .../yarn/server/nodemanager/NodeManager.java | 23 ++- .../nodemanager/NodeStatusUpdaterImpl.java | 48 +++--- .../collectormanager/NMCollectorService.java | 10 +- .../containermanager/ContainerManagerImpl.java | 62 +++++--- .../application/ApplicationImpl.java | 70 +++++++-- .../monitor/ContainersMonitorImpl.java | 11 +- .../timelineservice/NMTimelinePublisher.java | 49 +++--- .../TestContainerManagerRecovery.java | 9 +- .../application/TestApplication.java | 3 +- .../nodemanager/webapp/TestNMWebServices.java | 8 +- .../ApplicationMasterService.java 
| 11 +- .../server/resourcemanager/ClientRMService.java | 35 +++-- .../server/resourcemanager/RMAppManager.java | 7 +- .../server/resourcemanager/ResourceManager.java | 33 ++-- .../resourcemanager/ResourceTrackerService.java | 21 ++- .../resourcemanager/amlauncher/AMLauncher.java | 15 +- .../metrics/TimelineServiceV2Publisher.java | 2 +- .../server/resourcemanager/rmapp/RMAppImpl.java | 16 +- .../resourcemanager/TestClientRMService.java | 3 + .../metrics/TestSystemMetricsPublisher.java | 2 +- .../TestSystemMetricsPublisherForV2.java | 1 + .../TestTimelineServiceClientIntegration.java | 30 +++- .../PerNodeTimelineCollectorsAuxService.java | 15 +- .../reader/TimelineReaderServer.java | 14 +- ...TestPerNodeTimelineCollectorsAuxService.java | 9 +- .../reader/TestTimelineReaderServer.java | 3 + .../reader/TestTimelineReaderWebServices.java | 2 + ...stTimelineReaderWebServicesHBaseStorage.java | 2 + 44 files changed, 520 insertions(+), 373 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef71c1fc/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java index b1c1a52..d88588c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java @@ -19,9 +19,6 @@ package org.apache.hadoop.mapreduce.jobhistory; import java.io.IOException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; @@ -31,7 +28,11 @@ import java.util.Map; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -57,9 +58,9 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.MRAppMaster; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; -import org.apache.hadoop.mapreduce.v2.app.MRAppMaster; import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; @@ -81,11 +82,10 @@ import org.codehaus.jackson.node.ArrayNode; import org.codehaus.jackson.node.JsonNodeFactory; import org.codehaus.jackson.node.ObjectNode; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - import com.google.common.annotations.VisibleForTesting; import com.sun.jersey.api.client.ClientHandlerException; +import 
com.google.common.util.concurrent.ThreadFactoryBuilder; /** * The job history events get routed to this class. This class writes the Job * history events to the DFS directly into a staging dir and then moved to a @@ -133,20 +133,17 @@ public class JobHistoryEventHandler extends AbstractService protected static final Map<JobId, MetaInfo> fileMap = Collections.<JobId,MetaInfo>synchronizedMap(new HashMap<JobId,MetaInfo>()); - - // For posting entities in new timeline service in a non-blocking way - // TODO YARN-3367 replace with event loop in TimelineClient. - private static ExecutorService threadPool = - Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setNameFormat("TimelineService #%d") - .build()); // should job completion be force when the AM shuts down? protected volatile boolean forceJobCompletion = false; protected TimelineClient timelineClient; - private boolean newTimelineServiceEnabled = false; + private boolean timelineServiceV2Enabled = false; + + // For posting entities in new timeline service in a non-blocking way + // TODO YARN-3367 replace with event loop in TimelineClient. + private ExecutorService threadPool; private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB"; private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK"; @@ -276,22 +273,26 @@ public class JobHistoryEventHandler extends AbstractService // configuration status: off, on_with_v1 or on_with_v2. 
if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) { - if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { - + LOG.info("Emitting job history data to the timeline service is enabled"); + if (YarnConfiguration.timelineServiceEnabled(conf)) { + timelineClient = ((MRAppMaster.RunningAppContext)context).getTimelineClient(); timelineClient.init(conf); - newTimelineServiceEnabled = conf.getBoolean( - MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED, - MRJobConfig.DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED); - LOG.info("Timeline service is enabled: " + (newTimelineServiceEnabled? "v2" : "v1")); - LOG.info("Emitting job history data to the timeline server is enabled"); + timelineServiceV2Enabled = + YarnConfiguration.timelineServiceV2Enabled(conf); + LOG.info("Timeline service is enabled; version: " + + YarnConfiguration.getTimelineServiceVersion(conf)); + if (timelineServiceV2Enabled) { + // initialize the thread pool for v.2 timeline service + threadPool = createThreadPool(); + } } else { LOG.info("Timeline service is not enabled"); } } else { - LOG.info("Emitting job history data to the timeline server is not enabled"); + LOG.info("Emitting job history data to the timeline server is not " + + "enabled"); } // Flag for setting @@ -459,19 +460,27 @@ public class JobHistoryEventHandler extends AbstractService if (timelineClient != null) { timelineClient.stop(); } - shutdownAndAwaitTermination(); + if (threadPool != null) { + shutdownAndAwaitTermination(); + } LOG.info("Stopped JobHistoryEventHandler. 
super.stop()"); super.serviceStop(); } // TODO remove threadPool after adding non-blocking call in TimelineClient - private static void shutdownAndAwaitTermination() { + private ExecutorService createThreadPool() { + return Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setNameFormat("TimelineService #%d") + .build()); + } + + private void shutdownAndAwaitTermination() { threadPool.shutdown(); try { if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { threadPool.shutdownNow(); if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) - LOG.error("ThreadPool did not terminate"); + LOG.error("ThreadPool did not terminate"); } } catch (InterruptedException ie) { threadPool.shutdownNow(); @@ -633,7 +642,7 @@ public class JobHistoryEventHandler extends AbstractService processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(), event.getJobID()); if (timelineClient != null) { - if (newTimelineServiceEnabled) { + if (timelineServiceV2Enabled) { processEventForNewTimelineService(historyEvent, event.getJobID(), event.getTimestamp()); } else { http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef71c1fc/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 3279d03..897e2aa 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -1058,14 +1058,9 
@@ public class MRAppMaster extends CompositeService { this.taskAttemptFinishingMonitor = taskAttemptFinishingMonitor; if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA) - && conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { - - boolean newTimelineServiceEnabled = conf.getBoolean( - MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED, - MRJobConfig.DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED); - - if (newTimelineServiceEnabled) { + && YarnConfiguration.timelineServiceEnabled(conf)) { + + if (YarnConfiguration.timelineServiceV2Enabled(conf)) { // create new version TimelineClient timelineClient = TimelineClient.createTimelineClient( appAttemptID.getApplicationId()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef71c1fc/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index bc8aeda..c98746a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -467,11 +467,6 @@ public interface MRJobConfig { "mapreduce.job.emit-timeline-data"; public static final boolean DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA = false; - - public static final String MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED = - "mapreduce.job.new-timeline-service.enabled"; - public static 
final boolean DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED = - false; public static final String MR_PREFIX = "yarn.app.mapreduce."; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef71c1fc/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index df3bd45..b7bdcc8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -628,13 +628,6 @@ </description> </property> - <property> - <name>mapreduce.job.new-timeline-service.enabled</name> - <value>false</value> - <description>Specifies if posting job and task events to new timeline service. 
- </description> -</property> - <property> <name>mapreduce.input.fileinputformat.split.minsize</name> <value>0</value> http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef71c1fc/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java index afc4686..1896c7b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java @@ -166,11 +166,10 @@ public class TestMRTimelineEventHandling { LOG.info("testMRNewTimelineServiceEventHandling start."); Configuration conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + // enable new timeline service + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true); - // enable new timeline serivce in MR side - conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED, true); - // enable aux-service based timeline collectors conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME); conf.set(YarnConfiguration.NM_AUX_SERVICES + "." 
+ TIMELINE_AUX_SERVICE_NAME http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef71c1fc/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java index 18a4c14..edb825d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java @@ -173,7 +173,7 @@ public class MiniMRYarnCluster extends MiniYARNCluster { boolean enableTimelineAuxService = false; if (nmAuxServices != null) { for (String nmAuxService: nmAuxServices) { - if (nmAuxService == TIMELINE_AUX_SERVICE_NAME) { + if (nmAuxService.equals(TIMELINE_AUX_SERVICE_NAME)) { enableTimelineAuxService = true; break; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef71c1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 8892cd4..ed81eaa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -83,6 +83,10 @@ public class YarnConfiguration extends Configuration { new DeprecationDelta("yarn.client.max-nodemanagers-proxies", NM_CLIENT_MAX_NM_PROXIES) }); + Configuration.addDeprecations(new DeprecationDelta[] { + new DeprecationDelta(RM_SYSTEM_METRICS_PUBLISHER_ENABLED, + SYSTEM_METRICS_PUBLISHER_ENABLED) + }); } //Configurations @@ -456,7 +460,8 @@ public class YarnConfiguration extends Configuration { /** * The setting that controls whether yarn system metrics is published on the - * timeline server or not by RM. This configuration setting is for ATS V1 + * timeline server or not by RM. This configuration setting is for ATS V1. + * This is now deprecated in favor of SYSTEM_METRICS_PUBLISHER_ENABLED. */ public static final String RM_SYSTEM_METRICS_PUBLISHER_ENABLED = RM_PREFIX + "system-metrics-publisher.enabled"; @@ -2675,13 +2680,52 @@ public class YarnConfiguration extends Configuration { } return clusterId; } - - public static boolean systemMetricsPublisherEnabled(Configuration conf) { + + // helper methods for timeline service configuration + /** + * Returns whether the timeline service is enabled via configuration. + * + * @param conf the configuration + * @return whether the timeline service is enabled. + */ + public static boolean timelineServiceEnabled(Configuration conf) { return conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED) - && conf.getBoolean( - YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, - YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED); + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED); + } + + /** + * Returns the timeline service version. It does not check whether the + * timeline service itself is enabled. + * + * @param conf the configuration + * @return the timeline service version as a float. 
+ */ + public static float getTimelineServiceVersion(Configuration conf) { + return conf.getFloat(TIMELINE_SERVICE_VERSION, + DEFAULT_TIMELINE_SERVICE_VERSION); + } + + /** + * Returns whether the timeline service v.2 is enabled via configuration. + * + * @param conf the configuration + * @return whether the timeline service v.2 is enabled. V.2 refers to a + * version greater than or equal to 2 but smaller than 3. + */ + public static boolean timelineServiceV2Enabled(Configuration conf) { + return timelineServiceEnabled(conf) && + (int)getTimelineServiceVersion(conf) == 2; + } + + /** + * Returns whether the system publisher is enabled. + * + * @param conf the configuration + * @return whether the system publisher is enabled. + */ + public static boolean systemMetricsPublisherEnabled(Configuration conf) { + return conf.getBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, + YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED); } /* For debugging. mp configurations to system output as XML format. 
*/ http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef71c1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 1a9a4ca..6bd3c26 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -223,14 +223,11 @@ public class ApplicationMaster { // Tracking url to which app master publishes info for clients to monitor private String appMasterTrackingUrl = ""; - private boolean newTimelineService = false; + private boolean timelineServiceV2 = false; // For posting entities in new timeline service in a non-blocking way // TODO replace with event loop in TimelineClient. - private static ExecutorService threadPool = - Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setNameFormat("TimelineService #%d") - .build()); + private ExecutorService threadPool; // App Master configuration // No. 
of containers to run shell command on @@ -331,8 +328,10 @@ public class ApplicationMaster { } appMaster.run(); result = appMaster.finish(); - - shutdownAndAwaitTermination(); + + if (appMaster.threadPool != null) { + appMaster.shutdownAndAwaitTermination(); + } } catch (Throwable t) { LOG.fatal("Error running ApplicationMaster", t); LogManager.shutdown(); @@ -346,16 +345,22 @@ public class ApplicationMaster { System.exit(2); } } - + //TODO remove threadPool after adding non-blocking call in TimelineClient - private static void shutdownAndAwaitTermination() { + private ExecutorService createThreadPool() { + return Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setNameFormat("TimelineService #%d") + .build()); + } + + private void shutdownAndAwaitTermination() { threadPool.shutdown(); try { // Wait a while for existing tasks to terminate if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { threadPool.shutdownNow(); if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) - LOG.error("ThreadPool did not terminate"); + LOG.error("ThreadPool did not terminate"); } } catch (InterruptedException ie) { threadPool.shutdownNow(); @@ -433,8 +438,7 @@ public class ApplicationMaster { opts.addOption("container_retry_interval", true, "Interval between each retry, unit is milliseconds"); opts.addOption("debug", false, "Dump out debug information"); - opts.addOption("timeline_service_version", true, - "Version for timeline service"); + opts.addOption("help", false, "Print usage"); CommandLine cliParser = new GnuParser().parse(opts, args); @@ -586,27 +590,15 @@ public class ApplicationMaster { containrRetryInterval = Integer.parseInt(cliParser.getOptionValue( "container_retry_interval", "0")); - if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { - if (cliParser.hasOption("timeline_service_version")) { - String timelineServiceVersion = - cliParser.getOptionValue("timeline_service_version", "v1"); 
- if (timelineServiceVersion.trim().equalsIgnoreCase("v1")) { - newTimelineService = false; - } else if (timelineServiceVersion.trim().equalsIgnoreCase("v2")) { - newTimelineService = true; - } else { - throw new IllegalArgumentException( - "timeline_service_version is not set properly, should be 'v1' or 'v2'"); - } + if (YarnConfiguration.timelineServiceEnabled(conf)) { + timelineServiceV2 = + YarnConfiguration.timelineServiceV2Enabled(conf); + if (timelineServiceV2) { + threadPool = createThreadPool(); } } else { timelineClient = null; LOG.warn("Timeline service is not enabled"); - if (cliParser.hasOption("timeline_service_version")) { - throw new IllegalArgumentException( - "Timeline service is not enabled"); - } } return true; @@ -668,16 +660,17 @@ public class ApplicationMaster { nmClientAsync.start(); startTimelineClient(conf); - // need to bind timelineClient - amRMClient.registerTimelineClient(timelineClient); + if (timelineServiceV2) { + // need to bind timelineClient + amRMClient.registerTimelineClient(timelineClient); + } if(timelineClient != null) { - if (newTimelineService) { - publishApplicationAttemptEventOnNewTimelineService(timelineClient, - appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_START, domainId, - appSubmitterUgi); + if (timelineServiceV2) { + publishApplicationAttemptEventOnTimelineServiceV2( + DSEvent.DS_APP_ATTEMPT_START); } else { publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), - DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); + DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); } } @@ -748,10 +741,9 @@ public class ApplicationMaster { appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() { @Override public Void run() throws Exception { - if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { + if (YarnConfiguration.timelineServiceEnabled(conf)) { // Creating the Timeline Client - if (newTimelineService) { + if 
(timelineServiceV2) { timelineClient = TimelineClient.createTimelineClient( appAttemptID.getApplicationId()); } else { @@ -787,10 +779,9 @@ public class ApplicationMaster { } if (timelineClient != null) { - if (newTimelineService) { - publishApplicationAttemptEventOnNewTimelineService(timelineClient, - appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_END, domainId, - appSubmitterUgi); + if (timelineServiceV2) { + publishApplicationAttemptEventOnTimelineServiceV2( + DSEvent.DS_APP_ATTEMPT_END); } else { publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); @@ -900,12 +891,11 @@ public class ApplicationMaster { + containerStatus.getContainerId()); } if(timelineClient != null) { - if (newTimelineService) { - publishContainerEndEventOnNewTimelineService( - timelineClient, containerStatus, domainId, appSubmitterUgi); + if (timelineServiceV2) { + publishContainerEndEventOnTimelineServiceV2(containerStatus); } else { publishContainerEndEvent( - timelineClient, containerStatus, domainId, appSubmitterUgi); + timelineClient, containerStatus, domainId, appSubmitterUgi); } } } @@ -1032,14 +1022,13 @@ public class ApplicationMaster { applicationMaster.timelineClient, container, applicationMaster.domainId, applicationMaster.appSubmitterUgi); - if (applicationMaster.newTimelineService) { - ApplicationMaster.publishContainerStartEventOnNewTimelineService( - applicationMaster.timelineClient, container, - applicationMaster.domainId, applicationMaster.appSubmitterUgi); + if (applicationMaster.timelineServiceV2) { + applicationMaster.publishContainerStartEventOnTimelineServiceV2( + container); } else { applicationMaster.publishContainerStartEvent( - applicationMaster.timelineClient, container, - applicationMaster.domainId, applicationMaster.appSubmitterUgi); + applicationMaster.timelineClient, container, + applicationMaster.domainId, applicationMaster.appSubmitterUgi); } } } @@ -1349,7 +1338,7 @@ public class 
ApplicationMaster { LOG.error("App Attempt " + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end") + " event could not be published for " - + appAttemptId.toString(), e); + + appAttemptID, e); } } @@ -1397,27 +1386,24 @@ public class ApplicationMaster { return new Thread(runnableLaunchContainer); } - private static void publishContainerStartEventOnNewTimelineService( - final TimelineClient timelineClient, final Container container, - final String domainId, final UserGroupInformation ugi) { + private void publishContainerStartEventOnTimelineServiceV2( + final Container container) { Runnable publishWrapper = new Runnable() { public void run() { - publishContainerStartEventOnNewTimelineServiceBase(timelineClient, - container, domainId, ugi); + publishContainerStartEventOnTimelineServiceV2Base(container); } }; threadPool.execute(publishWrapper); } - private static void publishContainerStartEventOnNewTimelineServiceBase( - final TimelineClient timelineClient, Container container, String domainId, - UserGroupInformation ugi) { + private void publishContainerStartEventOnTimelineServiceV2Base( + Container container) { final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); entity.setId(container.getId().toString()); entity.setType(DSEntity.DS_CONTAINER.toString()); //entity.setDomainId(domainId); - entity.addInfo("user", ugi.getShortUserName()); + entity.addInfo("user", appSubmitterUgi.getShortUserName()); org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent(); @@ -1428,7 +1414,7 @@ public class ApplicationMaster { entity.addEvent(event); try { - ugi.doAs(new PrivilegedExceptionAction<Object>() { + appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() { @Override public TimelinePutResponse run() throws Exception { timelineClient.putEntities(entity); @@ -1442,27 
+1428,24 @@ public class ApplicationMaster { } } - private static void publishContainerEndEventOnNewTimelineService( - final TimelineClient timelineClient, final ContainerStatus container, - final String domainId, final UserGroupInformation ugi) { + private void publishContainerEndEventOnTimelineServiceV2( + final ContainerStatus container) { Runnable publishWrapper = new Runnable() { public void run() { - publishContainerEndEventOnNewTimelineServiceBase(timelineClient, - container, domainId, ugi); + publishContainerEndEventOnTimelineServiceV2Base(container); } }; threadPool.execute(publishWrapper); } - private static void publishContainerEndEventOnNewTimelineServiceBase( - final TimelineClient timelineClient, final ContainerStatus container, - final String domainId, final UserGroupInformation ugi) { + private void publishContainerEndEventOnTimelineServiceV2Base( + final ContainerStatus container) { final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); entity.setId(container.getContainerId().toString()); entity.setType(DSEntity.DS_CONTAINER.toString()); //entity.setDomainId(domainId); - entity.addInfo("user", ugi.getShortUserName()); + entity.addInfo("user", appSubmitterUgi.getShortUserName()); org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent(); event.setTimestamp(System.currentTimeMillis()); @@ -1472,7 +1455,7 @@ public class ApplicationMaster { entity.addEvent(event); try { - ugi.doAs(new PrivilegedExceptionAction<Object>() { + appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() { @Override public TimelinePutResponse run() throws Exception { timelineClient.putEntities(entity); @@ -1486,29 +1469,25 @@ public class ApplicationMaster { } } - private static void publishApplicationAttemptEventOnNewTimelineService( - final TimelineClient timelineClient, final String 
appAttemptId, - final DSEvent appEvent, final String domainId, - final UserGroupInformation ugi) { + private void publishApplicationAttemptEventOnTimelineServiceV2( + final DSEvent appEvent) { Runnable publishWrapper = new Runnable() { public void run() { - publishApplicationAttemptEventOnNewTimelineServiceBase(timelineClient, - appAttemptId, appEvent, domainId, ugi); + publishApplicationAttemptEventOnTimelineServiceV2Base(appEvent); } }; threadPool.execute(publishWrapper); } - private static void publishApplicationAttemptEventOnNewTimelineServiceBase( - final TimelineClient timelineClient, String appAttemptId, - DSEvent appEvent, String domainId, UserGroupInformation ugi) { + private void publishApplicationAttemptEventOnTimelineServiceV2Base( + DSEvent appEvent) { final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); - entity.setId(appAttemptId); + entity.setId(appAttemptID.toString()); entity.setType(DSEntity.DS_APP_ATTEMPT.toString()); //entity.setDomainId(domainId); - entity.addInfo("user", ugi.getShortUserName()); + entity.addInfo("user", appSubmitterUgi.getShortUserName()); org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent(); event.setId(appEvent.toString()); @@ -1516,7 +1495,7 @@ public class ApplicationMaster { entity.addEvent(event); try { - ugi.doAs(new PrivilegedExceptionAction<Object>() { + appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() { @Override public TimelinePutResponse run() throws Exception { timelineClient.putEntities(entity); @@ -1527,7 +1506,7 @@ public class ApplicationMaster { LOG.error("App Attempt " + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end") + " event could not be published for " - + appAttemptId.toString(), + + appAttemptID, e instanceof UndeclaredThrowableException ? 
e.getCause() : e); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef71c1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 43ffa09..4ae7edd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -195,8 +195,6 @@ public class Client { // Command line options private Options opts; - private String timelineServiceVersion; - private static final String shellCommandPath = "shellCommands"; private static final String shellArgsPath = "shellArgs"; private static final String appMasterJarPath = "AppMaster.jar"; @@ -272,7 +270,6 @@ public class Client { opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command"); opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); opts.addOption("log_properties", true, "log4j.properties file"); - opts.addOption("timeline_service_version", true, "Version for timeline service"); opts.addOption("keep_containers_across_application_attempts", false, "Flag to indicate whether to keep containers across application attempts." 
+ " If the flag is true, running containers will not be killed when" + @@ -386,16 +383,6 @@ public class Client { + " Specified virtual cores=" + amVCores); } - if (cliParser.hasOption("timeline_service_version")) { - timelineServiceVersion = - cliParser.getOptionValue("timeline_service_version", "v1"); - if (! (timelineServiceVersion.trim().equalsIgnoreCase("v1") || - timelineServiceVersion.trim().equalsIgnoreCase("v2"))) { - throw new IllegalArgumentException( - "timeline_service_version is not set properly, should be 'v1' or 'v2'"); - } - } - if (!cliParser.hasOption("jar")) { throw new IllegalArgumentException("No jar file specified for application master"); } @@ -725,9 +712,6 @@ public class Client { vargs.addAll(containerRetryOptions); - if (timelineServiceVersion != null) { - vargs.add("--timeline_service_version " + timelineServiceVersion); - } vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout"); vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef71c1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 80ffd15..bdf6e2b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -138,9 +138,10 @@ public class TestDistributedShell { conf.set("yarn.log.dir", "target"); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); // mark if we need to launch the v1 timeline server - boolean enableATSServer = true; // disable aux-service based timeline aggregators conf.set(YarnConfiguration.NM_AUX_SERVICES, ""); + conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true); + conf.set(YarnConfiguration.NM_VMEM_PMEM_RATIO, "8"); conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName()); conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); @@ -157,7 +158,6 @@ public class TestDistributedShell { true); conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, true); - conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, false); // ATS version specific settings if (timelineVersion == 1.0f) { @@ -177,7 +177,6 @@ public class TestDistributedShell { DistributedShellTimelinePlugin.class.getName()); } else if (timelineVersion == 2.0f) { // disable v1 timeline server since we no longer have a server here - enableATSServer = false; // enable aux-service based timeline aggregators conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME); conf.set(YarnConfiguration.NM_AUX_SERVICES + "." 
+ TIMELINE_AUX_SERVICE_NAME @@ -331,12 +330,7 @@ public class TestDistributedShell { } boolean isTestingTimelineV2 = false; if (timelineVersionWatcher.getTimelineVersion() == 2.0f) { - String[] timelineArgs = { - "--timeline_service_version", - "v2" - }; isTestingTimelineV2 = true; - args = mergeArgs(args, timelineArgs); if (!defaultFlow) { String[] flowArgs = { "--flow_name", http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef71c1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java index d629bb7..e0f2267 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java @@ -54,17 +54,21 @@ public abstract class TimelineClient extends AbstractService implements * current user may use {@link UserGroupInformation#doAs} another user to * construct and initialize a timeline client if the following operations are * supposed to be conducted by that user. - * - * @return a timeline client */ protected ApplicationId contextAppId; + /** + * Creates an instance of the timeline v.1.x client. + */ @Public public static TimelineClient createTimelineClient() { TimelineClient client = new TimelineClientImpl(); return client; } + /** + * Creates an instance of the timeline v.2 client. 
+ */ @Public public static TimelineClient createTimelineClient(ApplicationId appId) { TimelineClient client = new TimelineClientImpl(appId); @@ -203,8 +207,9 @@ public abstract class TimelineClient extends AbstractService implements /** * <p> * Send the information of a number of conceptual entities to the timeline - * aggregator. It is a blocking API. The method will not return until all the - * put entities have been persisted. + * service v.2 collector. It is a blocking API. The method will not return + * until all the put entities have been persisted. If this method is invoked + * for a non-v.2 timeline client instance, a YarnException is thrown. * </p> * * @param entities @@ -220,8 +225,9 @@ public abstract class TimelineClient extends AbstractService implements /** * <p> * Send the information of a number of conceptual entities to the timeline - * aggregator. It is an asynchronous API. The method will return once all the - * entities are received. + * service v.2 collector. It is an asynchronous API. The method will return + * once all the entities are received. If this method is invoked for a + * non-v.2 timeline client instance, a YarnException is thrown. 
* </p> * * @param entities http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef71c1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index 0096879..f92e6a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -388,6 +388,9 @@ public class TimelineClientImpl extends TimelineClient { private void putEntities(boolean async, org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... 
entities) throws IOException, YarnException { + if (!timelineServiceV2) { + throw new YarnException("v.2 method is invoked on a v.1.x client"); + } org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities entitiesContainer = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef71c1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index cc81402..e4b562e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -785,14 +785,15 @@ <property> <description>The setting that controls whether yarn system metrics is published to the Timeline server (version one) or not, by RM. 
- This configuration is deprecated.</description> + This configuration is now deprecated in favor of + yarn.system-metrics-publisher.enabled.</description> <name>yarn.resourcemanager.system-metrics-publisher.enabled</name> <value>false</value> </property> <property> <description>The setting that controls whether yarn system metrics is - published on the Timeline server (version two) or not by RM And NM.</description> + published on the Timeline service or not by RM And NM.</description> <name>yarn.system-metrics-publisher.enabled</name> <value>false</value> </property> http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef71c1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index fa0cf5c..066abfc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -261,10 +261,12 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { private void initRegisteredCollectors() { NodeHeartbeatRequestProtoOrBuilder p = viaProto ? 
proto : builder; List<AppCollectorsMapProto> list = p.getRegisteredCollectorsList(); - this.registeredCollectors = new HashMap<ApplicationId, String> (); - for (AppCollectorsMapProto c : list) { - ApplicationId appId = convertFromProtoFormat(c.getAppId()); - this.registeredCollectors.put(appId, c.getAppCollectorAddr()); + if (!list.isEmpty()) { + this.registeredCollectors = new HashMap<>(); + for (AppCollectorsMapProto c : list) { + ApplicationId appId = convertFromProtoFormat(c.getAppId()); + this.registeredCollectors.put(appId, c.getAppCollectorAddr()); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef71c1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 2521b9c..151006b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -531,10 +531,12 @@ public class NodeHeartbeatResponsePBImpl extends private void initAppCollectorsMap() { NodeHeartbeatResponseProtoOrBuilder p = viaProto ? 
proto : builder; List<AppCollectorsMapProto> list = p.getAppCollectorsMapList(); - this.appCollectorsMap = new HashMap<ApplicationId, String> (); - for (AppCollectorsMapProto c : list) { - ApplicationId appId = convertFromProtoFormat(c.getAppId()); - this.appCollectorsMap.put(appId, c.getAppCollectorAddr()); + if (!list.isEmpty()) { + this.appCollectorsMap = new HashMap<>(); + for (AppCollectorsMapProto c : list) { + ApplicationId appId = convertFromProtoFormat(c.getAppId()); + this.appCollectorsMap.put(appId, c.getAppCollectorAddr()); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef71c1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 4c2245e..9305d25 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -75,7 +75,8 @@ public interface Context { /** * Get the registered collectors that located on this NM. 
- * @return registered + * @return registered collectors, or null if the timeline service v.2 is not + * enabled */ Map<ApplicationId, String> getRegisteredCollectors(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef71c1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 50e122c..b41a81a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -100,6 +100,7 @@ public class NodeManager extends CompositeService private Context context; private AsyncDispatcher dispatcher; private ContainerManagerImpl containerManager; + // the NM collector service is set only if the timeline service v.2 is enabled private NMCollectorService nmCollectorService; private NodeStatusUpdater nodeStatusUpdater; private NodeResourceMonitor nodeResourceMonitor; @@ -375,8 +376,10 @@ public class NodeManager extends CompositeService DefaultMetricsSystem.initialize("NodeManager"); - this.nmCollectorService = createNMCollectorService(context); - addService(nmCollectorService); + if (YarnConfiguration.timelineServiceV2Enabled(conf)) { + this.nmCollectorService = createNMCollectorService(context); + addService(nmCollectorService); + } // StatusUpdater should be added last so that it get started last // so that we make sure 
everything is up before registering with RM. @@ -473,8 +476,7 @@ public class NodeManager extends CompositeService protected final ConcurrentMap<ContainerId, Container> containers = new ConcurrentSkipListMap<ContainerId, Container>(); - protected Map<ApplicationId, String> registeredCollectors = - new ConcurrentHashMap<ApplicationId, String>(); + protected Map<ApplicationId, String> registeredCollectors; protected final ConcurrentMap<ContainerId, org.apache.hadoop.yarn.api.records.Container> increasedContainers = @@ -507,6 +509,9 @@ public class NodeManager extends CompositeService LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager, NMStateStoreService stateStore, boolean isDistSchedulingEnabled, Configuration conf) { + if (YarnConfiguration.timelineServiceV2Enabled(conf)) { + this.registeredCollectors = new ConcurrentHashMap<>(); + } this.containerTokenSecretManager = containerTokenSecretManager; this.nmTokenSecretManager = nmTokenSecretManager; this.dirsHandler = dirsHandler; @@ -783,7 +788,14 @@ public class NodeManager extends CompositeService return this.context; } - // For testing + /** + * Returns the NM collector service. It should be used only for testing + * purposes. 
+ * + * @return the NM collector service, or null if the timeline service v.2 is + * not enabled + */ + @VisibleForTesting NMCollectorService getNMCollectorService() { return this.nmCollectorService; } @@ -791,6 +803,7 @@ public class NodeManager extends CompositeService public static void main(String[] args) throws IOException { Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); StringUtils.startupShutdownMessage(NodeManager.class, args, LOG); + @SuppressWarnings("resource") NodeManager nodeManager = new NodeManager(); Configuration conf = new YarnConfiguration(); new GenericOptionsParser(conf, args); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef71c1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 08334ae..34dfdd8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -840,7 +840,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements dispatcher.getEventHandler().handle( new CMgrSignalContainersEvent(containersToSignal)); } - if (YarnConfiguration.systemMetricsPublisherEnabled(context.getConf())) { + if (YarnConfiguration.timelineServiceV2Enabled(context.getConf())) { 
updateTimelineClientsAddress(response); } @@ -874,7 +874,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements /** * Caller should take care of sending non null nodelabels for both * arguments - * + * * @param nodeLabelsNew * @param nodeLabelsOld * @return if the New node labels are diff from the older one. @@ -890,27 +890,37 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private void updateTimelineClientsAddress( NodeHeartbeatResponse response) { - Set<Map.Entry<ApplicationId, String>> rmKnownCollectors = - response.getAppCollectorsMap().entrySet(); - for (Map.Entry<ApplicationId, String> entry : rmKnownCollectors) { - ApplicationId appId = entry.getKey(); - String collectorAddr = entry.getValue(); - - // Only handle applications running on local node. - // Not include apps with timeline collectors running in local - Application application = context.getApplications().get(appId); - if (application != null && - !context.getRegisteredCollectors().containsKey(appId)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Sync a new collector address: " + collectorAddr + - " for application: " + appId + " from RM."); + Map<ApplicationId, String> knownCollectorsMap = + response.getAppCollectorsMap(); + if (knownCollectorsMap == null) { + LOG.warn("the collectors map is null"); + } else { + Set<Map.Entry<ApplicationId, String>> rmKnownCollectors = + knownCollectorsMap.entrySet(); + for (Map.Entry<ApplicationId, String> entry : rmKnownCollectors) { + ApplicationId appId = entry.getKey(); + String collectorAddr = entry.getValue(); + + // Only handle applications running on local node. 
+ // Not include apps with timeline collectors running in local + Application application = context.getApplications().get(appId); + // TODO this logic could be problematic if the collector address + // gets updated due to NM restart or collector service failure + if (application != null && + !context.getRegisteredCollectors().containsKey(appId)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Sync a new collector address: " + collectorAddr + + " for application: " + appId + " from RM."); + } + TimelineClient client = application.getTimelineClient(); + if (client != null) { + client.setTimelineServiceAddress(collectorAddr); + } } - TimelineClient client = application.getTimelineClient(); - client.setTimelineServiceAddress(collectorAddr); } } } - + private void updateMasterKeys(NodeHeartbeatResponse response) { // See if the master-key has rolled over MasterKey updatedMasterKey = response.getContainerTokenMasterKey(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef71c1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java index db79ee5..3ba81ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java @@ -43,6 +43,10 @@ import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; +/** + * Service that handles collector information. It is used only if the timeline + * service v.2 is enabled. + */ public class NMCollectorService extends CompositeService implements CollectorNodemanagerProtocol { @@ -113,9 +117,9 @@ public class NMCollectorService extends CompositeService implements String collectorAddr = collector.getCollectorAddr(); newCollectorsMap.put(appId, collectorAddr); // set registered collector address to TimelineClient. - if (YarnConfiguration.systemMetricsPublisherEnabled(context.getConf())) { - TimelineClient client = - context.getApplications().get(appId).getTimelineClient(); + TimelineClient client = + context.getApplications().get(appId).getTimelineClient(); + if (client != null) { client.setTimelineServiceAddress(collectorAddr); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef71c1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 33aeac4..46571f6 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -115,6 +115,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl.FlowContext; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; @@ -190,7 +191,8 @@ public class ContainerManagerImpl extends CompositeService implements private long waitForContainersOnShutdownMillis; - private final NMTimelinePublisher nmMetricsPublisher; + // NM metrics publisher is set only if the timeline service v.2 is enabled + private NMTimelinePublisher nmMetricsPublisher; public ContainerManagerImpl(Context context, ContainerExecutor exec, DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater, @@ -218,9 +220,17 @@ public class ContainerManagerImpl extends CompositeService implements auxiliaryServices.registerServiceListener(this); addService(auxiliaryServices); - nmMetricsPublisher = createNMTimelinePublisher(context); - context.setNMTimelinePublisher(nmMetricsPublisher); - this.containersMonitor = createContainersMonitor(exec); + // initialize the metrics publisher if 
the timeline service v.2 is enabled + // and the system publisher is enabled + Configuration conf = context.getConf(); + if (YarnConfiguration.timelineServiceV2Enabled(conf) && + YarnConfiguration.systemMetricsPublisherEnabled(conf)) { + LOG.info("YARN system metrics publishing service is enabled"); + nmMetricsPublisher = createNMTimelinePublisher(context); + context.setNMTimelinePublisher(nmMetricsPublisher); + } + this.containersMonitor = + new ContainersMonitorImpl(exec, dispatcher, this.context); addService(this.containersMonitor); dispatcher.register(ContainerEventType.class, @@ -236,7 +246,6 @@ public class ContainerManagerImpl extends CompositeService implements addService(dispatcher); - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); @@ -341,7 +350,7 @@ public class ContainerManagerImpl extends CompositeService implements LOG.info("Recovering application " + appId); //TODO: Recover flow and flow run ID ApplicationImpl app = new ApplicationImpl( - dispatcher, p.getUser(), null, null, 0, appId, creds, context); + dispatcher, p.getUser(), appId, creds, context); context.getApplications().put(appId, app); app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext)); } @@ -959,20 +968,27 @@ public class ContainerManagerImpl extends CompositeService implements try { if (!isServiceStopped()) { // Create the application - String flowName = launchContext.getEnvironment().get( - TimelineUtils.FLOW_NAME_TAG_PREFIX); - String flowVersion = launchContext.getEnvironment().get( - TimelineUtils.FLOW_VERSION_TAG_PREFIX); - String flowRunIdStr = launchContext.getEnvironment().get( - TimelineUtils.FLOW_RUN_ID_TAG_PREFIX); - long flowRunId = 0L; - if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) { - flowRunId = Long.parseLong(flowRunIdStr); + // populate the flow context from the launch context if the timeline + // service v.2 is enabled + FlowContext flowContext = null; + if 
(YarnConfiguration.timelineServiceV2Enabled(getConfig())) { + String flowName = launchContext.getEnvironment().get( + TimelineUtils.FLOW_NAME_TAG_PREFIX); + String flowVersion = launchContext.getEnvironment().get( + TimelineUtils.FLOW_VERSION_TAG_PREFIX); + String flowRunIdStr = launchContext.getEnvironment().get( + TimelineUtils.FLOW_RUN_ID_TAG_PREFIX); + long flowRunId = 0L; + if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) { + flowRunId = Long.parseLong(flowRunIdStr); + } + flowContext = + new FlowContext(flowName, flowVersion, flowRunId); } if (!context.getApplications().containsKey(applicationID)) { Application application = - new ApplicationImpl(dispatcher, user, flowName, flowVersion, - flowRunId, applicationID, credentials, context); + new ApplicationImpl(dispatcher, user, flowContext, + applicationID, credentials, context); if (context.getApplications().putIfAbsent(applicationID, application) == null) { LOG.info("Creating a new application reference for app " @@ -1324,7 +1340,9 @@ public class ContainerManagerImpl extends CompositeService implements Container c = containers.get(event.getContainerID()); if (c != null) { c.handle(event); - nmMetricsPublisher.publishContainerEvent(event); + if (nmMetricsPublisher != null) { + nmMetricsPublisher.publishContainerEvent(event); + } } else { LOG.warn("Event " + event + " sent to absent container " + event.getContainerID()); @@ -1340,7 +1358,9 @@ public class ContainerManagerImpl extends CompositeService implements event.getApplicationID()); if (app != null) { app.handle(event); - nmMetricsPublisher.publishApplicationEvent(event); + if (nmMetricsPublisher != null) { + nmMetricsPublisher.publishApplicationEvent(event); + } } else { LOG.warn("Event " + event + " sent to absent application " + event.getApplicationID()); @@ -1363,7 +1383,9 @@ public class ContainerManagerImpl extends CompositeService implements @Override public void handle(LocalizationEvent event) { origLocalizationEventHandler.handle(event); - 
timelinePublisher.publishLocalizationEvent(event); + if (timelinePublisher != null) { + timelinePublisher.publishLocalizationEvent(event); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef71c1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index 6e87cfd..93c6758 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -67,9 +67,8 @@ public class ApplicationImpl implements Application { final Dispatcher dispatcher; final String user; - final String flowName; - final String flowVersion; - final long flowRunId; + // flow context is set only if the timeline service v.2 is enabled + private FlowContext flowContext; final ApplicationId appId; final Credentials credentials; Map<ApplicationAccessType, String> applicationACLs; @@ -86,14 +85,16 @@ public class ApplicationImpl implements Application { Map<ContainerId, Container> containers = new HashMap<ContainerId, Container>(); - public ApplicationImpl(Dispatcher dispatcher, String user, String flowName, - String flowVersion, long flowRunId, ApplicationId appId, - Credentials 
credentials, Context context) { + public ApplicationImpl(Dispatcher dispatcher, String user, + ApplicationId appId, Credentials credentials, Context context) { + this(dispatcher, user, null, appId, credentials, context); + } + + public ApplicationImpl(Dispatcher dispatcher, String user, + FlowContext flowContext, ApplicationId appId, Credentials credentials, + Context context) { this.dispatcher = dispatcher; this.user = user; - this.flowName = flowName; - this.flowVersion = flowVersion; - this.flowRunId = flowRunId; this.appId = appId; this.credentials = credentials; this.aclsManager = context.getApplicationACLsManager(); @@ -103,11 +104,44 @@ public class ApplicationImpl implements Application { writeLock = lock.writeLock(); stateMachine = stateMachineFactory.make(this); Configuration conf = context.getConf(); - if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) { - createAndStartTimelineClient(conf); + if (YarnConfiguration.timelineServiceV2Enabled(conf)) { + if (flowContext == null) { + throw new IllegalArgumentException("flow context cannot be null"); + } + this.flowContext = flowContext; + if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) { + createAndStartTimelineClient(conf); + } } } - + + /** + * Data object that encapsulates the flow context for the application purpose. 
+ */ + public static class FlowContext { + private final String flowName; + private final String flowVersion; + private final long flowRunId; + + public FlowContext(String flowName, String flowVersion, long flowRunId) { + this.flowName = flowName; + this.flowVersion = flowVersion; + this.flowRunId = flowRunId; + } + + public String getFlowName() { + return flowName; + } + + public String getFlowVersion() { + return flowVersion; + } + + public long getFlowRunId() { + return flowRunId; + } + } + private void createAndStartTimelineClient(Configuration conf) { // create and start timeline client this.timelineClient = TimelineClient.createTimelineClient(appId); @@ -454,7 +488,11 @@ public class ApplicationImpl implements Application { // Remove collectors info for finished apps. // TODO check we remove related collectors info in failure cases // (YARN-3038) - app.context.getRegisteredCollectors().remove(app.getAppId()); + Map<ApplicationId, String> registeredCollectors = + app.context.getRegisteredCollectors(); + if (registeredCollectors != null) { + registeredCollectors.remove(app.getAppId()); + } // stop timelineClient when application get finished. TimelineClient timelineClient = app.getTimelineClient(); if (timelineClient != null) { @@ -521,16 +559,16 @@ public class ApplicationImpl implements Application { @Override public String getFlowName() { - return flowName; + return flowContext == null ? null : flowContext.getFlowName(); } @Override public String getFlowVersion() { - return flowVersion; + return flowContext == null ? null : flowContext.getFlowVersion(); } @Override public long getFlowRunId() { - return flowRunId; + return flowContext == null ? 
0L : flowContext.getFlowRunId(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef71c1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index 40c5996..739d363 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; +import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; @@ -564,9 +565,13 @@ public class ContainersMonitorImpl extends AbstractService implements ContainerImpl container = (ContainerImpl) context.getContainers().get(containerId); - 
container.getNMTimelinePublisher().reportContainerResourceUsage( - container, currentTime, pId, currentPmemUsage, - cpuUsageTotalCoresPercentage); + NMTimelinePublisher nmMetricsPublisher = + container.getNMTimelinePublisher(); + if (nmMetricsPublisher != null) { + nmMetricsPublisher.reportContainerResourceUsage( + container, currentTime, pId, currentPmemUsage, + cpuUsageTotalCoresPercentage); + } } catch (Exception e) { // Log the exception and proceed to the next container. LOG.warn("Uncaught exception in ContainersMonitorImpl " http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef71c1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java index 2c5c300..69de433 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java @@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.client.api.TimelineClient; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import 
org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -56,12 +55,16 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +/** + * Metrics publisher service that publishes data to the timeline service v.2. It + * is used only if the timeline service v.2 is enabled and the system publishing + * of events and metrics is enabled. + */ public class NMTimelinePublisher extends CompositeService { private static final Log LOG = LogFactory.getLog(NMTimelinePublisher.class); private Dispatcher dispatcher; - private boolean publishSystemMetrics; private Context context; @@ -76,24 +79,16 @@ public class NMTimelinePublisher extends CompositeService { @Override protected void serviceInit(Configuration conf) throws Exception { - publishSystemMetrics = - YarnConfiguration.systemMetricsPublisherEnabled(conf); - - if (publishSystemMetrics) { - dispatcher = new AsyncDispatcher(); - dispatcher.register(NMTimelineEventType.class, - new ForwardingEventHandler()); - dispatcher - .register(ContainerEventType.class, new ContainerEventHandler()); - dispatcher.register(ApplicationEventType.class, - new ApplicationEventHandler()); - dispatcher.register(LocalizationEventType.class, - new LocalizationEventDispatcher()); - addIfService(dispatcher); - LOG.info("YARN system metrics publishing service is enabled"); - } else { - LOG.info("YARN system metrics publishing service is not enabled"); - } + dispatcher = new AsyncDispatcher(); + dispatcher.register(NMTimelineEventType.class, + new ForwardingEventHandler()); + dispatcher + .register(ContainerEventType.class, new ContainerEventHandler()); + dispatcher.register(ApplicationEventType.class, + new ApplicationEventHandler()); + dispatcher.register(LocalizationEventType.class, + new 
LocalizationEventDispatcher()); + addIfService(dispatcher); super.serviceInit(conf); } @@ -121,8 +116,9 @@ public class NMTimelinePublisher extends CompositeService { public void reportContainerResourceUsage(Container container, long createdTime, String pId, Long pmemUsage, Float cpuUsageTotalCoresPercentage) { - if (publishSystemMetrics - && (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE || cpuUsageTotalCoresPercentage != ResourceCalculatorProcessTree.UNAVAILABLE)) { + if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE || + cpuUsageTotalCoresPercentage != + ResourceCalculatorProcessTree.UNAVAILABLE) { ContainerEntity entity = createContainerEntity(container.getContainerId()); long currentTimeMillis = System.currentTimeMillis(); @@ -219,9 +215,6 @@ public class NMTimelinePublisher extends CompositeService { } public void publishApplicationEvent(ApplicationEvent event) { - if (!publishSystemMetrics) { - return; - } // publish only when the desired event is received switch (event.getType()) { case INIT_APPLICATION: @@ -242,9 +235,6 @@ public class NMTimelinePublisher extends CompositeService { } public void publishContainerEvent(ContainerEvent event) { - if (!publishSystemMetrics) { - return; - } // publish only when the desired event is received switch (event.getType()) { case INIT_CONTAINER: @@ -262,9 +252,6 @@ public class NMTimelinePublisher extends CompositeService { } public void publishLocalizationEvent(LocalizationEvent event) { - if (!publishSystemMetrics) { - return; - } // publish only when the desired event is received switch (event.getType()) { case CONTAINER_RESOURCES_LOCALIZED: http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef71c1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index 13956db..9b3cff0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -50,7 +50,6 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -95,7 +94,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler; -import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -106,7 +104,6 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; public class TestContainerManagerRecovery extends BaseContainerManagerTest { @@ -642,9 +639,9 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest { } @Override - public NMTimelinePublisher createNMTimelinePublisher(Context context) { - NMTimelinePublisher timelinePublisher = mock(NMTimelinePublisher.class); - return timelinePublisher; + public NMTimelinePublisher + createNMTimelinePublisher(Context context) { + return null; } }; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
