Repository: hadoop
Updated Branches:
  refs/heads/YARN-2928 5eeb2b156 -> 582211888


YARN-3390. Reuse TimelineCollectorManager for RM (Zhijie Shen via sjlee)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/58221188
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/58221188
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/58221188

Branch: refs/heads/YARN-2928
Commit: 58221188811e0f61d842dac89e1f4ad4fd8aa182
Parents: 5eeb2b1
Author: Sangjin Lee <sj...@apache.org>
Authored: Fri Apr 24 16:56:23 2015 -0700
Committer: Sangjin Lee <sj...@apache.org>
Committed: Fri Apr 24 16:56:23 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   2 +
 .../resourcemanager/RMActiveServiceContext.java |  13 +-
 .../server/resourcemanager/RMAppManager.java    |   3 +-
 .../yarn/server/resourcemanager/RMContext.java  |   7 +-
 .../server/resourcemanager/RMContextImpl.java   |  12 +-
 .../server/resourcemanager/ResourceManager.java |  14 +-
 .../server/resourcemanager/rmapp/RMAppImpl.java |  15 ++
 .../timelineservice/RMTimelineCollector.java    | 111 --------
 .../RMTimelineCollectorManager.java             |  75 ++++++
 .../TestTimelineServiceClientIntegration.java   |  12 +-
 .../collector/AppLevelTimelineCollector.java    |   2 +-
 .../collector/NodeTimelineCollectorManager.java | 223 ++++++++++++++++
 .../PerNodeTimelineCollectorsAuxService.java    |  15 +-
 .../collector/TimelineCollector.java            |   2 +-
 .../collector/TimelineCollectorManager.java     | 259 +++----------------
 .../collector/TimelineCollectorWebService.java  |  23 +-
 .../TestNMTimelineCollectorManager.java         | 160 ++++++++++++
 ...TestPerNodeTimelineCollectorsAuxService.java |  24 +-
 .../collector/TestTimelineCollectorManager.java | 160 ------------
 19 files changed, 578 insertions(+), 554 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/58221188/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index a3ca475..408b8e6 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -53,6 +53,8 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3391. Clearly define flow ID/ flow run / flow version in API and storage.
     (Zhijie Shen via junping_du)
 
