http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index c8c9303..eedb501 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -22,8 +22,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Vector;
 
 import org.apache.commons.cli.CommandLine;
@@ -187,6 +189,10 @@ public class Client {
   // Timeline domain writer access control
   private String modifyACLs = null;
 
+  private String flowName = null;
+  private String flowVersion = null;
+  private long flowRunId = 0L;
+
   // Command line options
   private Options opts;
 
@@ -258,7 +264,8 @@ public class Client {
     opts.addOption("shell_args", true, "Command line args for the shell 
script." +
         "Multiple args can be separated by empty space.");
     opts.getOption("shell_args").setArgs(Option.UNLIMITED_VALUES);
-    opts.addOption("shell_env", true, "Environment for shell script. Specified 
as env_key=env_val pairs");
+    opts.addOption("shell_env", true,
+        "Environment for shell script. Specified as env_key=env_val pairs");
     opts.addOption("shell_cmd_priority", true, "Priority for the shell command 
containers");
     opts.addOption("container_memory", true, "Amount of memory in MB to be 
requested to run the shell command");
     opts.addOption("container_vcores", true, "Amount of virtual cores to be 
requested to run the shell command");
@@ -284,6 +291,12 @@ public class Client {
         + "modify the timeline entities in the given domain");
     opts.addOption("create", false, "Flag to indicate whether to create the "
         + "domain specified with -domain.");
+    opts.addOption("flow_name", true, "Flow name which the distributed shell "
+        + "app belongs to");
+    opts.addOption("flow_version", true, "Flow version which the distributed "
+        + "shell app belongs to");
+    opts.addOption("flow_run_id", true, "Flow run ID which the distributed "
+        + "shell app belongs to");
     opts.addOption("help", false, "Print usage");
     opts.addOption("node_label_expression", true,
         "Node label expression to determine the nodes"
@@ -463,6 +476,20 @@ public class Client {
           + cliParser.getOptionValue("container_retry_interval"));
     }
 
+    if (cliParser.hasOption("flow_name")) {
+      flowName = cliParser.getOptionValue("flow_name");
+    }
+    if (cliParser.hasOption("flow_version")) {
+      flowVersion = cliParser.getOptionValue("flow_version");
+    }
+    if (cliParser.hasOption("flow_run_id")) {
+      try {
+        flowRunId = Long.parseLong(cliParser.getOptionValue("flow_run_id"));
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException(
+            "Flow run is not a valid long value", e);
+      }
+    }
     return true;
   }
 
@@ -554,6 +581,18 @@ public class Client {
         .setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
     }
 
+    Set<String> tags = new HashSet<String>();
+    if (flowName != null) {
+      tags.add(TimelineUtils.generateFlowNameTag(flowName));
+    }
+    if (flowVersion != null) {
+      tags.add(TimelineUtils.generateFlowVersionTag(flowVersion));
+    }
+    if (flowRunId != 0) {
+      tags.add(TimelineUtils.generateFlowRunIdTag(flowRunId));
+    }
+    appContext.setApplicationTags(tags);
+
     // set local resources for the application master
     // local files or archives as needed
     // In this scenario, the jar file for the application master is part of 
the local resources                        
@@ -667,7 +706,7 @@ public class Client {
 
     for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
       vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
-    }                  
+    }
     if (debugFlag) {
       vargs.add("--debug");
     }
@@ -683,7 +722,7 @@ public class Client {
       command.append(str).append(" ");
     }
 
-    LOG.info("Completed setting up app master command " + command.toString()); 
   
+    LOG.info("Completed setting up app master command " + command.toString());
     List<String> commands = new ArrayList<String>();
     commands.add(command.toString());          
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index 4ab9637..a2d82db 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -38,6 +38,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -55,29 +56,37 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.JarFinder;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
-import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter;
+import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient;
 import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
 import org.apache.hadoop.yarn.client.api.impl.TimelineWriter;
-import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils;
 import org.apache.hadoop.yarn.server.timeline.NameValuePair;
+import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils;
 import org.apache.hadoop.yarn.server.timeline.TimelineVersion;
 import org.apache.hadoop.yarn.server.timeline.TimelineVersionWatcher;
+import 
org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
+import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -85,8 +94,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 
-import com.sun.jersey.api.client.ClientHandlerException;
-
 public class TestDistributedShell {
 
   private static final Log LOG =
@@ -99,6 +106,7 @@ public class TestDistributedShell {
   protected YarnConfiguration conf = null;
   private static final int NUM_NMS = 1;
   private static final float DEFAULT_TIMELINE_VERSION = 1.0f;
+  private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
 
   protected final static String APPMASTER_JAR =
       JarFinder.getJar(ApplicationMaster.class);
@@ -120,17 +128,36 @@ public class TestDistributedShell {
 
   private void setupInternal(int numNodeManager, float timelineVersion)
       throws Exception {
-
     LOG.info("Starting up YARN cluster");
-    
+
     conf = new YarnConfiguration();
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
+    // reduce the teardown waiting time
+    conf.setLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 1000);
     conf.set("yarn.log.dir", "target");
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    // mark if we need to launch the v1 timeline server
+    // disable aux-service based timeline aggregators
+    conf.set(YarnConfiguration.NM_AUX_SERVICES, "");
+    conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
+
+    conf.set(YarnConfiguration.NM_VMEM_PMEM_RATIO, "8");
     conf.set(YarnConfiguration.RM_SCHEDULER, 
CapacityScheduler.class.getName());
     conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
     conf.set("mapreduce.jobhistory.address",
         "0.0.0.0:" + ServerSocketUtil.getPort(10021, 10));
+    // Enable ContainersMonitorImpl
+    conf.set(YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
+        LinuxResourceCalculatorPlugin.class.getName());
+    conf.set(YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE,
+        ProcfsBasedProcessTree.class.getName());
+    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true);
+    conf.setBoolean(
+        YarnConfiguration.YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING,
+        true);
+    conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
+          true);
 
     // ATS version specific settings
     if (timelineVersion == 1.0f) {
@@ -148,6 +175,19 @@ public class TestDistributedShell {
       PluginStoreTestUtils.prepareConfiguration(conf, hdfsCluster);
       conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES,
           DistributedShellTimelinePlugin.class.getName());
+    } else if (timelineVersion == 2.0f) {
+      // set version to 2
+      conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+      // disable v1 timeline server since we no longer have a server here
+      // enable aux-service based timeline aggregators
+      conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
+      conf.set(YarnConfiguration.NM_AUX_SERVICES + "." +
+          TIMELINE_AUX_SERVICE_NAME + ".class",
+          PerNodeTimelineCollectorsAuxService.class.getName());
+      conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+          FileSystemTimelineWriterImpl.class,
+          org.apache.hadoop.yarn.server.timelineservice.storage.
+              TimelineWriter.class);
     } else {
       Assert.fail("Wrong timeline version number: " + timelineVersion);
     }
@@ -241,7 +281,30 @@ public class TestDistributedShell {
     testDSShell(true);
   }
 
+  @Test
+  @TimelineVersion(2.0f)
+  public void testDSShellWithoutDomainV2() throws Exception {
+    testDSShell(false); // one-arg overload passes defaultFlow = true
+  }
+
   public void testDSShell(boolean haveDomain) throws Exception {
+    testDSShell(haveDomain, true); // delegate with defaultFlow = true
+  }
+
+  @Test
+  @TimelineVersion(2.0f)
+  public void testDSShellWithoutDomainV2DefaultFlow() throws Exception {
+    testDSShell(false, true); // same call testDSShellWithoutDomainV2 makes
+  }
+
+  @Test
+  @TimelineVersion(2.0f)
+  public void testDSShellWithoutDomainV2CustomizedFlow() throws Exception {
+    testDSShell(false, false); // defaultFlow = false: explicit --flow_* args
+  }
+
+  public void testDSShell(boolean haveDomain, boolean defaultFlow)
+      throws Exception {
     String[] args = {
         "--jar",
         APPMASTER_JAR,
@@ -268,9 +331,23 @@ public class TestDistributedShell {
           "writer_user writer_group",
           "--create"
       };
-      List<String> argsList = new ArrayList<String>(Arrays.asList(args));
-      argsList.addAll(Arrays.asList(domainArgs));
-      args = argsList.toArray(new String[argsList.size()]);
+      args = mergeArgs(args, domainArgs);
+    }
+    boolean isTestingTimelineV2 = false;
+    if (timelineVersionWatcher.getTimelineVersion() == 2.0f) {
+      isTestingTimelineV2 = true;
+      if (!defaultFlow) {
+        String[] flowArgs = {
+            "--flow_name",
+            "test_flow_name",
+            "--flow_version",
+            "test_flow_version",
+            "--flow_run_id",
+            "12345678"
+        };
+        args = mergeArgs(args, flowArgs);
+      }
+      LOG.info("Setup: Using timeline v2!");
     }
 
     LOG.info("Initializing DS Client");
@@ -297,13 +374,16 @@ public class TestDistributedShell {
 
     boolean verified = false;
     String errorMessage = "";
+    ApplicationId appId = null;
+    ApplicationReport appReport = null;
     while(!verified) {
       List<ApplicationReport> apps = yarnClient.getApplications();
       if (apps.size() == 0 ) {
         Thread.sleep(10);
         continue;
       }
-      ApplicationReport appReport = apps.get(0);
+      appReport = apps.get(0);
+      appId = appReport.getApplicationId();
       if(appReport.getHost().equals("N/A")) {
         Thread.sleep(10);
         continue;
@@ -315,13 +395,16 @@ public class TestDistributedShell {
       if (checkHostname(appReport.getHost()) && appReport.getRpcPort() == -1) {
         verified = true;
       }
-      if (appReport.getYarnApplicationState() == 
YarnApplicationState.FINISHED) {
+
+      if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED
+          && appReport.getFinalApplicationStatus() !=
+              FinalApplicationStatus.UNDEFINED) {
         break;
       }
     }
     Assert.assertTrue(errorMessage, verified);
     t.join();
-    LOG.info("Client run completed. Result=" + result);
+    LOG.info("Client run completed for testDSShell. Result=" + result);
     Assert.assertTrue(result.get());
 
     if (timelineVersionWatcher.getTimelineVersion() == 1.5f) {
@@ -343,6 +426,15 @@ public class TestDistributedShell {
     }
 
     TimelineDomain domain = null;
+    if (!isTestingTimelineV2) {
+      checkTimelineV1(haveDomain);
+    } else {
+      checkTimelineV2(haveDomain, appId, defaultFlow, appReport);
+    }
+  }
+
+  private void checkTimelineV1(boolean haveDomain) throws Exception {
+    TimelineDomain domain = null;
     if (haveDomain) {
       domain = yarnCluster.getApplicationHistoryServer()
           .getTimelineStore().getDomain("TEST_DOMAIN");
@@ -393,6 +485,179 @@ public class TestDistributedShell {
     }
   }
 
+  private void checkTimelineV2(boolean haveDomain, ApplicationId appId,
+      boolean defaultFlow, ApplicationReport appReport) throws Exception {
+    LOG.info("Started checkTimelineV2 ");
+    // For the PoC (YARN-3264), check files under /tmp/timeline_service_data
+    String tmpRoot =
+        FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
+            + "/entities/";
+    // NOTE(review): haveDomain is never read in this method.
+    File tmpRootFolder = new File(tmpRoot);
+    try {
+      Assert.assertTrue(tmpRootFolder.isDirectory());
+      String basePath = tmpRoot +
+          YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" +
+          UserGroupInformation.getCurrentUser().getShortUserName() +
+          (defaultFlow ?
+              "/" + appReport.getName() + "/" +
+                  TimelineUtils.DEFAULT_FLOW_VERSION +"/" +
+                  appReport.getStartTime() +"/" :
+              "/test_flow_name/test_flow_version/12345678/") +
+          appId.toString();
+      LOG.info("basePath: " + basePath);
+      // For this test, we expect DS_APP_ATTEMPT and DS_CONTAINER dirs.
+
+      // Verify DS_APP_ATTEMPT entities posted by the client.
+      // There will be at least one attempt; look for that file.
+      String appTimestampFileName =
+          "appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+              + "_000001"
+              + 
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      verifyEntityTypeFileExists(basePath, "DS_APP_ATTEMPT",
+          appTimestampFileName);
+
+      // Verify DS_CONTAINER entities posted by the client.
+      String containerTimestampFileName =
+          "container_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+              + "_01_000002.thist"; // NOTE(review): literal, not the constant
+      verifyEntityTypeFileExists(basePath, "DS_CONTAINER",
+          containerTimestampFileName);
+
+      // Verify NM posting container metrics info.
+      String containerMetricsTimestampFileName =
+          "container_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+              + "_01_000001"
+              + 
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      File containerEntityFile = verifyEntityTypeFileExists(basePath,
+          TimelineEntityType.YARN_CONTAINER.toString(),
+          containerMetricsTimestampFileName);
+      Assert.assertEquals(
+          "Container created event needs to be published atleast once",
+          1, // NOTE(review): exactly 1, though the message says at least once
+          getNumOfStringOccurences(containerEntityFile,
+              ContainerMetricsConstants.CREATED_EVENT_TYPE));
+
+      // To avoid a race in the test case, check up to 4 times, sleeping
+      // 500 ms after each miss.
+      long numOfContainerFinishedOccurences = 0;
+      for (int i = 0; i < 4; i++) {
+        numOfContainerFinishedOccurences =
+            getNumOfStringOccurences(containerEntityFile,
+                ContainerMetricsConstants.FINISHED_EVENT_TYPE);
+        if (numOfContainerFinishedOccurences > 0) {
+          break;
+        } else {
+          Thread.sleep(500L);
+        }
+      }
+      Assert.assertEquals(
+          "Container finished event needs to be published atleast once",
+          1,
+          numOfContainerFinishedOccurences);
+
+      // Verify RM posting application life cycle events are getting published.
+      String appMetricsTimestampFileName =
+          "application_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+              + 
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      File appEntityFile =
+          verifyEntityTypeFileExists(basePath,
+              TimelineEntityType.YARN_APPLICATION.toString(),
+              appMetricsTimestampFileName);
+      Assert.assertEquals(
+          "Application created event should be published atleast once",
+          1,
+          getNumOfStringOccurences(appEntityFile,
+              ApplicationMetricsConstants.CREATED_EVENT_TYPE));
+
+      // To avoid a race in the test case, check up to 4 times, sleeping
+      // 500 ms after each miss.
+      long numOfStringOccurences = 0;
+      for (int i = 0; i < 4; i++) {
+        numOfStringOccurences =
+            getNumOfStringOccurences(appEntityFile,
+                ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+        if (numOfStringOccurences > 0) {
+          break;
+        } else {
+          Thread.sleep(500L);
+        }
+      }
+      Assert.assertEquals(
+          "Application finished event should be published atleast once",
+          1,
+          numOfStringOccurences);
+
+      // Verify RM posting app attempt life cycle events are getting published.
+      String appAttemptMetricsTimestampFileName =
+          "appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+              + "_000001"
+              + 
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      File appAttemptEntityFile =
+          verifyEntityTypeFileExists(basePath,
+              TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),
+              appAttemptMetricsTimestampFileName);
+      Assert.assertEquals(
+          "AppAttempt register event should be published atleast once",
+          1,
+          getNumOfStringOccurences(appAttemptEntityFile,
+              AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE));
+
+      Assert.assertEquals(
+          "AppAttempt finished event should be published atleast once",
+          1,
+          getNumOfStringOccurences(appAttemptEntityFile,
+              AppAttemptMetricsConstants.FINISHED_EVENT_TYPE));
+    } finally {
+      FileUtils.deleteDirectory(tmpRootFolder.getParentFile()); // parent of entities/
+    }
+  }
+
+  // Asserts that basePath/entityType/ is a directory holding entityfileName
+  // and returns that file; each failure message names the offending path.
+  private File verifyEntityTypeFileExists(String basePath, String entityType,
+      String entityfileName) {
+    String entityDirPath = basePath + "/" + entityType + "/";
+    Assert.assertTrue("Not a directory: " + entityDirPath,
+        new File(entityDirPath).isDirectory());
+    File entityFile = new File(entityDirPath + entityfileName);
+    Assert.assertTrue("Missing entity file: " + entityFile,
+        entityFile.exists());
+    return entityFile;
+  }
+
+  // Returns how many lines of entityFile contain searchString.
+  private long getNumOfStringOccurences(File entityFile, String searchString)
+      throws IOException {
+    long actualCount = 0;
+    // Open before the try: if opening fails there is nothing to close yet.
+    BufferedReader reader = new BufferedReader(new FileReader(entityFile));
+    try {
+      for (String s = reader.readLine(); s != null; s = reader.readLine()) {
+        if (s.trim().contains(searchString)) {
+          actualCount++;
+        }
+      }
+    } finally {
+      reader.close();
+    }
+    return actualCount;
+  }
+
+  /**
+   * Concatenates two argument arrays into a newly allocated array; neither
+   * input array is modified.
+   *
+   * @param args the leading arguments
+   * @param newArgs the arguments appended after {@code args}
+   * @return a new array holding {@code args} followed by {@code newArgs}
+   */
+  private String[] mergeArgs(String[] args, String[] newArgs) {
+    String[] merged = Arrays.copyOf(args, args.length + newArgs.length);
+    System.arraycopy(newArgs, 0, merged, args.length, newArgs.length);
+    return merged;
+  }
+
   /*
    * NetUtils.getHostname() returns a string in the form "hostname/ip".
    * Sometimes the hostname we get is the FQDN and sometimes the short name. In
@@ -1052,4 +1317,3 @@ public class TestDistributedShell {
     return numOfWords;
   }
 }
-

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java
index b40548f..143e5c4 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java
@@ -41,7 +41,7 @@ public class TestDistributedShellWithNodeLabels {
   
   static final int NUM_NMS = 2;
   TestDistributedShell distShellTest;
- 
+
   @Before
   public void setup() throws Exception {
     distShellTest = new TestDistributedShell();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
index 2a56edf..f74729d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
@@ -52,6 +52,8 @@ public abstract class AMRMClient<T extends 
AMRMClient.ContainerRequest> extends
     AbstractService {
   private static final Log LOG = LogFactory.getLog(AMRMClient.class);
 
+  private TimelineClient timelineClient;
+
   /**
    * Create a new instance of AMRMClient.
    * For usage:
@@ -584,10 +586,26 @@ public abstract class AMRMClient<T extends 
AMRMClient.ContainerRequest> extends
   }
 
   /**
+   * Register TimelineClient to AMRMClient; a later call replaces it.
+   * @param client the timeline client to register
+   */
+  public void registerTimelineClient(TimelineClient client) {
+    this.timelineClient = client;
+  }
+
+  /**
+   * Get registered timeline client.
+   * @return the registered timeline client, or null if none is registered
+   */
+  public TimelineClient getRegisteredTimeineClient() {
+    return this.timelineClient;
+  }
+
+  /**
    * Wait for <code>check</code> to return true for each 1000 ms.
    * See also {@link #waitFor(com.google.common.base.Supplier, int)}
    * and {@link #waitFor(com.google.common.base.Supplier, int, int)}
-   * @param check
+   * @param check the condition for which it should wait
    */
   public void waitFor(Supplier<Boolean> check) throws InterruptedException {
     waitFor(check, 1000);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
index 75fe790..82208e4 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -314,6 +315,22 @@ extends AbstractService {
   public abstract int getClusterNodeCount();
 
   /**
+   * Register TimelineClient to the underlying AMRMClient.
+   * @param timelineClient the timeline client to register
+   */
+  public void registerTimelineClient(TimelineClient timelineClient) {
+    client.registerTimelineClient(timelineClient);
+  }
+
+  /**
+   * Get the timeline client registered with the underlying AMRMClient.
+   * @return the registered timeline client
+   */
+  public TimelineClient getRegisteredTimeineClient() {
+    return client.getRegisteredTimeineClient();
+  }
+
+  /**
    * Update application's blacklist with addition or removal resources.
    *
    * @param blacklistAdditions list of resources which should be added to the
@@ -328,7 +345,7 @@ extends AbstractService {
    * Wait for <code>check</code> to return true for each 1000 ms.
    * See also {@link #waitFor(com.google.common.base.Supplier, int)}
    * and {@link #waitFor(com.google.common.base.Supplier, int, int)}
-   * @param check
+   * @param check the condition for which it should wait
    */
   public void waitFor(Supplier<Boolean> check) throws InterruptedException {
     waitFor(check, 1000);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
index ae0ab9d..f55b724 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.UpdatedContainer;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
 import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
@@ -67,6 +68,8 @@ extends AMRMClientAsync<T> {
   private volatile boolean keepRunning;
   private volatile float progress;
   
+  private volatile String collectorAddr;
+
   private volatile Throwable savedException;
 
   /**
@@ -325,6 +328,19 @@ extends AMRMClientAsync<T> {
             LOG.info("Interrupted while waiting for queue", ex);
             continue;
           }
+
+          String collectorAddress = response.getCollectorAddr();
+          TimelineClient timelineClient = client.getRegisteredTimeineClient();
+          if (timelineClient != null && collectorAddress != null
+              && !collectorAddress.isEmpty()) {
+            if (collectorAddr == null
+                || !collectorAddr.equals(collectorAddress)) {
+              collectorAddr = collectorAddress;
+              timelineClient.setTimelineServiceAddress(collectorAddress);
+              LOG.info("collectorAddress " + collectorAddress);
+            }
+          }
+
           List<NodeReport> updatedNodes = response.getUpdatedNodes();
           if (!updatedNodes.isEmpty()) {
             handler.onNodesUpdated(updatedNodes);
@@ -351,7 +367,6 @@ extends AMRMClientAsync<T> {
           if (!allocated.isEmpty()) {
             handler.onContainersAllocated(allocated);
           }
-
           progress = handler.getProgress();
         } catch (Throwable ex) {
           handler.onError(ex);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
index f6bd613..b6b8186 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
@@ -214,6 +214,7 @@
             <exclude>src/main/resources/webapps/jobhistory/.keep</exclude>
             <exclude>src/main/resources/webapps/yarn/.keep</exclude>
             
<exclude>src/main/resources/webapps/applicationhistory/.keep</exclude>
+            <exclude>src/main/resources/webapps/timeline/.keep</exclude>
             <exclude>src/main/resources/webapps/cluster/.keep</exclude>
             <exclude>src/main/resources/webapps/test/.keep</exclude>
             <exclude>src/main/resources/webapps/proxy/.keep</exclude>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
index 8625e25..b4f51ef 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
@@ -392,6 +392,23 @@ public class AllocateResponsePBImpl extends 
AllocateResponse {
     this.amrmToken = amRMToken;
   }
 
+
+  @Override
+  public synchronized String getCollectorAddr() {
+    AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; // read from proto when viaProto is set, else from builder
+    return p.getCollectorAddr();
+  }
+
+  @Override
+  public synchronized void setCollectorAddr(String collectorAddr) {
+    maybeInitBuilder();
+    if (collectorAddr == null) { // null means "unset": clear the field instead of storing null
+      builder.clearCollectorAddr();
+      return;
+    }
+    builder.setCollectorAddr(collectorAddr);
+  }
+
   @Override
   public synchronized Priority getApplicationPriority() {
     AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
index 09298b5..cc76718 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
@@ -28,8 +28,9 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
@@ -51,8 +52,13 @@ public abstract class TimelineClient extends AbstractService 
implements
    * current user may use {@link UserGroupInformation#doAs} another user to
    * construct and initialize a timeline client if the following operations are
    * supposed to be conducted by that user.
+   */
+  private ApplicationId contextAppId;
+
+  /**
+   * Creates an instance of the timeline v.1.x client.
    *
-   * @return a timeline client
+   * @return the created timeline client instance
    */
   @Public
   public static TimelineClient createTimelineClient() {
@@ -60,9 +66,23 @@ public abstract class TimelineClient extends AbstractService 
implements
     return client;
   }
 
+  /**
+   * Creates an instance of the timeline v.2 client. The client is only
+   * constructed here; this method does not initialize or start it.
+   *
+   * @param appId the application id with which the timeline client is
+   * associated
+   * @return the created timeline client instance
+   */
+  @Public
+  public static TimelineClient createTimelineClient(ApplicationId appId) {
+    TimelineClient client = new TimelineClientImpl(appId);
+    return client;
+  }
+
   @Private
-  protected TimelineClient(String name) {
+  protected TimelineClient(String name, ApplicationId appId) {
     super(name);
+    setContextAppId(appId);
   }
 
   /**
@@ -75,8 +95,8 @@ public abstract class TimelineClient extends AbstractService 
implements
    * @param entities
    *          the collection of {@link TimelineEntity}
    * @return the error information if the sent entities are not correctly 
stored
-   * @throws IOException
-   * @throws YarnException
+   * @throws IOException if there are I/O errors
+   * @throws YarnException if entities are incomplete/invalid
    */
   @Public
   public abstract TimelinePutResponse putEntities(
@@ -96,8 +116,8 @@ public abstract class TimelineClient extends AbstractService 
implements
    * @param entities
    *          the collection of {@link TimelineEntity}
    * @return the error information if the sent entities are not correctly 
stored
-   * @throws IOException
-   * @throws YarnException
+   * @throws IOException if there are I/O errors
+   * @throws YarnException if entities are incomplete/invalid
    */
   @Public
   public abstract TimelinePutResponse putEntities(
@@ -187,4 +207,57 @@ public abstract class TimelineClient extends 
AbstractService implements
   public abstract void cancelDelegationToken(
       Token<TimelineDelegationTokenIdentifier> timelineDT)
           throws IOException, YarnException;
+
+  /**
+   * <p>
+   * Send the information of a number of conceptual entities to the timeline
+   * service v.2 collector. It is a blocking API. The method will not return
+   * until all the put entities have been persisted. If this method is invoked
+   * for a non-v.2 timeline client instance, a YarnException is thrown.
+   * </p>
+   *
+   * @param entities the collection of {@link
+   * org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
+   * @throws IOException if there are I/O errors
+   * @throws YarnException if this is not a v.2 timeline client instance
+   */
+  @Public
+  public abstract void putEntities(
+      org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
+          entities) throws IOException, YarnException;
+
+  /**
+   * <p>
+   * Send the information of a number of conceptual entities to the timeline
+   * service v.2 collector. It is an asynchronous API. The method will return
+   * once all the entities are received. If this method is invoked for a
+   * non-v.2 timeline client instance, a YarnException is thrown.
+   * </p>
+   *
+   * @param entities the collection of {@link
+   * org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
+   * @throws IOException if there are I/O errors
+   * @throws YarnException if this is not a v.2 timeline client instance
+   */
+  @Public
+  public abstract void putEntitiesAsync(
+      org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
+          entities) throws IOException, YarnException;
+
+  /**
+   * <p>
+   * Update the timeline service address to which requests will be sent.
+   * </p>
+   * @param address
+   *          the timeline service address
+   */
+  public abstract void setTimelineServiceAddress(String address);
+
+  protected ApplicationId getContextAppId() {
+    return contextAppId; // value last stored by setContextAppId; the constructor calls it with its appId argument
+  }
+
+  protected void setContextAppId(ApplicationId appId) {
+    this.contextAppId = appId; // no validation; null is accepted (the no-arg TimelineClientImpl constructor passes null)
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
index d4c68f7..5f18d22 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
@@ -30,10 +30,21 @@ import java.net.URL;
 import java.net.URLConnection;
 import java.security.GeneralSecurityException;
 import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import javax.net.ssl.HostnameVerifier;
 import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.SSLSocketFactory;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -43,8 +54,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
 import 
org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
 import org.apache.hadoop.security.ssl.SSLFactory;
@@ -54,6 +65,7 @@ import 
org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthentica
 import 
org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
 import 
org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@@ -66,6 +78,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import 
org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
 import org.codehaus.jackson.map.ObjectMapper;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -78,6 +91,7 @@ import com.sun.jersey.api.client.config.DefaultClientConfig;
 import com.sun.jersey.api.client.filter.ClientFilter;
 import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
 import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
 
 @Private
 @Evolving
@@ -86,6 +100,8 @@ public class TimelineClientImpl extends TimelineClient {
   private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class);
   private static final String RESOURCE_URI_STR = "/ws/v1/timeline/";
   private static final ObjectMapper MAPPER = new ObjectMapper();
+  private static final String RESOURCE_URI_STR_V1 = "/ws/v1/timeline/";
+  private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
   private static final Joiner JOINER = Joiner.on("");
   public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
 
@@ -106,7 +122,6 @@ public class TimelineClientImpl extends TimelineClient {
   private ConnectionConfigurator connConfigurator;
   private DelegationTokenAuthenticator authenticator;
   private DelegationTokenAuthenticatedURL.Token token;
-  private URI resURI;
   private UserGroupInformation authUgi;
   private String doAsUser;
   private Configuration configuration;
@@ -114,10 +129,20 @@ public class TimelineClientImpl extends TimelineClient {
   private TimelineWriter timelineWriter;
   private SSLFactory sslFactory;
 
+  private volatile String timelineServiceAddress;
+
+  // Retry parameters for identifying new timeline service
+  // TODO consider to merge with connection retry
+  private int maxServiceRetries;
+  private long serviceRetryInterval;
+  private boolean timelineServiceV2 = false;
+
   @Private
   @VisibleForTesting
   TimelineClientConnectionRetry connectionRetry;
 
+  private TimelineEntityDispatcher entityDispatcher;
+
   // Abstract class for an operation that should be retried by timeline client
   @Private
   @VisibleForTesting
@@ -256,7 +281,12 @@ public class TimelineClientImpl extends TimelineClient {
   }
 
   public TimelineClientImpl() {
-    super(TimelineClientImpl.class.getName());
+    super(TimelineClientImpl.class.getName(), null);
+  }
+
+  public TimelineClientImpl(ApplicationId applicationId) {
+    super(TimelineClientImpl.class.getName(), applicationId);
+    this.timelineServiceV2 = true;
   }
 
   protected void serviceInit(Configuration conf) throws Exception {
@@ -285,31 +315,47 @@ public class TimelineClientImpl extends TimelineClient {
     client = new Client(new URLConnectionClientHandler(
         new TimelineURLConnectionFactory()), cc);
     TimelineJerseyRetryFilter retryFilter = new TimelineJerseyRetryFilter();
-    client.addFilter(retryFilter);
-
-    if (YarnConfiguration.useHttps(conf)) {
-      resURI = URI
-          .create(JOINER.join("https://", conf.get(
-              YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
-              YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS),
-              RESOURCE_URI_STR));
+    // TODO need to cleanup filter retry later.
+    if (!timelineServiceV2) {
+      client.addFilter(retryFilter);
+    }
+
+    // The old timeline service reads its address from configuration, while
+    // the new version discovers it automatically (with retry).
+    if (timelineServiceV2) {
+      maxServiceRetries = conf.getInt(
+          YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
+          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
+      serviceRetryInterval = conf.getLong(
+          YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
+          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
+      entityDispatcher = new TimelineEntityDispatcher(conf);
     } else {
-      resURI = URI.create(JOINER.join("http://", conf.get(
-          YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
-          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS),
-          RESOURCE_URI_STR));
-    }
-    LOG.info("Timeline service address: " + resURI);
-    timelineServiceVersion =
-        conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
-          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
+      if (YarnConfiguration.useHttps(conf)) {
+        setTimelineServiceAddress(conf.get(
+            YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS));
+      } else {
+        setTimelineServiceAddress(conf.get(
+            YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS));
+      }
+      timelineServiceVersion =
+          conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
+              YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
+      LOG.info("Timeline service address: " + getTimelineServiceAddress());
+    }
     super.serviceInit(conf);
   }
 
   @Override
   protected void serviceStart() throws Exception {
-    timelineWriter = createTimelineWriter(
-        configuration, authUgi, client, resURI);
+    if (timelineServiceV2) {
+      entityDispatcher.start();
+    } else {
+      timelineWriter = createTimelineWriter(configuration, authUgi, client,
+          constructResURI(getConfig(), timelineServiceAddress, false));
+    }
   }
 
   protected TimelineWriter createTimelineWriter(Configuration conf,
@@ -331,6 +377,9 @@ public class TimelineClientImpl extends TimelineClient {
     if (this.sslFactory != null) {
       this.sslFactory.destroy();
     }
+    if (timelineServiceV2) {
+      entityDispatcher.stop();
+    }
     super.serviceStop();
   }
 
@@ -347,6 +396,25 @@ public class TimelineClientImpl extends TimelineClient {
     return timelineWriter.putEntities(entities);
   }
 
+  @Override
+  public void putEntities(
+      org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
+          entities) throws IOException, YarnException {
+    if (!timelineServiceV2) { // the flag is set only by the constructor that takes an ApplicationId
+      throw new YarnException("v.2 method is invoked on a v.1.x client");
+    }
+    entityDispatcher.dispatchEntities(true, entities); // NOTE(review): first argument presumably requests synchronous dispatch - confirm against dispatchEntities
+  }
+
+  @Override
+  public void putEntitiesAsync(
+      org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
+          entities) throws IOException, YarnException {
+    if (!timelineServiceV2) { // the flag is set only by the constructor that takes an ApplicationId
+      throw new YarnException("v.2 method is invoked on a v.1.x client");
+    }
+    entityDispatcher.dispatchEntities(false, entities); // NOTE(review): first argument presumably requests asynchronous dispatch - confirm against dispatchEntities
+  }
 
   @Override
   public void putDomain(TimelineDomain domain) throws IOException,
@@ -354,11 +422,110 @@ public class TimelineClientImpl extends TimelineClient {
     timelineWriter.putDomain(domain);
   }
 
+  // Used for the new timeline service only: PUTs obj to the v.2 REST endpoint, retrying on IOException.
+  @Private
+  protected void putObjects(String path, MultivaluedMap<String, String> params,
+      Object obj) throws IOException, YarnException {
+
+    int retries = verifyRestEndPointAvailable();
+
+    // timelineServiceAddress may be stale, so the URI is rebuilt from its current value on every attempt.
+    boolean needRetry = true;
+    while (needRetry) {
+      try {
+        URI uri = constructResURI(getConfig(), timelineServiceAddress, true);
+        putObjects(uri, path, params, obj);
+        needRetry = false;
+      } catch (IOException e) {
+        // sleeps while retries remain; otherwise throws an IOException wrapping e, which ends the loop.
+        checkRetryWithSleep(retries, e);
+        retries--;
+      }
+    }
+  }
+
+  private int verifyRestEndPointAvailable() throws YarnException {
+    // timelineServiceAddress may not have been set yet (new timeline service
+    // only), so wait for it, polling up to maxServiceRetries times.
+    int retries = pollTimelineServiceAddress(this.maxServiceRetries);
+    if (timelineServiceAddress == null) {
+      String errMessage = "TimelineClient has reached to max retry times : "
+          + this.maxServiceRetries
+          + ", but failed to fetch timeline service address. Please verify"
+          + " Timeline Auxillary Service is configured in all the NMs";
+      LOG.error(errMessage);
+      throw new YarnException(errMessage);
+    }
+    return retries; // polls left over; putObjects uses this as its retry budget
+  }
+
+  /**
+   * Sleeps for the service retry interval while retries remain; once they are
+   * exhausted, logs an error and throws an IOException wrapping the cause.
+   *
+   * @param retries the number of retries still available
+   * @param e the I/O failure that triggered this check
+   * @throws YarnException if the thread is interrupted while sleeping
+   * @throws IOException if no retries are left
+   */
+  private void checkRetryWithSleep(int retries, IOException e)
+      throws YarnException, IOException {
+    if (retries > 0) {
+      try {
+        Thread.sleep(this.serviceRetryInterval);
+      } catch (InterruptedException ex) {
+        // restore the interrupt flag before reporting the interruption
+        Thread.currentThread().interrupt();
+        throw new YarnException(
+            "Interrupted while retrying to connect to ATS");
+      }
+    } else {
+      StringBuilder msg = new StringBuilder(
+          "TimelineClient has reached to max retry times : ");
+      msg.append(this.maxServiceRetries);
+      msg.append(" for service address: ");
+      msg.append(timelineServiceAddress);
+      LOG.error(msg.toString());
+      throw new IOException(msg.toString(), e);
+    }
+  }
+
+  protected void putObjects(
+      URI base, String path, MultivaluedMap<String, String> params, Object obj)
+          throws IOException, YarnException {
+    ClientResponse resp;
+    try {
+      resp = client.resource(base).path(path).queryParams(params)
+          .accept(MediaType.APPLICATION_JSON)
+          .type(MediaType.APPLICATION_JSON)
+          .put(ClientResponse.class, obj);
+    } catch (RuntimeException re) {
+      // a runtime exception is expected if the client cannot connect to the server
+      String msg =
+          "Failed to get the response from the timeline server.";
+      LOG.error(msg, re);
+      throw new IOException(re); // msg is only logged; the IOException lets the retrying putObjects overload catch and retry
+    }
+    if (resp == null ||
+        resp.getClientResponseStatus() != ClientResponse.Status.OK) {
+      String msg = "Response from the timeline server is " +
+          ((resp == null) ? "null":
+          "not successful," + " HTTP error code: " + resp.getStatus()
+          + ", Server response:\n" + resp.getEntity(String.class));
+      LOG.error(msg);
+      throw new YarnException(msg); // not an IOException, so the retrying putObjects overload does not retry it
+    }
+  }
+
+  @Override
+  public void setTimelineServiceAddress(String address) {
+    this.timelineServiceAddress = address; // field is volatile; pollTimelineServiceAddress re-reads it while waiting
+  }
+
+  private String getTimelineServiceAddress() {
+    return this.timelineServiceAddress; // may be null: the v.2 branch of serviceInit does not set it
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   public Token<TimelineDelegationTokenIdentifier> getDelegationToken(
       final String renewer) throws IOException, YarnException {
-    PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>> 
getDTAction =
+    PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>>
+        getDTAction =
         new 
PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>>() {
 
           @Override
@@ -367,8 +534,12 @@ public class TimelineClientImpl extends TimelineClient {
             DelegationTokenAuthenticatedURL authUrl =
                 new DelegationTokenAuthenticatedURL(authenticator,
                     connConfigurator);
+            // TODO we should add retry logic here if timelineServiceAddress is
+            // not available immediately.
             return (Token) authUrl.getDelegationToken(
-                resURI.toURL(), token, renewer, doAsUser);
+                constructResURI(getConfig(),
+                    getTimelineServiceAddress(), false).toURL(),
+                token, renewer, doAsUser);
           }
         };
     return (Token<TimelineDelegationTokenIdentifier>) 
operateDelegationToken(getDTAction);
@@ -391,8 +562,8 @@ public class TimelineClientImpl extends TimelineClient {
           @Override
           public Long run() throws Exception {
             // If the timeline DT to renew is different than cached, replace 
it.
-            // Token to set every time for retry, because when exception 
happens,
-            // DelegationTokenAuthenticatedURL will reset it to null;
+            // Token to set every time for retry, because when exception
+            // happens, DelegationTokenAuthenticatedURL will reset it to null;
             if (!timelineDT.equals(token.getDelegationToken())) {
               token.setDelegationToken((Token) timelineDT);
             }
@@ -401,9 +572,10 @@ public class TimelineClientImpl extends TimelineClient {
                     connConfigurator);
             // If the token service address is not available, fall back to use
             // the configured service address.
-            final URI serviceURI = isTokenServiceAddrEmpty ? resURI
+            final URI serviceURI = isTokenServiceAddrEmpty ?
+                constructResURI(getConfig(), getTimelineServiceAddress(), 
false)
                 : new URI(scheme, null, address.getHostName(),
-                address.getPort(), RESOURCE_URI_STR, null, null);
+                address.getPort(), RESOURCE_URI_STR_V1, null, null);
             return authUrl
                 .renewDelegationToken(serviceURI.toURL(), token, doAsUser);
           }
@@ -427,9 +599,10 @@ public class TimelineClientImpl extends TimelineClient {
 
           @Override
           public Void run() throws Exception {
-            // If the timeline DT to cancel is different than cached, replace 
it.
-            // Token to set every time for retry, because when exception 
happens,
-            // DelegationTokenAuthenticatedURL will reset it to null;
+            // If the timeline DT to cancel is different than cached, replace
+            // it.
+            // Token to set every time for retry, because when exception
+            // happens, DelegationTokenAuthenticatedURL will reset it to null;
             if (!timelineDT.equals(token.getDelegationToken())) {
               token.setDelegationToken((Token) timelineDT);
             }
@@ -438,9 +611,10 @@ public class TimelineClientImpl extends TimelineClient {
                     connConfigurator);
             // If the token service address is not available, fall back to use
             // the configured service address.
-            final URI serviceURI = isTokenServiceAddrEmpty ? resURI
+            final URI serviceURI = isTokenServiceAddrEmpty ?
+                constructResURI(getConfig(), getTimelineServiceAddress(), 
false)
                 : new URI(scheme, null, address.getHostName(),
-                address.getPort(), RESOURCE_URI_STR, null, null);
+                address.getPort(), RESOURCE_URI_STR_V1, null, null);
             authUrl.cancelDelegationToken(serviceURI.toURL(), token, doAsUser);
             return null;
           }
@@ -450,7 +624,8 @@ public class TimelineClientImpl extends TimelineClient {
 
   @Override
   public String toString() {
-    return super.toString() + " with timeline server " + resURI
+    return super.toString() + " with timeline server "
+        + constructResURI(getConfig(), getTimelineServiceAddress(), false)
         + " and writer " + timelineWriter;
   }
 
@@ -464,6 +639,26 @@ public class TimelineClientImpl extends TimelineClient {
     return connectionRetry.retryOn(tokenRetryOp);
   }
 
+  /**
+   * Poll TimelineServiceAddress for maximum of retries times if it is null.
+   *
+   * @param retries the maximum number of polls to make while it is null
+   * @return the number of retries left when polling stopped
+   * @throws YarnException if the thread is interrupted while sleeping
+   */
+  private int pollTimelineServiceAddress(int retries) throws YarnException {
+    while (timelineServiceAddress == null && retries > 0) {
+      try {
+        Thread.sleep(this.serviceRetryInterval);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new YarnException("Interrupted while trying to connect ATS");
+      }
+      retries--;
+    }
+    return retries;
+  }
+
   private class TimelineURLConnectionFactory
       implements HttpURLConnectionFactory {
 
@@ -533,6 +728,13 @@ public class TimelineClientImpl extends TimelineClient {
     connection.setReadTimeout(socketTimeout);
   }
 
+  /**
+   * Builds the timeline REST base URI by joining the scheme, the service
+   * address and the versioned resource path.
+   *
+   * @param conf configuration consulted via YarnConfiguration.useHttps to
+   *          choose between the https and http schemes
+   * @param address the timeline service address placed after the scheme;
+   *          NOTE(review): JOINER is a plain Joiner.on(""), which presumably
+   *          rejects a null address - confirm callers never pass null
+   * @param v2 true to use the v.2 resource path, false for the v.1 path
+   * @return the resulting URI
+   */
+  private static URI constructResURI(
+      Configuration conf, String address, boolean v2) {
+    return URI.create(
+        JOINER.join(YarnConfiguration.useHttps(conf) ? "https://" : "http://",
+            address, v2 ? RESOURCE_URI_STR_V2 : RESOURCE_URI_STR_V1));
+  }
+
   public static void main(String[] argv) throws Exception {
     CommandLine cliParser = new GnuParser().parse(opts, argv);
     if (cliParser.hasOption("put")) {
@@ -714,4 +916,220 @@ public class TimelineClientImpl extends TimelineClient {
     }
   }
 
+  private final class EntitiesHolder extends FutureTask<Void> {
+    private final
+        org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
+            entities;
+    private final boolean isSync;
+
+    EntitiesHolder(
+        final
+            org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
+                entities,
+        final boolean isSync) {
+      super(new Callable<Void>() {
+        // publishEntities()
+        public Void call() throws Exception {
+          MultivaluedMap<String, String> params = new MultivaluedMapImpl();
+          params.add("appid", getContextAppId().toString());
+          params.add("async", Boolean.toString(!isSync));
+          putObjects("entities", params, entities);
+          return null;
+        }
+      });
+      this.entities = entities;
+      this.isSync = isSync;
+    }
+
+    public boolean isSync() {
+      return isSync;
+    }
+
+    public org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
+        getEntities() {
+      return entities;
+    }
+  }
+
+  /**
+   * This class is responsible for collecting the timeline entities and
+   * publishing them asynchronously.
+   */
+  private class TimelineEntityDispatcher {
+    /**
+     * Time period for which the timelineclient will wait for draining after
+     * stop.
+     */
+    private static final long DRAIN_TIME_PERIOD = 2000L;
+
+    private int numberOfAsyncsToMerge;
+    private final BlockingQueue<EntitiesHolder> timelineEntityQueue;
+    private ExecutorService executor;
+
+    TimelineEntityDispatcher(Configuration conf) {
+      timelineEntityQueue = new LinkedBlockingQueue<EntitiesHolder>();
+      numberOfAsyncsToMerge =
+          conf.getInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE,
+              YarnConfiguration.DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE);
+    }
+
+    Runnable createRunnable() {
+      return new Runnable() {
+        @Override
+        public void run() {
+          try {
+            EntitiesHolder entitiesHolder;
+            while (!Thread.currentThread().isInterrupted()) {
+              // Merge all the async calls and make one push, but if its sync
+              // call push immediately
+              try {
+                entitiesHolder = timelineEntityQueue.take();
+              } catch (InterruptedException ie) {
+                LOG.info("Timeline dispatcher thread was interrupted ");
+                Thread.currentThread().interrupt();
+                return;
+              }
+              if (entitiesHolder != null) {
+                publishWithoutBlockingOnQueue(entitiesHolder);
+              }
+            }
+          } finally {
+            if (!timelineEntityQueue.isEmpty()) {
+              LOG.info("Yet to publish " + timelineEntityQueue.size()
+                  + " timelineEntities, draining them now. ");
+            }
+            // Try to drain the remaining entities to be published @ the max for
+            // 2 seconds
+            long timeTillweDrain =
+                System.currentTimeMillis() + DRAIN_TIME_PERIOD;
+            while (!timelineEntityQueue.isEmpty()) {
+              publishWithoutBlockingOnQueue(timelineEntityQueue.poll());
+              if (System.currentTimeMillis() > timeTillweDrain) {
+                // time elapsed stop publishing further....
+                if (!timelineEntityQueue.isEmpty()) {
+                  LOG.warn("Time to drain elapsed! Remaining "
+                      + timelineEntityQueue.size() + "timelineEntities will not"
+                      + " be published");
+                  // if some entities were not drained then we need interrupt
+                  // the threads which had put sync EntityHolders to the queue.
+                  EntitiesHolder nextEntityInTheQueue = null;
+                  while ((nextEntityInTheQueue =
+                      timelineEntityQueue.poll()) != null) {
+                    nextEntityInTheQueue.cancel(true);
+                  }
+                }
+                break;
+              }
+            }
+          }
+        }
+
+        /**
+         * Publishes the given EntitiesHolder and return immediately if sync
+         * call, else tries to fetch the EntitiesHolder from the queue in non
+         * blocking fashion and collate the Entities if possible before
+         * publishing through REST.
+         *
+         * @param entitiesHolder the holder whose entities are to be published
+         */
+        private void publishWithoutBlockingOnQueue(
+            EntitiesHolder entitiesHolder) {
+          if (entitiesHolder.isSync()) {
+            entitiesHolder.run();
+            return;
+          }
+          int count = 1;
+          while (true) {
+            // loop till we find a sync put Entities or there is nothing
+            // to take
+            EntitiesHolder nextEntityInTheQueue = timelineEntityQueue.poll();
+            if (nextEntityInTheQueue == null) {
+              // Nothing in the queue just publish and get back to the
+              // blocked wait state
+              entitiesHolder.run();
+              break;
+            } else if (nextEntityInTheQueue.isSync()) {
+              // flush all the prev async entities first
+              entitiesHolder.run();
+              // and then flush the sync entity
+              nextEntityInTheQueue.run();
+              break;
+            } else {
+              // append all async entities together and then flush
+              entitiesHolder.getEntities().addEntities(
+                  nextEntityInTheQueue.getEntities().getEntities());
+              count++;
+              if (count == numberOfAsyncsToMerge) {
+                // Flush the entities if the number of the async
+                // putEntities merged reaches the desired limit. To avoid
+                // collecting multiple entities and delaying for a long
+                // time.
+                entitiesHolder.run();
+                break;
+              }
+            }
+          }
+        }
+      };
+    }
+
+    public void dispatchEntities(boolean sync,
+        org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity[]
+            entitiesTobePublished) throws YarnException {
+      if (executor.isShutdown()) {
+        throw new YarnException("Timeline client is in the process of stopping,"
+            + " not accepting any more TimelineEntities");
+      }
+
+      // wrap all TimelineEntity into TimelineEntities object
+      org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
+          entities =
+              new org.apache.hadoop.yarn.api.records.timelineservice.
+                  TimelineEntities();
+      for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
+               entity : entitiesTobePublished) {
+        entities.addEntity(entity);
+      }
+
+      // created a holder and place it in queue
+      EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync);
+      try {
+        timelineEntityQueue.put(entitiesHolder);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new YarnException(
+            "Failed while adding entity to the queue for publishing", e);
+      }
+
+      if (sync) {
+        // In sync call we need to wait till its published and if any error then
+        // throw it back
+        try {
+          entitiesHolder.get();
+        } catch (ExecutionException e) {
+          throw new YarnException("Failed while publishing entity",
+              e.getCause());
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new YarnException("Interrupted while publishing entity", e);
+        }
+      }
+    }
+
+    public void start() {
+      executor = Executors.newSingleThreadExecutor();
+      executor.execute(createRunnable());
+    }
+
+    public void stop() {
+      LOG.info("Stopping TimelineClient.");
+      executor.shutdownNow();
+      try {
+        executor.awaitTermination(DRAIN_TIME_PERIOD, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        e.printStackTrace();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
index a794e97..b618ac1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
@@ -43,6 +44,13 @@ import org.codehaus.jackson.map.ObjectMapper;
 @Evolving
 public class TimelineUtils {
 
+  public static final String FLOW_NAME_TAG_PREFIX = "TIMELINE_FLOW_NAME_TAG";
+  public static final String FLOW_VERSION_TAG_PREFIX =
+      "TIMELINE_FLOW_VERSION_TAG";
+  public static final String FLOW_RUN_ID_TAG_PREFIX =
+      "TIMELINE_FLOW_RUN_ID_TAG";
+  public final static String DEFAULT_FLOW_VERSION = "1";
+
   private static ObjectMapper mapper;
 
   static {
@@ -154,4 +162,45 @@ public class TimelineUtils {
         getTimelineTokenServiceAddress(conf);
     return SecurityUtil.buildTokenService(timelineServiceAddr);
   }
+
+  public static String generateDefaultFlowName(String appName,
+      ApplicationId appId) {
+    return (appName != null &&
+        !appName.equals(YarnConfiguration.DEFAULT_APPLICATION_NAME)) ?
+        appName :
+        "flow_" + appId.getClusterTimestamp() + "_" + appId.getId();
+  }
+
+  /**
+   * Generate flow name tag.
+   *
+   * @param flowName flow name that identifies a distinct flow application which
+   *                 can be run repeatedly over time
+   * @return flow name tag.
+   */
+  public static String generateFlowNameTag(String flowName) {
+    return FLOW_NAME_TAG_PREFIX + ":" + flowName;
+  }
+
+  /**
+   * Generate flow version tag.
+   *
+   * @param flowVersion flow version that keeps track of the changes made to the
+   *                    flow
+   * @return flow version tag.
+   */
+  public static String generateFlowVersionTag(String flowVersion) {
+    return FLOW_VERSION_TAG_PREFIX + ":" + flowVersion;
+  }
+
+  /**
+   * Generate flow run ID tag.
+   *
+   * @param flowRunId flow run ID that identifies one instance (or specific
+   *                  execution) of that flow
+   * @return flow run id tag.
+   */
+  public static String generateFlowRunIdTag(long flowRunId) {
+    return FLOW_RUN_ID_TAG_PREFIX + ":" + flowRunId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java
index c376b32..89f0551 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java
@@ -212,7 +212,7 @@ public class WebAppUtils {
     return getResolvedAddress(address);
   }
 
-  private static String getResolvedAddress(InetSocketAddress address) {
+  public static String getResolvedAddress(InetSocketAddress address) {
     address = NetUtils.getConnectAddress(address);
     StringBuilder sb = new StringBuilder();
     InetAddress resolved = address.getAddress();
@@ -274,6 +274,10 @@ public class WebAppUtils {
   }
 
   public static String getAHSWebAppURLWithoutScheme(Configuration conf) {
+    return getTimelineReaderWebAppURL(conf);
+  }
+
+  public static String getTimelineReaderWebAppURL(Configuration conf) {
     if (YarnConfiguration.useHttps(conf)) {
       return conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
         YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS);
@@ -282,7 +286,7 @@ public class WebAppUtils {
         YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS);
     }
   }
-  
+
   /**
    * if url has scheme then it will be returned as it is else it will return
    * url with scheme.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/webapps/timeline/.keep
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/webapps/timeline/.keep b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/webapps/timeline/.keep
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 59597f6..c1ef52a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -784,12 +784,30 @@
 
   <property>
     <description>The setting that controls whether yarn system metrics is
-    published on the timeline server or not by RM.</description>
+    published to the Timeline server (version one) or not, by RM.
+    This configuration is now deprecated in favor of
+    yarn.system-metrics-publisher.enabled.</description>
     <name>yarn.resourcemanager.system-metrics-publisher.enabled</name>
     <value>false</value>
   </property>
 
   <property>
+    <description>The setting that controls whether yarn system metrics is
+    published on the Timeline service or not by RM and NM.</description>
+    <name>yarn.system-metrics-publisher.enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>The setting that controls whether yarn container events are
+    published to the timeline service or not by RM. This configuration setting
+    is for ATS V2.</description>
+    <name>yarn.rm.system-metrics-publisher.emit-container-events</name>
+    <value>false</value>
+  </property>
+
+
+  <property>
     <description>Number of worker threads that send the yarn system metrics
     data.</description>
     <name>yarn.resourcemanager.system-metrics-publisher.dispatcher.pool-size</name>
@@ -966,6 +984,12 @@
     <value>20</value>
   </property>
 
+    <property>
+    <description>Number of threads collector service uses.</description>
+    <name>yarn.nodemanager.collector-service.thread-count</name>
+    <value>5</value>
+  </property>
+
   <property>
     <description>Number of threads used in cleanup.</description>
     <name>yarn.nodemanager.delete.thread-count</name>
@@ -1041,6 +1065,13 @@
     <value>${yarn.nodemanager.hostname}:8040</value>
   </property>
 
+
+  <property>
+    <description>Address where the collector service IPC is.</description>
+    <name>yarn.nodemanager.collector-service.address</name>
+    <value>${yarn.nodemanager.hostname}:8048</value>
+  </property>
+
   <property>
     <description>Interval in between cache cleanups.</description>
     <name>yarn.nodemanager.localizer.cache.cleanup.interval-ms</name>
@@ -1858,7 +1889,7 @@
     <description>Indicate what is the current version of the running
     timeline service. For example, if "yarn.timeline-service.version" is 1.5,
     and "yarn.timeline-service.enabled" is true, it means the cluster will and
-    should bring up the timeline service v.1.5.
+    should bring up the timeline service v.1.5 (and nothing else).
     On the client side, if the client uses the same version of timeline service,
     it should succeed. If the client chooses to use a smaller version in spite of this,
     then depending on how robust the compatibility story is between versions,
@@ -2177,6 +2208,23 @@
     <value>300</value>
   </property>
 
+  <!-- Timeline Service v2 Configuration -->
+  <property>
+    <name>yarn.timeline-service.writer.class</name>
+    <description>
+      Storage implementation ATS v2 will use for the TimelineWriter service.
+    </description>
+    <value>org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl</value>
+  </property>
+
+  <property>
+    <name>yarn.timeline-service.reader.class</name>
+    <description>
+      Storage implementation ATS v2 will use for the TimelineReader service.
+    </description>
+    <value>org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl</value>
+  </property>
+
   <property>
     <name>yarn.timeline-service.client.internal-timers-ttl-secs</name>
     <description>
@@ -2187,6 +2235,41 @@
     <value>420</value>
   </property>
 
+  <property>
+    <description>The setting that controls how often the timeline collector
+    flushes the timeline writer.</description>
+    <name>yarn.timeline-service.writer.flush-interval-seconds</name>
+    <value>60</value>
+  </property>
+
+  <property>
+    <description>Time period till which the application collector will be alive
+     in NM, after the application master container finishes.</description>
+    <name>yarn.timeline-service.app-collector.linger-period.ms</name>
+    <value>1000</value>
+  </property>
+
+  <property>
+    <description>The Timeline V2 client tries to merge this many
+    async entities (if available) and then calls the REST ATS V2 API to submit.
+    </description>
+    <name>yarn.timeline-service.timeline-client.number-of-async-entities-to-merge</name>
+    <value>10</value>
+  </property>
+
+  <property>
+    <description>
+    The setting that controls how long the final value
+    of a metric of a completed app is retained before merging into
+    the flow sum. Up to this time after an application is completed
+    out-of-order values that arrive can be recognized and discarded at the
+    cost of increased storage.
+    </description>
+    <name>yarn.timeline-service.hbase.coprocessor.app-final-value-retention-milliseconds
+    </name>
+    <value>259200000</value>
+  </property>
+
   <!--  Shared Cache Configuration -->
 
   <property>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
index dfb862e..90c7573 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
@@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
@@ -116,7 +117,7 @@ public class TestContainerLaunchRPC {
             resource, System.currentTimeMillis() + 10000, 42, 42,
             Priority.newInstance(0), 0);
       Token containerToken =
-          TestRPC.newContainerToken(nodeId, "password".getBytes(),
+          newContainerToken(nodeId, "password".getBytes(),
             containerTokenIdentifier);
 
       StartContainerRequest scRequest =
@@ -142,6 +143,19 @@ public class TestContainerLaunchRPC {
     Assert.fail("timeout exception should have occurred!");
   }
 
+  public static Token newContainerToken(NodeId nodeId, byte[] password,
+      ContainerTokenIdentifier tokenIdentifier) {
+    // RPC layer client expects ip:port as service for tokens
+    InetSocketAddress addr =
+        NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort());
+    // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
+    Token containerToken =
+        Token.newInstance(tokenIdentifier.getBytes(),
+          ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil
+            .buildTokenService(addr).toString());
+    return containerToken;
+  }
+
   public class DummyContainerManager implements ContainerManagementProtocol {
 
     private ContainerStatus status = null;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java
index 99d42ea..f97f7c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
@@ -105,7 +106,7 @@ public class TestContainerResourceIncreaseRPC {
               resource, System.currentTimeMillis() + 10000, 42, 42,
                   Priority.newInstance(0), 0);
       Token containerToken =
-          TestRPC.newContainerToken(nodeId, "password".getBytes(),
+          newContainerToken(nodeId, "password".getBytes(),
               containerTokenIdentifier);
       // Construct container resource increase request,
       List<Token> increaseTokens = new ArrayList<>();
@@ -128,6 +129,19 @@ public class TestContainerResourceIncreaseRPC {
     Assert.fail("timeout exception should have occurred!");
   }
 
+  public static Token newContainerToken(NodeId nodeId, byte[] password,
+      ContainerTokenIdentifier tokenIdentifier) {
+    // RPC layer client expects ip:port as service for tokens
+    InetSocketAddress addr =
+        NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort());
+    // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
+    Token containerToken =
+        Token.newInstance(tokenIdentifier.getBytes(),
+          ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil
+            .buildTokenService(addr).toString());
+    return containerToken;
+  }
+
   public class DummyContainerManager implements ContainerManagementProtocol {
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to