http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
index bc046c7..3121c4e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
+import java.util.Set;
+
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -26,6 +28,10 @@ import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 
 /**
  * Event to record successful completion of a map attempt
@@ -33,7 +39,7 @@ import org.apache.hadoop.mapreduce.TaskType;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class MapAttemptFinishedEvent  implements HistoryEvent {
+public class MapAttemptFinishedEvent implements HistoryEvent {
 
   private MapAttemptFinished datum = null;
 
@@ -218,4 +224,28 @@ public class MapAttemptFinishedEvent  implements HistoryEvent {
     return physMemKbytes;
   }
   
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("TASK_TYPE", getTaskType().toString());
+    tEvent.addInfo("FINISH_TIME", getFinishTime());
+    tEvent.addInfo("STATUS", getTaskStatus());
+    tEvent.addInfo("STATE", getState());
+    tEvent.addInfo("MAP_FINISH_TIME", getMapFinishTime());
+    tEvent.addInfo("HOSTNAME", getHostname());
+    tEvent.addInfo("PORT", getPort());
+    tEvent.addInfo("RACK_NAME", getRackName());
+    tEvent.addInfo("ATTEMPT_ID", getAttemptId() == null ?
+        "" : getAttemptId().toString());
+    return tEvent;
+  }
+
+  @Override
+  public Set<TimelineMetric> getTimelineMetrics() {
+    Set<TimelineMetric> metrics = JobHistoryEventUtils
+        .countersToTimelineMetric(getCounters(), finishTime);
+    return metrics;
+  }
+
 }
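
As an aside on the pattern this patch repeats across the event classes below: each HistoryEvent now exposes toTimelineEvent() for the event metadata and getTimelineMetrics() for counter-derived metrics (null when the event carries no counters). A minimal sketch of how a publisher could stitch the two into an ATSv2 entity follows; the helper and the entity type string are assumptions for illustration, not code from this commit:

    // Sketch only: assumed helper, not part of this patch.
    // Assumes imports of TimelineEntity, TimelineEvent, TimelineMetric from
    // org.apache.hadoop.yarn.api.records.timelineservice and java.util.Set.
    static TimelineEntity toEntity(HistoryEvent event, String entityId) {
      TimelineEntity entity = new TimelineEntity();
      entity.setType("MAPREDUCE_TASK_ATTEMPT"); // assumed entity type
      entity.setId(entityId);
      entity.addEvent(event.toTimelineEvent()); // event id plus info map
      Set<TimelineMetric> metrics = event.getTimelineMetrics();
      if (metrics != null) { // events without counters return null
        entity.setMetrics(metrics);
      }
      return entity;
    }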

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java
index eead9cf..7adae23 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java
@@ -17,9 +17,14 @@
  */
 package org.apache.hadoop.mapreduce.jobhistory;
 
+import java.util.Set;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 
 /**
  * Event to record the normalized map/reduce requirements.
@@ -71,4 +76,18 @@ public class NormalizedResourceEvent implements HistoryEvent {
   public void setDatum(Object datum) {
     throw new UnsupportedOperationException("Not a seriable object");
   }
+
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("MEMORY", "" + getMemory());
+    tEvent.addInfo("TASK_TYPE", getTaskType());
+    return tEvent;
+  }
+
+  @Override
+  public Set<TimelineMetric> getTimelineMetrics() {
+    return null;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
index 6644a48..9c0f09b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
+import java.util.Set;
+
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -26,6 +28,10 @@ import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 
 /**
  * Event to record successful completion of a reduce attempt
@@ -33,7 +39,7 @@ import org.apache.hadoop.mapreduce.TaskType;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class ReduceAttemptFinishedEvent  implements HistoryEvent {
+public class ReduceAttemptFinishedEvent implements HistoryEvent {
 
   private ReduceAttemptFinished datum = null;
 
@@ -223,4 +229,29 @@ public class ReduceAttemptFinishedEvent  implements HistoryEvent {
     return physMemKbytes;
   }
 
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("TASK_TYPE", getTaskType().toString());
+    tEvent.addInfo("ATTEMPT_ID", getAttemptId() == null ?
+        "" : getAttemptId().toString());
+    tEvent.addInfo("FINISH_TIME", getFinishTime());
+    tEvent.addInfo("STATUS", getTaskStatus());
+    tEvent.addInfo("STATE", getState());
+    tEvent.addInfo("SHUFFLE_FINISH_TIME", getShuffleFinishTime());
+    tEvent.addInfo("SORT_FINISH_TIME", getSortFinishTime());
+    tEvent.addInfo("HOSTNAME", getHostname());
+    tEvent.addInfo("PORT", getPort());
+    tEvent.addInfo("RACK_NAME", getRackName());
+    return tEvent;
+  }
+
+  @Override
+  public Set<TimelineMetric> getTimelineMetrics() {
+    Set<TimelineMetric> metrics = JobHistoryEventUtils
+        .countersToTimelineMetric(getCounters(), finishTime);
+    return metrics;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
index bb7dbe0..a931ca2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
+import java.util.Set;
+
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -25,6 +27,10 @@ import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 
 /**
  * Event to record successful task completion
@@ -136,4 +142,24 @@ public class TaskAttemptFinishedEvent  implements HistoryEvent {
            : EventType.REDUCE_ATTEMPT_FINISHED;
   }
 
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("TASK_TYPE", getTaskType().toString());
+    tEvent.addInfo("ATTEMPT_ID", getAttemptId() == null ?
+        "" : getAttemptId().toString());
+    tEvent.addInfo("FINISH_TIME", getFinishTime());
+    tEvent.addInfo("STATUS", getTaskStatus());
+    tEvent.addInfo("STATE", getState());
+    tEvent.addInfo("HOSTNAME", getHostname());
+    return tEvent;
+  }
+
+  @Override
+  public Set<TimelineMetric> getTimelineMetrics() {
+    Set<TimelineMetric> metrics = JobHistoryEventUtils
+        .countersToTimelineMetric(getCounters(), finishTime);
+    return metrics;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java
index 3073d5b..d09d5ca 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java
@@ -18,16 +18,20 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
+import java.util.Set;
+
+import org.apache.avro.util.Utf8;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
-import org.apache.avro.util.Utf8;
-
 /**
  * Event to record start of a task attempt
  *
@@ -133,4 +137,25 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
     return null;
   }
 
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("TASK_TYPE", getTaskType().toString());
+    tEvent.addInfo("TASK_ATTEMPT_ID",
+        getTaskAttemptId().toString());
+    tEvent.addInfo("START_TIME", getStartTime());
+    tEvent.addInfo("HTTP_PORT", getHttpPort());
+    tEvent.addInfo("TRACKER_NAME", getTrackerName());
+    tEvent.addInfo("SHUFFLE_PORT", getShufflePort());
+    tEvent.addInfo("CONTAINER_ID", getContainerId() == null ?
+        "" : getContainerId().toString());
+    return tEvent;
+  }
+
+  @Override
+  public Set<TimelineMetric> getTimelineMetrics() {
+    return null;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
index 77ee2a0..1732d91 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
@@ -18,17 +18,21 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
+import java.util.Set;
+
+import org.apache.avro.util.Utf8;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.ProgressSplitsBlock;
 import org.apache.hadoop.mapred.TaskStatus;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
-
-import org.apache.hadoop.mapred.ProgressSplitsBlock;
-
-import org.apache.avro.util.Utf8;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 
 /**
  * Event to record unsuccessful (Killed/Failed) completion of task attempts
@@ -248,4 +252,29 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
     return physMemKbytes;
   }
 
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("TASK_TYPE", getTaskType().toString());
+    tEvent.addInfo("TASK_ATTEMPT_ID", getTaskAttemptId() == null ?
+        "" : getTaskAttemptId().toString());
+    tEvent.addInfo("FINISH_TIME", getFinishTime());
+    tEvent.addInfo("ERROR", getError());
+    tEvent.addInfo("STATUS", getTaskStatus());
+    tEvent.addInfo("HOSTNAME", getHostname());
+    tEvent.addInfo("PORT", getPort());
+    tEvent.addInfo("RACK_NAME", getRackName());
+    tEvent.addInfo("SHUFFLE_FINISH_TIME", getFinishTime());
+    tEvent.addInfo("SORT_FINISH_TIME", getFinishTime());
+    tEvent.addInfo("MAP_FINISH_TIME", getFinishTime());
+    return tEvent;
+  }
+
+  @Override
+  public Set<TimelineMetric> getTimelineMetrics() {
+    Set<TimelineMetric> metrics = JobHistoryEventUtils
+        .countersToTimelineMetric(getCounters(), finishTime);
+    return metrics;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
index 2838f08..d14350d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
@@ -18,14 +18,20 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
+import java.util.Set;
+
+import org.apache.avro.util.Utf8;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.TaskStatus;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
-
-import org.apache.avro.util.Utf8;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 
 /**
  * Event to record the failure of a task
@@ -137,4 +143,23 @@ public class TaskFailedEvent implements HistoryEvent {
     return EventType.TASK_FAILED;
   }
 
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("TASK_TYPE", getTaskType().toString());
+    tEvent.addInfo("STATUS", TaskStatus.State.FAILED.toString());
+    tEvent.addInfo("FINISH_TIME", getFinishTime());
+    tEvent.addInfo("ERROR", getError());
+    tEvent.addInfo("FAILED_ATTEMPT_ID",
+        getFailedAttemptID() == null ? "" : getFailedAttemptID().toString());
+    return tEvent;
+  }
+
+  @Override
+  public Set<TimelineMetric> getTimelineMetrics() {
+    Set<TimelineMetric> metrics = JobHistoryEventUtils
+        .countersToTimelineMetric(getCounters(), finishTime);
+    return metrics;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
index d4ec74d..0bc4383 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
@@ -18,13 +18,20 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
+import java.util.Set;
+
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.TaskStatus;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 
 /**
  * Event to record the successful completion of a task
@@ -115,5 +122,23 @@ public class TaskFinishedEvent implements HistoryEvent {
     return EventType.TASK_FINISHED;
   }
 
-  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("TASK_TYPE", getTaskType().toString());
+    tEvent.addInfo("FINISH_TIME", getFinishTime());
+    tEvent.addInfo("STATUS", TaskStatus.State.SUCCEEDED.toString());
+    tEvent.addInfo("SUCCESSFUL_TASK_ATTEMPT_ID",
+        getSuccessfulTaskAttemptId() == null ? "" :
+            getSuccessfulTaskAttemptId().toString());
+    return tEvent;
+  }
+
+  @Override
+  public Set<TimelineMetric> getTimelineMetrics() {
+    Set<TimelineMetric> jobMetrics = JobHistoryEventUtils
+        .countersToTimelineMetric(getCounters(), finishTime);
+    return jobMetrics;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java
index ed53b03..9d2fc0e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java
@@ -18,11 +18,16 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
+import java.util.Set;
+
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 
 /**
  * Event to record the start of a task
@@ -72,4 +77,19 @@ public class TaskStartedEvent implements HistoryEvent {
     return EventType.TASK_STARTED;
   }
 
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("TASK_TYPE", getTaskType().toString());
+    tEvent.addInfo("START_TIME", getStartTime());
+    tEvent.addInfo("SPLIT_LOCATIONS", getSplitLocations());
+    return tEvent;
+  }
+
+  @Override
+  public Set<TimelineMetric> getTimelineMetrics() {
+    return null;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java
index 58f4143..010129d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java
@@ -18,13 +18,15 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
-import java.io.IOException;
+import java.util.Set;
 
+import org.apache.avro.util.Utf8;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.TaskID;
-
-import org.apache.avro.util.Utf8;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 
 /**
  * Event to record updates to a task
@@ -61,4 +63,17 @@ public class TaskUpdatedEvent implements HistoryEvent {
     return EventType.TASK_UPDATED;
   }
 
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("FINISH_TIME", getFinishTime());
+    return tEvent;
+  }
+
+  @Override
+  public Set<TimelineMetric> getTimelineMetrics() {
+    return null;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java
new file mode 100644
index 0000000..89abf0e
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java
@@ -0,0 +1,83 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.mapreduce.util;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ArrayNode;
+import org.codehaus.jackson.node.ObjectNode;
+
+/**
+ * Class containing utility methods to be used by JobHistoryEventHandler.
+ */
+public final class JobHistoryEventUtils {
+  private JobHistoryEventUtils() {
+  }
+
+  // Number of bytes of config which can be published in one shot to ATSv2.
+  public static final int ATS_CONFIG_PUBLISH_SIZE_BYTES = 10 * 1024;
+
+  public static JsonNode countersToJSON(Counters counters) {
+    ObjectMapper mapper = new ObjectMapper();
+    ArrayNode nodes = mapper.createArrayNode();
+    if (counters != null) {
+      for (CounterGroup counterGroup : counters) {
+        ObjectNode groupNode = nodes.addObject();
+        groupNode.put("NAME", counterGroup.getName());
+        groupNode.put("DISPLAY_NAME", counterGroup.getDisplayName());
+        ArrayNode countersNode = groupNode.putArray("COUNTERS");
+        for (Counter counter : counterGroup) {
+          ObjectNode counterNode = countersNode.addObject();
+          counterNode.put("NAME", counter.getName());
+          counterNode.put("DISPLAY_NAME", counter.getDisplayName());
+          counterNode.put("VALUE", counter.getValue());
+        }
+      }
+    }
+    return nodes;
+  }
+
+  public static Set<TimelineMetric> countersToTimelineMetric(Counters counters,
+      long timestamp) {
+    return countersToTimelineMetric(counters, timestamp, "");
+  }
+
+  public static Set<TimelineMetric> countersToTimelineMetric(Counters counters,
+      long timestamp, String groupNamePrefix) {
+    Set<TimelineMetric> entityMetrics = new HashSet<TimelineMetric>();
+    for (CounterGroup g : counters) {
+      String groupName = g.getName();
+      for (Counter c : g) {
+        String name = groupNamePrefix + groupName + ":" + c.getName();
+        TimelineMetric metric = new TimelineMetric();
+        metric.setId(name);
+        metric.addValue(timestamp, c.getValue());
+        entityMetrics.add(metric);
+      }
+    }
+    return entityMetrics;
+  }
+
+}
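
For reference, countersToTimelineMetric above yields one single-value TimelineMetric per counter, with the metric id formed as "<groupName>:<counterName>" (optionally prefixed). A hedged usage sketch, assuming a finished org.apache.hadoop.mapreduce.Job handle named job:

    // Illustration only; "job" is an assumed Job handle, not from this patch.
    Counters counters = job.getCounters();
    long finishTime = System.currentTimeMillis();
    Set<TimelineMetric> metrics =
        JobHistoryEventUtils.countersToTimelineMetric(counters, finishTime);
    // Each metric id looks like
    // "org.apache.hadoop.mapreduce.TaskCounter:MAP_INPUT_RECORDS"
    // and carries a single (finishTime, value) data point.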

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
index 263a3e7..0e87d3b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
@@ -100,6 +100,12 @@
       <type>test-jar</type>
     </dependency>
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
       <groupId>org.hsqldb</groupId>
       <artifactId>hsqldb</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
index 40ed9ad..90748a9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
@@ -18,23 +18,54 @@
 
 package org.apache.hadoop.mapred;
 
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.jobhistory.EventType;
 import org.apache.hadoop.mapreduce.jobhistory.TestJobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.timeline.TimelineStore;
-
+import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.google.common.collect.Sets;
+
 public class TestMRTimelineEventHandling {
 
+  private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
+  private static final Log LOG =
+      LogFactory.getLog(TestMRTimelineEventHandling.class);
+
   @Test
   public void testTimelineServiceStartInMiniCluster() throws Exception {
     Configuration conf = new YarnConfiguration();
@@ -48,7 +79,7 @@ public class TestMRTimelineEventHandling {
     MiniMRYarnCluster cluster = null;
     try {
       cluster = new MiniMRYarnCluster(
-          TestJobHistoryEventHandler.class.getSimpleName(), 1);
+        TestMRTimelineEventHandling.class.getSimpleName(), 1);
       cluster.init(conf);
       cluster.start();
 
@@ -89,7 +120,7 @@ public class TestMRTimelineEventHandling {
     MiniMRYarnCluster cluster = null;
     try {
       cluster = new MiniMRYarnCluster(
-          TestJobHistoryEventHandler.class.getSimpleName(), 1);
+        TestMRTimelineEventHandling.class.getSimpleName(), 1);
       cluster.init(conf);
       cluster.start();
       conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
@@ -138,6 +169,280 @@ public class TestMRTimelineEventHandling {
     }
   }
 
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testMRNewTimelineServiceEventHandling() throws Exception {
+    LOG.info("testMRNewTimelineServiceEventHandling start.");
+    Configuration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    // enable new timeline service
+    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+    conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+        FileSystemTimelineWriterImpl.class, TimelineWriter.class);
+    conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
+
+    // enable aux-service based timeline collectors
+    conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
+    conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME
+        + ".class", PerNodeTimelineCollectorsAuxService.class.getName());
+
+    conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
+
+    MiniMRYarnCluster cluster = null;
+    try {
+      cluster = new MiniMRYarnCluster(
+          TestMRTimelineEventHandling.class.getSimpleName(), 1, true);
+      cluster.init(conf);
+      cluster.start();
+      LOG.info("A MiniMRYarnCluster get start.");
+
+      Path inDir = new Path("input");
+      Path outDir = new Path("output");
+      LOG.info("Run 1st job which should be successful.");
+      JobConf successConf = new JobConf(conf);
+      successConf.set("dummy_conf1",
+          UtilsForTests.createConfigValue(51 * 1024));
+      successConf.set("dummy_conf2",
+          UtilsForTests.createConfigValue(51 * 1024));
+      successConf.set("huge_dummy_conf1",
+          UtilsForTests.createConfigValue(101 * 1024));
+      successConf.set("huge_dummy_conf2",
+          UtilsForTests.createConfigValue(101 * 1024));
+      RunningJob job =
+          UtilsForTests.runJobSucceed(successConf, inDir, outDir);
+      Assert.assertEquals(JobStatus.SUCCEEDED,
+          job.getJobStatus().getState().getValue());
+
+      YarnClient yarnClient = YarnClient.createYarnClient();
+      yarnClient.init(new Configuration(cluster.getConfig()));
+      yarnClient.start();
+      EnumSet<YarnApplicationState> appStates =
+          EnumSet.allOf(YarnApplicationState.class);
+
+      ApplicationId firstAppId = null;
+      List<ApplicationReport> apps = yarnClient.getApplications(appStates);
+      Assert.assertEquals(apps.size(), 1);
+      ApplicationReport appReport = apps.get(0);
+      firstAppId = appReport.getApplicationId();
+      UtilsForTests.waitForAppFinished(job, cluster);
+      checkNewTimelineEvent(firstAppId, appReport);
+
+      LOG.info("Run 2nd job which should be failed.");
+      job = UtilsForTests.runJobFail(new JobConf(conf), inDir, outDir);
+      Assert.assertEquals(JobStatus.FAILED,
+          job.getJobStatus().getState().getValue());
+
+      apps = yarnClient.getApplications(appStates);
+      Assert.assertEquals(apps.size(), 2);
+
+      appReport = apps.get(0).getApplicationId().equals(firstAppId) ?
+          apps.get(0) : apps.get(1);
+
+      checkNewTimelineEvent(firstAppId, appReport);
+
+    } finally {
+      if (cluster != null) {
+        cluster.stop();
+      }
+      // Cleanup test file
+      String testRoot =
+          FileSystemTimelineWriterImpl.
+              DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT;
+      File testRootFolder = new File(testRoot);
+      if(testRootFolder.isDirectory()) {
+        FileUtils.deleteDirectory(testRootFolder);
+      }
+
+    }
+  }
+
+  private void checkNewTimelineEvent(ApplicationId appId,
+      ApplicationReport appReport) throws IOException {
+    String tmpRoot =
+        FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
+            + "/entities/";
+
+    File tmpRootFolder = new File(tmpRoot);
+
+    Assert.assertTrue(tmpRootFolder.isDirectory());
+    String basePath = tmpRoot + YarnConfiguration.DEFAULT_RM_CLUSTER_ID +
+        "/" + UserGroupInformation.getCurrentUser().getShortUserName() +
+        "/" + appReport.getName() +
+        "/" + TimelineUtils.DEFAULT_FLOW_VERSION +
+        "/" + appReport.getStartTime() +
+        "/" + appId.toString();
+    // for this test, we expect MAPREDUCE_JOB and MAPREDUCE_TASK dirs
+    String outputDirJob = basePath + "/MAPREDUCE_JOB/";
+
+    File entityFolder = new File(outputDirJob);
+    Assert.assertTrue("Job output directory: " + outputDirJob +
+        " does not exist.",
+        entityFolder.isDirectory());
+
+    // check for job event file
+    String jobEventFileName = appId.toString().replaceAll("application", "job")
+        + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+
+    String jobEventFilePath = outputDirJob + jobEventFileName;
+    File jobEventFile = new File(jobEventFilePath);
+    Assert.assertTrue("jobEventFilePath: " + jobEventFilePath +
+        " does not exist.",
+        jobEventFile.exists());
+    verifyEntity(jobEventFile, EventType.JOB_FINISHED.name(),
+        true, false, null);
+    Set<String> cfgsToCheck = Sets.newHashSet("dummy_conf1", "dummy_conf2",
+        "huge_dummy_conf1", "huge_dummy_conf2");
+    verifyEntity(jobEventFile, null, false, true, cfgsToCheck);
+
+    // for this test, we expect MR job metrics to be published in YARN_APPLICATION
+    String outputAppDir = basePath + "/YARN_APPLICATION/";
+    entityFolder = new File(outputAppDir);
+    Assert.assertTrue(
+        "Job output directory: " + outputAppDir +
+        " does not exist.",
+        entityFolder.isDirectory());
+
+    // check for app event file
+    String appEventFileName = appId.toString()
+        + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+
+    String appEventFilePath = outputAppDir + appEventFileName;
+    File appEventFile = new File(appEventFilePath);
+    Assert.assertTrue(
+        "appEventFilePath: " + appEventFilePath +
+        " does not exist.",
+        appEventFile.exists());
+    verifyEntity(appEventFile, null, true, false, null);
+    verifyEntity(appEventFile, null, false, true, cfgsToCheck);
+
+    // check for task event file
+    String outputDirTask = basePath + "/MAPREDUCE_TASK/";
+    File taskFolder = new File(outputDirTask);
+    Assert.assertTrue("Task output directory: " + outputDirTask +
+        " does not exist.",
+        taskFolder.isDirectory());
+
+    String taskEventFileName =
+        appId.toString().replaceAll("application", "task") +
+        "_m_000000" +
+        FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+
+    String taskEventFilePath = outputDirTask + taskEventFileName;
+    File taskEventFile = new File(taskEventFilePath);
+    Assert.assertTrue("taskEventFileName: " + taskEventFilePath +
+        " does not exist.",
+        taskEventFile.exists());
+    verifyEntity(taskEventFile, EventType.TASK_FINISHED.name(),
+        true, false, null);
+
+    // check for task attempt event file
+    String outputDirTaskAttempt = basePath + "/MAPREDUCE_TASK_ATTEMPT/";
+    File taskAttemptFolder = new File(outputDirTaskAttempt);
+    Assert.assertTrue("TaskAttempt output directory: " + outputDirTaskAttempt +
+        " does not exist.", taskAttemptFolder.isDirectory());
+
+    String taskAttemptEventFileName = appId.toString().replaceAll(
+        "application", "attempt") + "_m_000000_0" +
+        FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+
+    String taskAttemptEventFilePath = outputDirTaskAttempt +
+        taskAttemptEventFileName;
+    File taskAttemptEventFile = new File(taskAttemptEventFilePath);
+    Assert.assertTrue("taskAttemptEventFileName: " + taskAttemptEventFilePath +
+        " does not exist.", taskAttemptEventFile.exists());
+    verifyEntity(taskAttemptEventFile, EventType.MAP_ATTEMPT_FINISHED.name(),
+        true, false, null);
+  }
+
+  /**
+   * Verifies entity by reading the entity file written via FS impl.
+   * @param entityFile File to be read.
+   * @param eventId Event to be checked.
+   * @param chkMetrics If event is not null, this flag determines if metrics
+   *     exist when the event is encountered. If event is null, we merely check
+   *     if metrics exist in the entity file.
+   * @param chkCfg If event is not null, this flag determines if configs
+   *     exist when the event is encountered. If event is null, we merely check
+   *     if configs exist in the entity file.
+   * @param cfgsToVerify a set of configs which should exist in the entity file.
+   * @throws IOException
+   */
+  private void verifyEntity(File entityFile, String eventId,
+      boolean chkMetrics, boolean chkCfg, Set<String> cfgsToVerify)
+      throws IOException {
+    BufferedReader reader = null;
+    String strLine;
+    try {
+      reader = new BufferedReader(new FileReader(entityFile));
+      while ((strLine = reader.readLine()) != null) {
+        if (strLine.trim().length() > 0) {
+          org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
+              entity =
+                  FileSystemTimelineReaderImpl.getTimelineRecordFromJSON(
+                      strLine.trim(),
+                      org.apache.hadoop.yarn.api.records.timelineservice.
+                          TimelineEntity.class);
+          if (eventId == null) {
+            // Job metrics are published without any events for
+            // ApplicationEntity. There is also possibility that some other
+            // ApplicationEntity is published without events, hence loop till
+            // it's found. Same applies to configs.
+            if (chkMetrics && entity.getMetrics().size() > 0) {
+              return;
+            }
+            if (chkCfg && entity.getConfigs().size() > 0) {
+              if (cfgsToVerify == null) {
+                return;
+              } else {
+                // Have configs to verify. Keep removing configs from the set
+                // of configs to verify as they are found. When all the
+                // entities have been looped through, we will check if the set
+                // is empty or not (indicating if all configs have been found
+                // or not).
+                for (Iterator<String> itr =
+                    cfgsToVerify.iterator(); itr.hasNext();) {
+                  String config = itr.next();
+                  if (entity.getConfigs().containsKey(config)) {
+                    itr.remove();
+                  }
+                }
+                // All the required configs have been verified, so return.
+                if (cfgsToVerify.isEmpty()) {
+                  return;
+                }
+              }
+            }
+          } else {
+            for (TimelineEvent event : entity.getEvents()) {
+              if (event.getId().equals(eventId)) {
+                if (chkMetrics) {
+                  assertTrue(entity.getMetrics().size() > 0);
+                }
+                if (chkCfg) {
+                  assertTrue(entity.getConfigs().size() > 0);
+                  if (cfgsToVerify != null) {
+                    for (String cfg : cfgsToVerify) {
+                      assertTrue(entity.getConfigs().containsKey(cfg));
+                    }
+                  }
+                }
+                return;
+              }
+            }
+          }
+        }
+      }
+      if (cfgsToVerify != null) {
+        assertTrue(cfgsToVerify.isEmpty());
+        return;
+      }
+      fail("Expected event : " + eventId + " not found in the file "
+          + entityFile);
+    } finally {
+      reader.close();
+    }
+  }
+
   @Test
   public void testMapreduceJobTimelineServiceEnabled()
       throws Exception {
@@ -147,7 +452,7 @@ public class TestMRTimelineEventHandling {
     MiniMRYarnCluster cluster = null;
     try {
       cluster = new MiniMRYarnCluster(
-          TestJobHistoryEventHandler.class.getSimpleName(), 1);
+        TestMRTimelineEventHandling.class.getSimpleName(), 1);
       cluster.init(conf);
       cluster.start();
       conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,

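Reading checkNewTimelineEvent above back as a path, the job entity file it asserts on resolves, with FileSystemTimelineWriterImpl, to roughly the following shape (placeholder names mirror the variables in the test; this is an illustration, not a constant defined by the patch):

    <storage-root>/entities/<clusterId>/<user>/<jobName>/<flowVersion>/<appStartTime>/<appId>/MAPREDUCE_JOB/job_<id><storage-extension>

with analogous MAPREDUCE_TASK, MAPREDUCE_TASK_ATTEMPT, and YARN_APPLICATION subdirectories checked the same way.
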
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java
index 972391c..ec6ad38 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java
@@ -31,8 +31,10 @@ import java.util.Enumeration;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -52,10 +54,13 @@ import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSeq
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
-import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.StringUtils;
-
-import org.apache.commons.logging.Log;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import com.google.common.base.Supplier;
 
 /** 
  * Utilities used in unit test.
@@ -150,6 +155,14 @@ public class UtilsForTests {
     return buf.toString();
   }
 
+  public static String createConfigValue(int msgSize) {
+    StringBuilder sb = new StringBuilder(msgSize);
+    for (int i = 0; i < msgSize; i++) {
+      sb.append('a');
+    }
+    return sb.toString();
+  }
+
   public static String safeGetCanonicalPath(File f) {
     try {
       String s = f.getCanonicalPath();
@@ -607,6 +620,29 @@ public class UtilsForTests {
     return job;
   }
 
+  public static void waitForAppFinished(RunningJob job,
+      MiniMRYarnCluster cluster) throws IOException {
+    ApplicationId appId = ApplicationId.newInstance(
+        Long.parseLong(job.getID().getJtIdentifier()), job.getID().getId());
+    ConcurrentMap<ApplicationId, RMApp> rmApps = cluster.getResourceManager()
+        .getRMContext().getRMApps();
+    if (!rmApps.containsKey(appId)) {
+      throw new IOException("Job not found");
+    }
+    final RMApp rmApp = rmApps.get(appId);
+    try {
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return RMAppImpl.isAppInFinalState(rmApp);
+        }
+      }, 1000, 1000 * 180);
+    } catch (TimeoutException | InterruptedException e1) {
+      throw new IOException("Yarn application with " + appId + " didn't finish 
"
+          + "did not reach finale State", e1);
+    }
+  }
+
   // Run a job that will be succeeded and wait until it completes
   public static RunningJob runJobSucceed(JobConf conf, Path inDir, Path outDir)
          throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/EntityWriterV2.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/EntityWriterV2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/EntityWriterV2.java
new file mode 100644
index 0000000..74d7b94
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/EntityWriterV2.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
+
+/**
+ * Base mapper for writing entities to the timeline service. Subclasses
+ * override {@link #writeEntities(Configuration, TimelineCollectorManager,
+ * org.apache.hadoop.mapreduce.Mapper.Context)} to create and write entities
+ * to the timeline service.
+ */
+abstract class EntityWriterV2
+    extends org.apache.hadoop.mapreduce.Mapper
+        <IntWritable, IntWritable, Writable, Writable> {
+  @Override
+  public void map(IntWritable key, IntWritable val, Context context)
+      throws IOException {
+
+    // create the timeline collector manager wired with the writer
+    Configuration tlConf = new YarnConfiguration();
+    TimelineCollectorManager manager = new TimelineCollectorManager("test");
+    manager.init(tlConf);
+    manager.start();
+    try {
+      // invoke the method to have the subclass write entities
+      writeEntities(tlConf, manager, context);
+    } finally {
+      manager.close();
+    }
+  }
+
+  protected abstract void writeEntities(Configuration tlConf,
+      TimelineCollectorManager manager, Context context) throws IOException;
+}
\ No newline at end of file
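
The abstract writeEntities hook above is the extension point per-entity mappers plug into. A skeletal subclass, purely illustrative (the name and body are assumptions, not part of this commit):

    // Hypothetical subclass shape for illustration only.
    class NoOpEntityWriterV2 extends EntityWriterV2 {
      @Override
      protected void writeEntities(Configuration tlConf,
          TimelineCollectorManager manager, Context context)
          throws IOException {
        // A real subclass would build TimelineEntity objects here and hand
        // them to a collector registered with the manager.
      }
    }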

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileParser.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileParser.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileParser.java
index c290cd6..5d9dc0b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileParser.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileParser.java
@@ -28,6 +28,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 
+/**
+ * Used to parse job history and configuration files.
+ */
 class JobHistoryFileParser {
   private static final Log LOG = LogFactory.getLog(JobHistoryFileParser.class);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java
index 5e10662..447ea4e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java
@@ -20,33 +20,21 @@ package org.apache.hadoop.mapreduce;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters;
-import org.apache.hadoop.mapreduce.JobHistoryFileReplayHelper;
 import org.apache.hadoop.mapreduce.JobHistoryFileReplayHelper.JobFiles;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV2.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV2.java
new file mode 100644
index 0000000..2ec4833
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV2.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapreduce.JobHistoryFileReplayHelper.JobFiles;
+import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
+
+/**
+ * Mapper for TimelineServicePerformance that replays job history files to the
+ * timeline service v2.
+ */
+class JobHistoryFileReplayMapperV2 extends EntityWriterV2 {
+  private static final Log LOG =
+      LogFactory.getLog(JobHistoryFileReplayMapperV2.class);
+
+  @Override
+  protected void writeEntities(Configuration tlConf,
+      TimelineCollectorManager manager, Context context) throws IOException {
+    JobHistoryFileReplayHelper helper = new JobHistoryFileReplayHelper(context);
+    int replayMode = helper.getReplayMode();
+    JobHistoryFileParser parser = helper.getParser();
+    TimelineEntityConverterV2 converter = new TimelineEntityConverterV2();
+
+    // collect the apps it needs to process
+    Collection<JobFiles> jobs = helper.getJobFiles();
+    if (jobs.isEmpty()) {
+      LOG.info(context.getTaskAttemptID().getTaskID() +
+          " will process no jobs");
+    } else {
+      LOG.info(context.getTaskAttemptID().getTaskID() + " will process " +
+          jobs.size() + " jobs");
+    }
+    for (JobFiles job: jobs) {
+      // process each job
+      String jobIdStr = job.getJobId();
+      // skip if either of the files is missing
+      if (job.getJobConfFilePath() == null ||
+          job.getJobHistoryFilePath() == null) {
+        LOG.info(jobIdStr + " missing either the job history file or the " +
+            "configuration file. Skipping.");
+        continue;
+      }
+      LOG.info("processing " + jobIdStr + "...");
+      JobId jobId = TypeConverter.toYarn(JobID.forName(jobIdStr));
+      ApplicationId appId = jobId.getAppId();
+
+      // create the app level timeline collector and start it
+      AppLevelTimelineCollector collector =
+          new AppLevelTimelineCollector(appId);
+      manager.putIfAbsent(appId, collector);
+      try {
+        // parse the job info and configuration
+        JobInfo jobInfo =
+            parser.parseHistoryFile(job.getJobHistoryFilePath());
+        Configuration jobConf =
+            parser.parseConfiguration(job.getJobConfFilePath());
+        LOG.info("parsed the job history file and the configuration file " +
+            "for job " + jobIdStr);
+
+        // set the context
+        // flow name: job name, flow run id: submit time, user id: job user
+        TimelineCollectorContext tlContext =
+            collector.getTimelineEntityContext();
+        tlContext.setFlowName(jobInfo.getJobname());
+        tlContext.setFlowRunId(jobInfo.getSubmitTime());
+        tlContext.setUserId(jobInfo.getUsername());
+
+        // create entities from job history and write them
+        long totalTime = 0;
+        List<TimelineEntity> entitySet =
+            converter.createTimelineEntities(jobInfo, jobConf);
+        LOG.info("converted them into timeline entities for job " + jobIdStr);
+        // use the current user for this purpose
+        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+        long startWrite = System.nanoTime();
+        try {
+          switch (replayMode) {
+          case JobHistoryFileReplayHelper.WRITE_ALL_AT_ONCE:
+            writeAllEntities(collector, entitySet, ugi);
+            break;
+          case JobHistoryFileReplayHelper.WRITE_PER_ENTITY:
+            writePerEntity(collector, entitySet, ugi);
+            break;
+          default:
+            break;
+          }
+        } catch (Exception e) {
+          context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES).
+              increment(1);
+          LOG.error("writing to the timeline service failed", e);
+        }
+        long endWrite = System.nanoTime();
+        totalTime += TimeUnit.NANOSECONDS.toMillis(endWrite-startWrite);
+        int numEntities = entitySet.size();
+        LOG.info("wrote " + numEntities + " entities in " + totalTime + " ms");
+
+        context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).
+            increment(totalTime);
+        context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).
+            increment(numEntities);
+      } finally {
+        manager.remove(appId);
+        context.progress(); // move it along
+      }
+    }
+  }
+
+  private void writeAllEntities(AppLevelTimelineCollector collector,
+      List<TimelineEntity> entitySet, UserGroupInformation ugi)
+      throws IOException {
+    TimelineEntities entities = new TimelineEntities();
+    entities.setEntities(entitySet);
+    collector.putEntities(entities, ugi);
+  }
+
+  private void writePerEntity(AppLevelTimelineCollector collector,
+      List<TimelineEntity> entitySet, UserGroupInformation ugi)
+      throws IOException {
+    for (TimelineEntity entity : entitySet) {
+      TimelineEntities entities = new TimelineEntities();
+      entities.addEntity(entity);
+      collector.putEntities(entities, ugi);
+      LOG.info("wrote entity " + entity.getId());
+    }
+  }
+}
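
Distilled from the mapper above, the per-application collector lifecycle on
the v2 write path follows this pattern (a sketch using the same classes;
flowName, flowRunId, userId and entityList are illustrative placeholders):

    AppLevelTimelineCollector collector = new AppLevelTimelineCollector(appId);
    manager.putIfAbsent(appId, collector);
    try {
      TimelineCollectorContext ctx = collector.getTimelineEntityContext();
      ctx.setFlowName(flowName);    // here: the job name
      ctx.setFlowRunId(flowRunId);  // here: the job submit time
      ctx.setUserId(userId);        // here: the job's user
      TimelineEntities entities = new TimelineEntities();
      entities.setEntities(entityList);
      collector.putEntities(entities, UserGroupInformation.getCurrentUser());
    } finally {
      manager.remove(appId);        // always unregister the collector
    }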

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterConstants.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterConstants.java
new file mode 100644
index 0000000..d96ad76
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterConstants.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+/**
+ * Constants for simple entity writers.
+ */
+interface SimpleEntityWriterConstants {
+  // constants for mtype = 1
+  String KBS_SENT = "kbs sent";
+  int KBS_SENT_DEFAULT = 1;
+  String TEST_TIMES = "testtimes";
+  int TEST_TIMES_DEFAULT = 100;
+  String TIMELINE_SERVICE_PERFORMANCE_RUN_ID =
+      "timeline.server.performance.run.id";
+
+  /**
+   *  To ensure that the compression really gets exercised, generate a
+   *  random alphanumeric fixed-length payload.
+   */
+  char[] ALPHA_NUMS = new char[] {'a', 'b', 'c', 'd', 'e', 'f',
+      'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r',
+      's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D',
+      'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
+      'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2',
+      '3', '4', '5', '6', '7', '8', '9', '0', ' '};
+}
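
A driver that uses these constants would typically hand them to the mappers
through the job configuration; a hypothetical snippet (the values are
examples only, not defaults from this commit):

    Configuration conf = job.getConfiguration();
    conf.setInt(SimpleEntityWriterConstants.KBS_SENT, 4);       // 4 KB payload
    conf.setInt(SimpleEntityWriterConstants.TEST_TIMES, 1000);  // writes per mapper
    conf.setLong(
        SimpleEntityWriterConstants.TIMELINE_SERVICE_PERFORMANCE_RUN_ID,
        System.currentTimeMillis());  // also serves as the flow run id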

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java
index 2c851e9..16d14a1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java
@@ -27,45 +27,25 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 /**
    * Adds simple entities with random string payload, events, metrics, and
    * configuration.
    */
-class SimpleEntityWriterV1 extends
-    org.apache.hadoop.mapreduce.Mapper<IntWritable,IntWritable,Writable,Writable> {
+class SimpleEntityWriterV1
+    extends org.apache.hadoop.mapreduce.Mapper
+        <IntWritable, IntWritable, Writable, Writable>
+    implements SimpleEntityWriterConstants {
   private static final Log LOG = LogFactory.getLog(SimpleEntityWriterV1.class);
 
-  // constants for mtype = 1
-  static final String KBS_SENT = "kbs sent";
-  static final int KBS_SENT_DEFAULT = 1;
-  static final String TEST_TIMES = "testtimes";
-  static final int TEST_TIMES_DEFAULT = 100;
-  static final String TIMELINE_SERVICE_PERFORMANCE_RUN_ID =
-      "timeline.server.performance.run.id";
-  /**
-   *  To ensure that the compression really gets exercised, generate a
-   *  random alphanumeric fixed length payload
-   */
-  private static char[] ALPHA_NUMS = new char[] { 'a', 'b', 'c', 'd', 'e', 'f',
-    'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r',
-    's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D',
-    'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
-    'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2',
-    '3', '4', '5', '6', '7', '8', '9', '0', ' ' };
-
-  public void map(IntWritable key, IntWritable val, Context context) throws IOException {
+  public void map(IntWritable key, IntWritable val, Context context)
+      throws IOException {
     TimelineClient tlc = new TimelineClientImpl();
     Configuration conf = context.getConfiguration();
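
The rest of the method is unchanged and elided from this hunk; for
orientation, a minimal v1 write with the client created above might look like
the sketch below (the entity type and id are illustrative, and YarnException
is org.apache.hadoop.yarn.exceptions.YarnException):

    tlc.init(conf);
    tlc.start();
    try {
      TimelineEntity entity = new TimelineEntity();
      entity.setEntityType("FOO_ATTEMPT");  // illustrative type
      entity.setEntityId("attempt_0");      // illustrative id
      entity.setStartTime(System.currentTimeMillis());
      tlc.putEntities(entity);              // synchronous v1 write
    } catch (YarnException e) {
      throw new IOException(e);
    } finally {
      tlc.stop();
    }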
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV2.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV2.java
new file mode 100644
index 0000000..d66deb0
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV2.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
+
+/**
+ * Adds simple entities with random string payload, events, metrics, and
+ * configuration.
+ */
+class SimpleEntityWriterV2 extends EntityWriterV2
+    implements SimpleEntityWriterConstants {
+  private static final Log LOG = LogFactory.getLog(SimpleEntityWriterV2.class);
+
+  protected void writeEntities(Configuration tlConf,
+      TimelineCollectorManager manager, Context context) throws IOException {
+    Configuration conf = context.getConfiguration();
+    // simulate the app id with the task id
+    int taskId = context.getTaskAttemptID().getTaskID().getId();
+    long timestamp = conf.getLong(TIMELINE_SERVICE_PERFORMANCE_RUN_ID, 0);
+    ApplicationId appId = ApplicationId.newInstance(timestamp, taskId);
+
+    // create the app level timeline collector
+    AppLevelTimelineCollector collector =
+        new AppLevelTimelineCollector(appId);
+    manager.putIfAbsent(appId, collector);
+
+    try {
+      // set the context
+      // flow name: job name, flow run id: run timestamp, user id: job user
+      TimelineCollectorContext tlContext =
+          collector.getTimelineEntityContext();
+      tlContext.setFlowName(context.getJobName());
+      tlContext.setFlowRunId(timestamp);
+      tlContext.setUserId(context.getUser());
+
+      final int kbs = conf.getInt(KBS_SENT, KBS_SENT_DEFAULT);
+
+      long totalTime = 0;
+      final int testtimes = conf.getInt(TEST_TIMES, TEST_TIMES_DEFAULT);
+      final Random rand = new Random();
+      final TaskAttemptID taskAttemptId = context.getTaskAttemptID();
+      final char[] payLoad = new char[kbs * 1024];
+
+      for (int i = 0; i < testtimes; i++) {
+        // Generate a fixed length random payload
+        for (int xx = 0; xx < kbs * 1024; xx++) {
+          int alphaNumIdx =
+              rand.nextInt(ALPHA_NUMS.length);
+          payLoad[xx] = ALPHA_NUMS[alphaNumIdx];
+        }
+        String entId = taskAttemptId + "_" + Integer.toString(i);
+        final TimelineEntity entity = new TimelineEntity();
+        entity.setId(entId);
+        entity.setType("FOO_ATTEMPT");
+        entity.addInfo("PERF_TEST", payLoad);
+        // add an event
+        TimelineEvent event = new TimelineEvent();
+        event.setId("foo_event_id");
+        event.setTimestamp(System.currentTimeMillis());
+        event.addInfo("foo_event", "test");
+        entity.addEvent(event);
+        // add a metric
+        TimelineMetric metric = new TimelineMetric();
+        metric.setId("foo_metric");
+        metric.addValue(System.currentTimeMillis(), 123456789L);
+        entity.addMetric(metric);
+        // add a config
+        entity.addConfig("foo", "bar");
+
+        TimelineEntities entities = new TimelineEntities();
+        entities.addEntity(entity);
+        // use the current user for this purpose
+        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+        long startWrite = System.nanoTime();
+        try {
+          collector.putEntities(entities, ugi);
+        } catch (Exception e) {
+          context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES).
+              increment(1);
+          LOG.error("writing to the timeline service failed", e);
+        }
+        long endWrite = System.nanoTime();
+        totalTime += TimeUnit.NANOSECONDS.toMillis(endWrite-startWrite);
+      }
+      LOG.info("wrote " + testtimes + " entities (" + kbs*testtimes +
+          " kB) in " + totalTime + " ms");
+      context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).
+          increment(totalTime);
+      context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).
+          increment(testtimes);
+      context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS).
+          increment(kbs*testtimes);
+    } finally {
+      // clean up
+      manager.remove(appId);
+    }
+  }
+}
\ No newline at end of file
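
Like its v1 counterpart, this class is intended to run as the mapper of the
TimelineServicePerformance job; conceptually the wiring amounts to something
like this hypothetical driver fragment (the actual driver's option parsing is
not shown in this commit excerpt):

    Job job = Job.getInstance(conf, "timeline-performance");
    job.setMapperClass(SimpleEntityWriterV2.class);
    job.setNumReduceTasks(0);  // map-only workload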

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV1.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV1.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV1.java
index 79d123e..dcc3ce0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV1.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV1.java
@@ -25,11 +25,6 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.CounterGroup;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
@@ -95,9 +90,10 @@ class TimelineEntityConverterV1 {
     return job;
   }
 
-  private Set<TimelineEntity> createTaskAndTaskAttemptEntities(JobInfo jobInfo) {
+  private Set<TimelineEntity>
+      createTaskAndTaskAttemptEntities(JobInfo jobInfo) {
     Set<TimelineEntity> entities = new HashSet<>();
-    Map<TaskID,TaskInfo> taskInfoMap = jobInfo.getAllTasks();
+    Map<TaskID, TaskInfo> taskInfoMap = jobInfo.getAllTasks();
     LOG.info("job " + jobInfo.getJobId()+ " has " + taskInfoMap.size() +
         " tasks");
     for (TaskInfo taskInfo: taskInfoMap.values()) {
@@ -129,7 +125,7 @@ class TimelineEntityConverterV1 {
 
   private Set<TimelineEntity> createTaskAttemptEntities(TaskInfo taskInfo) {
     Set<TimelineEntity> taskAttempts = new HashSet<TimelineEntity>();
-    Map<TaskAttemptID,TaskAttemptInfo> taskAttemptInfoMap =
+    Map<TaskAttemptID, TaskAttemptInfo> taskAttemptInfoMap =
         taskInfo.getAllTaskAttempts();
     LOG.info("task " + taskInfo.getTaskId() + " has " +
         taskAttemptInfoMap.size() + " task attempts");
@@ -140,7 +136,8 @@ class TimelineEntityConverterV1 {
     return taskAttempts;
   }
 
-  private TimelineEntity createTaskAttemptEntity(TaskAttemptInfo taskAttemptInfo) {
+  private TimelineEntity
+      createTaskAttemptEntity(TaskAttemptInfo taskAttemptInfo) {
     TimelineEntity taskAttempt = new TimelineEntity();
     taskAttempt.setEntityType(TASK_ATTEMPT);
     taskAttempt.setEntityId(taskAttemptInfo.getAttemptId().toString());