+    YARN-3390. Reuse TimelineCollectorManager for RM (Zhijie Shen via sjlee)
+
   IMPROVEMENTS
 
   OPTIMIZATIONS

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58221188/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
index 1d95204..00768ed 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
@@ -47,7 +47,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
-import 
org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector;
+import 
org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 
@@ -95,7 +95,7 @@ public class RMActiveServiceContext {
   private ApplicationMasterService applicationMasterService;
   private RMApplicationHistoryWriter rmApplicationHistoryWriter;
   private SystemMetricsPublisher systemMetricsPublisher;
-  private RMTimelineCollector timelineCollector;
+  private RMTimelineCollectorManager timelineCollectorManager;
 
   private RMNodeLabelsManager nodeLabelManager;
   private long epoch;
@@ -379,14 +379,15 @@ public class RMActiveServiceContext {
 
   @Private
   @Unstable
-  public RMTimelineCollector getRMTimelineCollector() {
-    return timelineCollector;
+  public RMTimelineCollectorManager getRMTimelineCollectorManager() {
+    return timelineCollectorManager;
   }
 
   @Private
   @Unstable
-  public void setRMTimelineCollector(RMTimelineCollector timelineCollector) {
-    this.timelineCollector = timelineCollector;
+  public void setRMTimelineCollectorManager(
+      RMTimelineCollectorManager timelineCollectorManager) {
+    this.timelineCollectorManager = timelineCollectorManager;
   }
 
   @Private

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58221188/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index e511ff0..dcc2a64 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -336,7 +336,6 @@ public class RMAppManager implements 
EventHandler<RMAppManagerEvent>,
             submissionContext, this.scheduler, this.masterService,
             submitTime, submissionContext.getApplicationType(),
             submissionContext.getApplicationTags(), amReq);
-
     // Concurrent app submissions with same applicationId will fail here
     // Concurrent app submissions with different applicationIds will not
     // influence each other
@@ -347,6 +346,8 @@ public class RMAppManager implements 
EventHandler<RMAppManagerEvent>,
       LOG.warn(message);
       throw new YarnException(message);
     }
+    // Start timeline collector for the submitted app
+    application.startTimelineCollector();
     // Inform the ACLs Manager
     this.applicationACLsManager.addApplication(applicationId,
         submissionContext.getAMContainerSpec().getApplicationACLs());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58221188/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index 05fee99..5ef799a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -43,7 +43,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
-import 
org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector;
+import 
org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
 
 /**
  * Context of the ResourceManager.
@@ -110,9 +110,10 @@ public interface RMContext {
 
   SystemMetricsPublisher getSystemMetricsPublisher();
   
-  void setRMTimelineCollector(RMTimelineCollector timelineCollector);
+  void setRMTimelineCollectorManager(
+      RMTimelineCollectorManager timelineCollectorManager);
   
-  RMTimelineCollector getRMTimelineCollector();
+  RMTimelineCollectorManager getRMTimelineCollectorManager();
 
   ConfigurationProvider getConfigurationProvider();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58221188/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index fb6907c..3b33f23 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -47,7 +47,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
-import 
org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector;
+import 
org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
 import org.apache.hadoop.yarn.util.Clock;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -355,14 +355,14 @@ public class RMContextImpl implements RMContext {
   }
 
   @Override
-  public void setRMTimelineCollector(
-      RMTimelineCollector timelineCollector) {
-    activeServiceContext.setRMTimelineCollector(timelineCollector);
+  public void setRMTimelineCollectorManager(
+      RMTimelineCollectorManager timelineCollectorManager) {
+    activeServiceContext.setRMTimelineCollectorManager(timelineCollectorManager);
   }
 
   @Override
-  public RMTimelineCollector getRMTimelineCollector() {
-    return activeServiceContext.getRMTimelineCollector();
+  public RMTimelineCollectorManager getRMTimelineCollectorManager() {
+    return activeServiceContext.getRMTimelineCollectorManager();
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58221188/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index b993ede..059bc07 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -97,11 +97,11 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
-import 
org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter;
 import 
org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilterInitializer;
+import 
org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
 import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
 import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
 import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
@@ -356,8 +356,8 @@ public class ResourceManager extends CompositeService 
implements Recoverable {
     return new RMApplicationHistoryWriter();
   }
 
-  private RMTimelineCollector createRMTimelineCollector() {
-    return new RMTimelineCollector();
+  private RMTimelineCollectorManager createRMTimelineCollectorManager() {
+    return new RMTimelineCollectorManager(rmContext);
   }
 
   protected SystemMetricsPublisher createSystemMetricsPublisher() {
@@ -482,10 +482,10 @@ public class ResourceManager extends CompositeService 
implements Recoverable {
       addService(systemMetricsPublisher);
       rmContext.setSystemMetricsPublisher(systemMetricsPublisher);
 
-      RMTimelineCollector timelineCollector =
-          createRMTimelineCollector();
-      addService(timelineCollector);
-      rmContext.setRMTimelineCollector(timelineCollector);
+      RMTimelineCollectorManager timelineCollectorManager =
+          createRMTimelineCollectorManager();
+      addService(timelineCollectorManager);
+      rmContext.setRMTimelineCollectorManager(timelineCollectorManager);
 
       // Register event handler for NodesListManager
       nodesListManager = new NodesListManager(rmContext);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58221188/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index fba391c..e4686c1 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -87,6 +87,8 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
+import 
org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
+import 
org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
@@ -456,6 +458,17 @@ public class RMAppImpl implements RMApp, Recoverable {
           YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
   }
 
+  public void startTimelineCollector() {
+    AppLevelTimelineCollector collector =
+        new AppLevelTimelineCollector(applicationId);
+    rmContext.getRMTimelineCollectorManager().putIfAbsent(
+        applicationId, collector);
+  }
+
+  public void stopTimelineCollector() {
+    rmContext.getRMTimelineCollectorManager().remove(applicationId);
+  }
+
   @Override
   public ApplicationId getApplicationId() {
     return this.applicationId;
@@ -1261,6 +1274,8 @@ public class RMAppImpl implements RMApp, Recoverable {
           .applicationFinished(app, finalState);
       app.rmContext.getSystemMetricsPublisher()
           .appFinished(app, finalState, app.finishTime);
+
+      app.stopTimelineCollector();
     };
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58221188/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java
deleted file mode 100644
index 4ea7a03..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.timelineservice;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import 
org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEvent;
-import 
org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEventType;
-import 
org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
-import 
org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
-
-/**
- * This class is responsible for posting application and appattempt lifecycle
- * related events to timeline service V2
- */
-@Private
-@Unstable
-public class RMTimelineCollector extends TimelineCollector {
-  private static final Log LOG = LogFactory.getLog(RMTimelineCollector.class);
-
-  public RMTimelineCollector() {
-    super("Resource Manager TimelineCollector");
-  }
-
-  private Dispatcher dispatcher;
-
-  private boolean publishSystemMetricsForV2;
-
-  @Override
-  protected void serviceInit(Configuration conf) throws Exception {
-    publishSystemMetricsForV2 =
-        conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
-            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
-            && conf.getBoolean(
-                YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
-                YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);
-
-    if (publishSystemMetricsForV2) {
-      // having separate dispatcher to avoid load on RMDispatcher
-      LOG.info("RMTimelineCollector has been configured to publish"
-          + " System Metrics in ATS V2");
-      dispatcher = new AsyncDispatcher();
-      dispatcher.register(SystemMetricsEventType.class,
-          new ForwardingEventHandler());
-    } else {
-      LOG.warn("RMTimelineCollector has not been configured to publish"
-          + " System Metrics in ATS V2");
-    }
-    super.serviceInit(conf);
-  }
-
-  @Override
-  protected void serviceStart() throws Exception {
-    super.serviceStart();
-  }
-
-  @Override
-  protected void serviceStop() throws Exception {
-    super.serviceStop();
-  }
-
-  protected void handleSystemMetricsEvent(SystemMetricsEvent event) {
-    switch (event.getType()) {
-    default:
-      LOG.error("Unknown SystemMetricsEvent type: " + event.getType());
-    }
-  }
-  
-  @Override
-  protected TimelineCollectorContext getTimelineEntityContext() {
-    // TODO address in YARN-3390.
-    return null;
-  }
-
-  /**
-   * EventHandler implementation which forward events to SystemMetricsPublisher.
-   * Making use of it, SystemMetricsPublisher can avoid to have a public handle
-   * method.
-   */
-  private final class ForwardingEventHandler implements
-      EventHandler<SystemMetricsEvent> {
-
-    @Override
-    public void handle(SystemMetricsEvent event) {
-      handleSystemMetricsEvent(event);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58221188/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
new file mode 100644
index 0000000..25e0e0f
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.timelineservice;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import 
org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
+import 
org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class RMTimelineCollectorManager extends TimelineCollectorManager {
+  private RMContext rmContext;
+
+  public RMTimelineCollectorManager(RMContext rmContext) {
+    super(RMTimelineCollectorManager.class.getName());
+    this.rmContext = rmContext;
+  }
+
+  @Override
+  public void postPut(ApplicationId appId, TimelineCollector collector) {
+    RMApp app = rmContext.getRMApps().get(appId);
+    if (app == null) {
+      throw new YarnRuntimeException(
+          "Unable to get the timeline collector context info for a non-existing app " +
+              appId);
+    }
+    String userId = app.getUser();
+    if (userId != null && !userId.isEmpty()) {
+      collector.getTimelineEntityContext().setUserId(userId);
+    }
+    for (String tag : app.getApplicationTags()) {
+      String[] parts = tag.split(":", 2);
+      if (parts.length != 2 || parts[1].isEmpty()) {
+        continue;
+      }
+      switch (parts[0]) {
+        case TimelineUtils.FLOW_NAME_TAG_PREFIX:
+          collector.getTimelineEntityContext().setFlowName(parts[1]);
+          break;
+        case TimelineUtils.FLOW_VERSION_TAG_PREFIX:
+          collector.getTimelineEntityContext().setFlowVersion(parts[1]);
+          break;
+        case TimelineUtils.FLOW_RUN_ID_TAG_PREFIX:
+          collector.getTimelineEntityContext().setFlowRunId(
+              Long.valueOf(parts[1]));
+          break;
+        default:
+          break;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58221188/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
index 54c806c..0bdb68a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
@@ -34,7 +34,7 @@ import 
org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
 import 
org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
 import 
org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
 import 
org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
-import 
org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
+import 
org.apache.hadoop.yarn.server.timelineservice.collector.NodeTimelineCollectorManager;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -42,13 +42,13 @@ import org.junit.Test;
 import java.io.IOException;
 
 public class TestTimelineServiceClientIntegration {
-  private static TimelineCollectorManager collectorManager;
+  private static NodeTimelineCollectorManager collectorManager;
   private static PerNodeTimelineCollectorsAuxService auxService;
 
   @BeforeClass
   public static void setupClass() throws Exception {
     try {
-      collectorManager = new MyTimelineCollectorManager();
+      collectorManager = new MockNodeTimelineCollectorManager();
       auxService =
           PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
               collectorManager);
@@ -85,9 +85,9 @@ public class TestTimelineServiceClientIntegration {
     }
   }
 
-  private static class MyTimelineCollectorManager extends
-      TimelineCollectorManager {
-    public MyTimelineCollectorManager() {
+  private static class MockNodeTimelineCollectorManager extends
+      NodeTimelineCollectorManager {
+    public MockNodeTimelineCollectorManager() {
       super();
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58221188/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
index 5bc70e3..fa32211 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
@@ -75,7 +75,7 @@ public class AppLevelTimelineCollector extends 
TimelineCollector {
   }
 
   @Override
-  protected TimelineCollectorContext getTimelineEntityContext() {
+  public TimelineCollectorContext getTimelineEntityContext() {
     return context;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58221188/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
new file mode 100644
index 0000000..03ac770
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.http.HttpServer2;
+import org.apache.hadoop.http.lib.StaticUserWebFilter;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
+import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+
+/**
+ *
+ * It is a singleton, and instances should be obtained via
+ * {@link #getInstance()}.
+ *
+ */
+@Private
+@Unstable
+public class NodeTimelineCollectorManager extends TimelineCollectorManager {
+  private static final Log LOG =
+      LogFactory.getLog(NodeTimelineCollectorManager.class);
+  private static final NodeTimelineCollectorManager INSTANCE =
+      new NodeTimelineCollectorManager();
+
+
+  // REST server for this collector manager
+  private HttpServer2 timelineRestServer;
+
+  private String timelineRestServerBindAddress;
+
+  private CollectorNodemanagerProtocol nmCollectorService;
+
+  private InetSocketAddress nmCollectorServiceAddress;
+
+  static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager";
+
+  static NodeTimelineCollectorManager getInstance() {
+    return INSTANCE;
+  }
+
+  @VisibleForTesting
+  protected NodeTimelineCollectorManager() {
+    super(NodeTimelineCollectorManager.class.getName());
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    this.nmCollectorServiceAddress = conf.getSocketAddr(
+        YarnConfiguration.NM_BIND_HOST,
+        YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    nmCollectorService = getNMCollectorService();
+    startWebApp();
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (timelineRestServer != null) {
+      timelineRestServer.stop();
+    }
+    super.serviceStop();
+  }
+
+  @Override
+  public void postPut(ApplicationId appId, TimelineCollector collector) {
+    try {
+      // Get context info from NM
+      updateTimelineCollectorContext(appId, collector);
+      // Report to NM if a new collector is added.
+      reportNewCollectorToNM(appId);
+    } catch (YarnException | IOException e) {
+      // throw exception here as it cannot be used if failed communicate with NM
+      LOG.error("Failed to communicate with NM Collector Service for " + appId);
+      throw new YarnRuntimeException(e);
+    }
+  }
+
+  /**
+   * Launch the REST web server for this collector manager
+   */
+  private void startWebApp() {
+    Configuration conf = getConfig();
+    String bindAddress = conf.get(YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_BIND_HOST) + ":0";
+    try {
+      Configuration confForInfoServer = new Configuration(conf);
+      confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
+      HttpServer2.Builder builder = new HttpServer2.Builder()
+          .setName("timeline")
+          .setConf(conf)
+          .addEndpoint(URI.create(
+              (YarnConfiguration.useHttps(conf) ? "https://" : "http://") +
+                  bindAddress));
+      timelineRestServer = builder.build();
+      // TODO: replace this by an authentication filter in future.
+      HashMap<String, String> options = new HashMap<>();
+      String username = conf.get(HADOOP_HTTP_STATIC_USER,
+          DEFAULT_HADOOP_HTTP_STATIC_USER);
+      options.put(HADOOP_HTTP_STATIC_USER, username);
+      HttpServer2.defineFilter(timelineRestServer.getWebAppContext(),
+          "static_user_filter_timeline",
+          StaticUserWebFilter.StaticUserFilter.class.getName(),
+          options, new String[] {"/*"});
+
+      timelineRestServer.addJerseyResourcePackage(
+          TimelineCollectorWebService.class.getPackage().getName() + ";"
+              + GenericExceptionHandler.class.getPackage().getName() + ";"
+              + YarnJacksonJaxbJsonProvider.class.getPackage().getName(),
+          "/*");
+      timelineRestServer.setAttribute(COLLECTOR_MANAGER_ATTR_KEY, this);
+      timelineRestServer.start();
+    } catch (Exception e) {
+      String msg = "The per-node collector webapp failed to start.";
+      LOG.error(msg, e);
+      throw new YarnRuntimeException(msg, e);
+    }
+    //TODO: We need to think of the case of multiple interfaces
+    this.timelineRestServerBindAddress = WebAppUtils.getResolvedAddress(
+        timelineRestServer.getConnectorAddress(0));
+    LOG.info("Instantiated the per-node collector webapp at " +
+        timelineRestServerBindAddress);
+  }
+
+  private void reportNewCollectorToNM(ApplicationId appId)
+      throws YarnException, IOException {
+    ReportNewCollectorInfoRequest request =
+        ReportNewCollectorInfoRequest.newInstance(appId,
+            this.timelineRestServerBindAddress);
+    LOG.info("Report a new collector for application: " + appId +
+        " to the NM Collector Service.");
+    nmCollectorService.reportNewCollectorInfo(request);
+  }
+
+  private void updateTimelineCollectorContext(
+      ApplicationId appId, TimelineCollector collector)
+      throws YarnException, IOException {
+    GetTimelineCollectorContextRequest request =
+        GetTimelineCollectorContextRequest.newInstance(appId);
+    LOG.info("Get timeline collector context for " + appId);
+    GetTimelineCollectorContextResponse response =
+        nmCollectorService.getTimelineCollectorContext(request);
+    String userId = response.getUserId();
+    if (userId != null && !userId.isEmpty()) {
+      collector.getTimelineEntityContext().setUserId(userId);
+    }
+    String flowName = response.getFlowName();
+    if (flowName != null && !flowName.isEmpty()) {
+      collector.getTimelineEntityContext().setFlowName(flowName);
+    }
+    String flowVersion = response.getFlowVersion();
+    if (flowVersion != null && !flowVersion.isEmpty()) {
+      collector.getTimelineEntityContext().setFlowVersion(flowVersion);
+    }
+    long flowRunId = response.getFlowRunId();
+    if (flowRunId != 0L) {
+      collector.getTimelineEntityContext().setFlowRunId(flowRunId);
+    }
+  }
+
+  @VisibleForTesting
+  protected CollectorNodemanagerProtocol getNMCollectorService() {
+    Configuration conf = getConfig();
+    final YarnRPC rpc = YarnRPC.create(conf);
+
+    // TODO Security settings.
+    return (CollectorNodemanagerProtocol) rpc.getProxy(
+        CollectorNodemanagerProtocol.class,
+        nmCollectorServiceAddress, conf);
+  }
+
+  @VisibleForTesting
+  public String getRestServerBindAddress() {
+    return timelineRestServerBindAddress;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58221188/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
index 2017d01..36ff5c0 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
@@ -53,15 +53,15 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
       LogFactory.getLog(PerNodeTimelineCollectorsAuxService.class);
   private static final int SHUTDOWN_HOOK_PRIORITY = 30;
 
-  private final TimelineCollectorManager collectorManager;
+  private final NodeTimelineCollectorManager collectorManager;
 
   public PerNodeTimelineCollectorsAuxService() {
     // use the same singleton
-    this(TimelineCollectorManager.getInstance());
+    this(NodeTimelineCollectorManager.getInstance());
   }
 
   @VisibleForTesting PerNodeTimelineCollectorsAuxService(
-      TimelineCollectorManager collectorsManager) {
+      NodeTimelineCollectorManager collectorsManager) {
     super("timeline_collector");
     this.collectorManager = collectorsManager;
   }
@@ -108,8 +108,7 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
    * @return whether it was removed successfully
    */
   public boolean removeApplication(ApplicationId appId) {
-    String appIdString = appId.toString();
-    return collectorManager.remove(appIdString);
+    return collectorManager.remove(appId);
   }
 
   /**
@@ -153,8 +152,8 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
   }
 
   @VisibleForTesting
-  boolean hasApplication(String appId) {
-    return collectorManager.containsKey(appId);
+  boolean hasApplication(ApplicationId appId) {
+    return collectorManager.containsTimelineCollector(appId);
   }
 
   @Override
@@ -174,7 +173,7 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
 
   @VisibleForTesting
   public static PerNodeTimelineCollectorsAuxService
-      launchServer(String[] args, TimelineCollectorManager collectorManager) {
+      launchServer(String[] args, NodeTimelineCollectorManager collectorManager) {
     Thread
       .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
     StringUtils.startupShutdownMessage(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58221188/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
index f1d3d72..4eced5b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
@@ -124,6 +124,6 @@ public abstract class TimelineCollector extends CompositeService {
     }
   }
 
-  protected abstract TimelineCollectorContext getTimelineEntityContext();
+  public abstract TimelineCollectorContext getTimelineEntityContext();
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58221188/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
index 9a566a2..7b3da6b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
@@ -18,173 +18,97 @@
 
 package org.apache.hadoop.yarn.server.timelineservice.collector;
 
-import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
-import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.http.HttpServer2;
-import org.apache.hadoop.http.lib.StaticUserWebFilter;
-import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
-import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
-import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
-import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Class that manages adding and removing collectors and their lifecycle. It
  * provides thread safety access to the collectors inside.
  *
- * It is a singleton, and instances should be obtained via
- * {@link #getInstance()}.
  */
-@Private
-@Unstable
-public class TimelineCollectorManager extends CompositeService {
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class TimelineCollectorManager extends AbstractService {
   private static final Log LOG =
       LogFactory.getLog(TimelineCollectorManager.class);
-  private static final TimelineCollectorManager INSTANCE =
-      new TimelineCollectorManager();
 
   // access to this map is synchronized with the map itself
-  private final Map<String, TimelineCollector> collectors =
+  private final Map<ApplicationId, TimelineCollector> collectors =
       Collections.synchronizedMap(
-          new HashMap<String, TimelineCollector>());
-
-  // REST server for this collector manager
-  private HttpServer2 timelineRestServer;
-
-  private String timelineRestServerBindAddress;
-
-  private CollectorNodemanagerProtocol nmCollectorService;
-
-  private InetSocketAddress nmCollectorServiceAddress;
-
-  static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager";
-
-  static TimelineCollectorManager getInstance() {
-    return INSTANCE;
-  }
+          new HashMap<ApplicationId, TimelineCollector>());
 
-  @VisibleForTesting
-  protected TimelineCollectorManager() {
-    super(TimelineCollectorManager.class.getName());
-  }
-
-  @Override
-  public void serviceInit(Configuration conf) throws Exception {
-    this.nmCollectorServiceAddress = conf.getSocketAddr(
-        YarnConfiguration.NM_BIND_HOST,
-        YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS,
-        YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS,
-        YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT);
-
-  }
-
-  @Override
-  protected void serviceStart() throws Exception {
-    nmCollectorService = getNMCollectorService();
-    startWebApp();
-    super.serviceStart();
-  }
-
-  @Override
-  protected void serviceStop() throws Exception {
-    if (timelineRestServer != null) {
-      timelineRestServer.stop();
-    }
-    super.serviceStop();
+  protected TimelineCollectorManager(String name) {
+    super(name);
   }
 
   /**
    * Put the collector into the collection if an collector mapped by id does
    * not exist.
    *
-   * @throws YarnRuntimeException if there was any exception in initializing and
-   * starting the app level service
+   * @throws YarnRuntimeException if there  was any exception in initializing
+   *                              and starting the app level service
    * @return the collector associated with id after the potential put.
    */
   public TimelineCollector putIfAbsent(ApplicationId appId,
       TimelineCollector collector) {
-    String id = appId.toString();
-    TimelineCollector collectorInTable;
-    boolean collectorIsNew = false;
+    TimelineCollector collectorInTable = null;
     synchronized (collectors) {
-      collectorInTable = collectors.get(id);
+      collectorInTable = collectors.get(appId);
       if (collectorInTable == null) {
         try {
           // initialize, start, and add it to the collection so it can be
           // cleaned up when the parent shuts down
           collector.init(getConfig());
           collector.start();
-          collectors.put(id, collector);
-          LOG.info("the collector for " + id + " was added");
+          collectors.put(appId, collector);
+          LOG.info("the collector for " + appId + " was added");
           collectorInTable = collector;
-          collectorIsNew = true;
+          postPut(appId, collectorInTable);
         } catch (Exception e) {
           throw new YarnRuntimeException(e);
         }
       } else {
-        String msg = "the collector for " + id + " already exists!";
-        LOG.error(msg);
-      }
-
-    }
-    // Report to NM if a new collector is added.
-    if (collectorIsNew) {
-      try {
-        updateTimelineCollectorContext(appId, collector);
-        reportNewCollectorToNM(appId);
-      } catch (Exception e) {
-        // throw exception here as it cannot be used if failed communicate with NM
-        LOG.error("Failed to communicate with NM Collector Service for " + appId);
-        throw new YarnRuntimeException(e);
+        LOG.info("the collector for " + appId + " already exists!");
       }
     }
-
     return collectorInTable;
   }
 
+  protected void postPut(ApplicationId appId, TimelineCollector collector) {
+
+  }
+
   /**
    * Removes the collector for the specified id. The collector is also stopped
    * as a result. If the collector does not exist, no change is made.
    *
    * @return whether it was removed successfully
    */
-  public boolean remove(String id) {
-    synchronized (collectors) {
-      TimelineCollector collector = collectors.remove(id);
-      if (collector == null) {
-        String msg = "the collector for " + id + " does not exist!";
-        LOG.error(msg);
-        return false;
-      } else {
-        // stop the service to do clean up
-        collector.stop();
-        LOG.info("the collector service for " + id + " was removed");
-        return true;
-      }
+  public boolean remove(ApplicationId appId) {
+    TimelineCollector collector = collectors.remove(appId);
+    if (collector == null) {
+      LOG.error("the collector for " + appId + " does not exist!");
+    } else {
+      postRemove(appId, collector);
+      // stop the service to do clean up
+      collector.stop();
+      LOG.info("the collector service for " + appId + " was removed");
     }
+    return collector != null;
+  }
+
+  protected void postRemove(ApplicationId appId, TimelineCollector collector) {
+
   }
 
   /**
@@ -192,113 +116,16 @@ public class TimelineCollectorManager extends CompositeService {
    *
    * @return the collector or null if it does not exist
    */
-  public TimelineCollector get(String id) {
-    return collectors.get(id);
+  public TimelineCollector get(ApplicationId appId) {
+    return collectors.get(appId);
   }
 
   /**
    * Returns whether the collector for the specified id exists in this
    * collection.
    */
-  public boolean containsKey(String id) {
-    return collectors.containsKey(id);
-  }
-
-  /**
-   * Launch the REST web server for this collector manager
-   */
-  private void startWebApp() {
-    Configuration conf = getConfig();
-    String bindAddress = conf.get(YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
-        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_BIND_HOST) + ":0";
-    try {
-      Configuration confForInfoServer = new Configuration(conf);
-      confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
-      HttpServer2.Builder builder = new HttpServer2.Builder()
-          .setName("timeline")
-          .setConf(conf)
-          .addEndpoint(URI.create(
-              (YarnConfiguration.useHttps(conf) ? "https://" : "http://") +
-                  bindAddress));
-      timelineRestServer = builder.build();
-      // TODO: replace this by an authentication filter in future.
-      HashMap<String, String> options = new HashMap<>();
-      String username = conf.get(HADOOP_HTTP_STATIC_USER,
-          DEFAULT_HADOOP_HTTP_STATIC_USER);
-      options.put(HADOOP_HTTP_STATIC_USER, username);
-      HttpServer2.defineFilter(timelineRestServer.getWebAppContext(),
-          "static_user_filter_timeline",
-          StaticUserWebFilter.StaticUserFilter.class.getName(),
-          options, new String[] {"/*"});
-
-      timelineRestServer.addJerseyResourcePackage(
-          TimelineCollectorWebService.class.getPackage().getName() + ";"
-              + GenericExceptionHandler.class.getPackage().getName() + ";"
-              + YarnJacksonJaxbJsonProvider.class.getPackage().getName(),
-          "/*");
-      timelineRestServer.setAttribute(COLLECTOR_MANAGER_ATTR_KEY, this);
-      timelineRestServer.start();
-    } catch (Exception e) {
-      String msg = "The per-node collector webapp failed to start.";
-      LOG.error(msg, e);
-      throw new YarnRuntimeException(msg, e);
-    }
-    //TODO: We need to think of the case of multiple interfaces
-    this.timelineRestServerBindAddress = WebAppUtils.getResolvedAddress(
-        timelineRestServer.getConnectorAddress(0));
-    LOG.info("Instantiated the per-node collector webapp at " +
-        timelineRestServerBindAddress);
-  }
-
-  private void reportNewCollectorToNM(ApplicationId appId)
-      throws YarnException, IOException {
-    ReportNewCollectorInfoRequest request =
-        ReportNewCollectorInfoRequest.newInstance(appId,
-            this.timelineRestServerBindAddress);
-    LOG.info("Report a new collector for application: " + appId +
-        " to the NM Collector Service.");
-    nmCollectorService.reportNewCollectorInfo(request);
+  public boolean containsTimelineCollector(ApplicationId appId) {
+    return collectors.containsKey(appId);
   }
 
-  private void updateTimelineCollectorContext(
-      ApplicationId appId, TimelineCollector collector)
-      throws YarnException, IOException {
-    GetTimelineCollectorContextRequest request =
-        GetTimelineCollectorContextRequest.newInstance(appId);
-    LOG.info("Get timeline collector context for " + appId);
-    GetTimelineCollectorContextResponse response =
-        nmCollectorService.getTimelineCollectorContext(request);
-    String userId = response.getUserId();
-    if (userId != null && !userId.isEmpty()) {
-      collector.getTimelineEntityContext().setUserId(userId);
-    }
-    String flowName = response.getFlowName();
-    if (flowName != null && !flowName.isEmpty()) {
-      collector.getTimelineEntityContext().setFlowName(flowName);
-    }
-    String flowVersion = response.getFlowVersion();
-    if (flowVersion != null && !flowVersion.isEmpty()) {
-      collector.getTimelineEntityContext().setFlowVersion(flowVersion);
-    }
-    long flowRunId = response.getFlowRunId();
-    if (flowRunId != 0L) {
-      collector.getTimelineEntityContext().setFlowRunId(flowRunId);
-    }
-  }
-
-  @VisibleForTesting
-  protected CollectorNodemanagerProtocol getNMCollectorService() {
-    Configuration conf = getConfig();
-    final YarnRPC rpc = YarnRPC.create(conf);
-
-    // TODO Security settings.
-    return (CollectorNodemanagerProtocol) rpc.getProxy(
-        CollectorNodemanagerProtocol.class,
-        nmCollectorServiceAddress, conf);
-  }
-
-  @VisibleForTesting
-  public String getRestServerBindAddress() {
-    return timelineRestServerBindAddress;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58221188/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
index edec0d3..2165c66 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.webapp.ForbiddenException;
@@ -129,11 +130,14 @@ public class TimelineCollectorWebService {
     boolean isAsync = async != null && async.trim().equalsIgnoreCase("true");
 
     try {
-      appId = parseApplicationId(appId);
-      if (appId == null) {
+      ApplicationId appID = parseApplicationId(appId);
+      if (appID == null) {
         return Response.status(Response.Status.BAD_REQUEST).build();
       }
-      TimelineCollector collector = getCollector(req, appId);
+      NodeTimelineCollectorManager collectorManager =
+          (NodeTimelineCollectorManager) context.getAttribute(
+              NodeTimelineCollectorManager.COLLECTOR_MANAGER_ATTR_KEY);
+      TimelineCollector collector = collectorManager.get(appID);
       if (collector == null) {
         LOG.error("Application: "+ appId + " is not found");
         throw new NotFoundException(); // different exception?
@@ -147,10 +151,10 @@ public class TimelineCollectorWebService {
     }
   }
 
-  private String parseApplicationId(String appId) {
+  private ApplicationId parseApplicationId(String appId) {
     try {
       if (appId != null) {
-        return ConverterUtils.toApplicationId(appId.trim()).toString();
+        return ConverterUtils.toApplicationId(appId.trim());
       } else {
         return null;
       }
@@ -159,15 +163,6 @@ public class TimelineCollectorWebService {
     }
   }
 
-  private TimelineCollector
-      getCollector(HttpServletRequest req, String appIdToParse) {
-    String appIdString = parseApplicationId(appIdToParse);
-    final TimelineCollectorManager collectorManager =
-        (TimelineCollectorManager) context.getAttribute(
-            TimelineCollectorManager.COLLECTOR_MANAGER_ATTR_KEY);
-    return collectorManager.get(appIdString);
-  }
-
   private void init(HttpServletResponse response) {
     response.setContentType(null);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58221188/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java
new file mode 100644
index 0000000..87343fd
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java
@@ -0,0 +1,160 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNMTimelineCollectorManager {
+  private NodeTimelineCollectorManager collectorManager;
+
+  @Before
+  public void setup() throws Exception {
+    collectorManager = createCollectorManager();
+    collectorManager.init(new YarnConfiguration());
+    collectorManager.start();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (collectorManager != null) {
+      collectorManager.stop();
+    }
+  }
+
+  @Test
+  public void testStartWebApp() throws Exception {
+    assertNotNull(collectorManager.getRestServerBindAddress());
+    String address = collectorManager.getRestServerBindAddress();
+    String[] parts = address.split(":");
+    assertEquals(2, parts.length);
+    assertNotNull(parts[0]);
+    assertTrue(Integer.valueOf(parts[1]) > 0);
+  }
+
+  @Test(timeout=60000)
+  public void testMultithreadedAdd() throws Exception {
+    final int NUM_APPS = 5;
+    List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
+    for (int i = 0; i < NUM_APPS; i++) {
+      final ApplicationId appId = ApplicationId.newInstance(0L, i);
+      Callable<Boolean> task = new Callable<Boolean>() {
+        public Boolean call() {
+          AppLevelTimelineCollector collector =
+              new AppLevelTimelineCollector(appId);
+          return (collectorManager.putIfAbsent(appId, collector) == collector);
+        }
+      };
+      tasks.add(task);
+    }
+    ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS);
+    try {
+      List<Future<Boolean>> futures = executor.invokeAll(tasks);
+      for (Future<Boolean> future: futures) {
+        assertTrue(future.get());
+      }
+    } finally {
+      executor.shutdownNow();
+    }
+    // check the keys
+    for (int i = 0; i < NUM_APPS; i++) {
+      final ApplicationId appId = ApplicationId.newInstance(0L, i);
+      assertTrue(collectorManager.containsTimelineCollector(appId));
+    }
+  }
+
+  @Test
+  public void testMultithreadedAddAndRemove() throws Exception {
+    final int NUM_APPS = 5;
+    List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
+    for (int i = 0; i < NUM_APPS; i++) {
+      final ApplicationId appId = ApplicationId.newInstance(0L, i);
+      Callable<Boolean> task = new Callable<Boolean>() {
+        public Boolean call() {
+          AppLevelTimelineCollector collector =
+              new AppLevelTimelineCollector(appId);
+          boolean successPut =
+              (collectorManager.putIfAbsent(appId, collector) == collector);
+          return successPut && collectorManager.remove(appId);
+        }
+      };
+      tasks.add(task);
+    }
+    ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS);
+    try {
+      List<Future<Boolean>> futures = executor.invokeAll(tasks);
+      for (Future<Boolean> future: futures) {
+        assertTrue(future.get());
+      }
+    } finally {
+      executor.shutdownNow();
+    }
+    // check the keys
+    for (int i = 0; i < NUM_APPS; i++) {
+      final ApplicationId appId = ApplicationId.newInstance(0L, i);
+      assertFalse(collectorManager.containsTimelineCollector(appId));
+    }
+  }
+
+  private NodeTimelineCollectorManager createCollectorManager() {
+    final NodeTimelineCollectorManager collectorManager =
+        spy(new NodeTimelineCollectorManager());
+    doReturn(new Configuration()).when(collectorManager).getConfig();
+    CollectorNodemanagerProtocol nmCollectorService =
+        mock(CollectorNodemanagerProtocol.class);
+    GetTimelineCollectorContextResponse response =
+        GetTimelineCollectorContextResponse.newInstance(null, null, null, 0L);
+    try {
+      when(nmCollectorService.getTimelineCollectorContext(any(
+          GetTimelineCollectorContextRequest.class))).thenReturn(response);
+    } catch (YarnException | IOException e) {
+      fail();
+    }
+    doReturn(nmCollectorService).when(collectorManager).getNMCollectorService();
+    return collectorManager;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58221188/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
index abbe13a..b1a5b04 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
@@ -67,8 +67,7 @@ public class TestPerNodeTimelineCollectorsAuxService {
   public void testAddApplication() throws Exception {
     auxService = createCollectorAndAddApplication();
     // auxService should have a single app
-    assertTrue(auxService.hasApplication(
-        appAttemptId.getApplicationId().toString()));
+    assertTrue(auxService.hasApplication(appAttemptId.getApplicationId()));
     auxService.close();
   }
 
@@ -82,16 +81,14 @@ public class TestPerNodeTimelineCollectorsAuxService {
     when(context.getContainerId()).thenReturn(containerId);
     auxService.initializeContainer(context);
     // auxService should not have that app
-    assertFalse(auxService.hasApplication(
-        appAttemptId.getApplicationId().toString()));
+    assertFalse(auxService.hasApplication(appAttemptId.getApplicationId()));
   }
 
   @Test
   public void testRemoveApplication() throws Exception {
     auxService = createCollectorAndAddApplication();
     // auxService should have a single app
-    String appIdStr = appAttemptId.getApplicationId().toString();
-    assertTrue(auxService.hasApplication(appIdStr));
+    assertTrue(auxService.hasApplication(appAttemptId.getApplicationId()));
 
     ContainerId containerId = getAMContainerId();
     ContainerTerminationContext context =
@@ -99,7 +96,7 @@ public class TestPerNodeTimelineCollectorsAuxService {
     when(context.getContainerId()).thenReturn(containerId);
     auxService.stopContainer(context);
     // auxService should not have that app
-    assertFalse(auxService.hasApplication(appIdStr));
+    assertFalse(auxService.hasApplication(appAttemptId.getApplicationId()));
     auxService.close();
   }
 
@@ -107,8 +104,7 @@ public class TestPerNodeTimelineCollectorsAuxService {
   public void testRemoveApplicationNonAMContainer() throws Exception {
     auxService = createCollectorAndAddApplication();
     // auxService should have a single app
-    String appIdStr = appAttemptId.getApplicationId().toString();
-    assertTrue(auxService.hasApplication(appIdStr));
+    assertTrue(auxService.hasApplication(appAttemptId.getApplicationId()));
 
     ContainerId containerId = getContainerId(2L); // not an AM
     ContainerTerminationContext context =
@@ -116,7 +112,7 @@ public class TestPerNodeTimelineCollectorsAuxService {
     when(context.getContainerId()).thenReturn(containerId);
     auxService.stopContainer(context);
     // auxService should still have that app
-    assertTrue(auxService.hasApplication(appIdStr));
+    assertTrue(auxService.hasApplication(appAttemptId.getApplicationId()));
     auxService.close();
   }
 
@@ -147,7 +143,7 @@ public class TestPerNodeTimelineCollectorsAuxService {
   }
 
   private PerNodeTimelineCollectorsAuxService createCollector() {
-    TimelineCollectorManager collectorManager = createCollectorManager();
+    NodeTimelineCollectorManager collectorManager = createCollectorManager();
     PerNodeTimelineCollectorsAuxService auxService =
         spy(new PerNodeTimelineCollectorsAuxService(collectorManager));
     auxService.init(new YarnConfiguration());
@@ -155,9 +151,9 @@ public class TestPerNodeTimelineCollectorsAuxService {
     return auxService;
   }
 
-  private TimelineCollectorManager createCollectorManager() {
-    TimelineCollectorManager collectorManager =
-        spy(new TimelineCollectorManager());
+  private NodeTimelineCollectorManager createCollectorManager() {
+    NodeTimelineCollectorManager collectorManager =
+        spy(new NodeTimelineCollectorManager());
     doReturn(new Configuration()).when(collectorManager).getConfig();
     CollectorNodemanagerProtocol nmCollectorService =
         mock(CollectorNodemanagerProtocol.class);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58221188/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java
deleted file mode 100644
index c662998..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java
+++ /dev/null
@@ -1,160 +0,0 @@
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.collector;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
-import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestTimelineCollectorManager {
-  private TimelineCollectorManager collectorManager;
-
-  @Before
-  public void setup() throws Exception {
-    collectorManager = createCollectorManager();
-    collectorManager.init(new YarnConfiguration());
-    collectorManager.start();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    if (collectorManager != null) {
-      collectorManager.stop();
-    }
-  }
-
-  @Test
-  public void testStartWebApp() throws Exception {
-    assertNotNull(collectorManager.getRestServerBindAddress());
-    String address = collectorManager.getRestServerBindAddress();
-    String[] parts = address.split(":");
-    assertEquals(2, parts.length);
-    assertNotNull(parts[0]);
-    assertTrue(Integer.valueOf(parts[1]) > 0);
-  }
-
-  @Test(timeout=60000)
-  public void testMultithreadedAdd() throws Exception {
-    final int NUM_APPS = 5;
-    List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
-    for (int i = 0; i < NUM_APPS; i++) {
-      final ApplicationId appId = ApplicationId.newInstance(0L, i);
-      Callable<Boolean> task = new Callable<Boolean>() {
-        public Boolean call() {
-          AppLevelTimelineCollector collector =
-              new AppLevelTimelineCollector(appId);
-          return (collectorManager.putIfAbsent(appId, collector) == collector);
-        }
-      };
-      tasks.add(task);
-    }
-    ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS);
-    try {
-      List<Future<Boolean>> futures = executor.invokeAll(tasks);
-      for (Future<Boolean> future: futures) {
-        assertTrue(future.get());
-      }
-    } finally {
-      executor.shutdownNow();
-    }
-    // check the keys
-    for (int i = 0; i < NUM_APPS; i++) {
-      final ApplicationId appId = ApplicationId.newInstance(0L, i);
-      assertTrue(collectorManager.containsKey(appId.toString()));
-    }
-  }
-
-  @Test
-  public void testMultithreadedAddAndRemove() throws Exception {
-    final int NUM_APPS = 5;
-    List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
-    for (int i = 0; i < NUM_APPS; i++) {
-      final ApplicationId appId = ApplicationId.newInstance(0L, i);
-      Callable<Boolean> task = new Callable<Boolean>() {
-        public Boolean call() {
-          AppLevelTimelineCollector collector =
-              new AppLevelTimelineCollector(appId);
-          boolean successPut =
-              (collectorManager.putIfAbsent(appId, collector) == collector);
-          return successPut && collectorManager.remove(appId.toString());
-        }
-      };
-      tasks.add(task);
-    }
-    ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS);
-    try {
-      List<Future<Boolean>> futures = executor.invokeAll(tasks);
-      for (Future<Boolean> future: futures) {
-        assertTrue(future.get());
-      }
-    } finally {
-      executor.shutdownNow();
-    }
-    // check the keys
-    for (int i = 0; i < NUM_APPS; i++) {
-      final ApplicationId appId = ApplicationId.newInstance(0L, i);
-      assertFalse(collectorManager.containsKey(appId.toString()));
-    }
-  }
-
-  private TimelineCollectorManager createCollectorManager() {
-    final TimelineCollectorManager collectorManager =
-        spy(new TimelineCollectorManager());
-    doReturn(new Configuration()).when(collectorManager).getConfig();
-    CollectorNodemanagerProtocol nmCollectorService =
-        mock(CollectorNodemanagerProtocol.class);
-    GetTimelineCollectorContextResponse response =
-        GetTimelineCollectorContextResponse.newInstance(null, null, null, 0L);
-    try {
-      when(nmCollectorService.getTimelineCollectorContext(any(
-          GetTimelineCollectorContextRequest.class))).thenReturn(response);
-    } catch (YarnException | IOException e) {
-      fail();
-    }
-    doReturn(nmCollectorService).when(collectorManager).getNMCollectorService();
-    return collectorManager;
-  }
-}

Reply via email to