http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java index bc046c7..3121c4e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java @@ -18,6 +18,8 @@ package org.apache.hadoop.mapreduce.jobhistory; +import java.util.Set; + import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -26,6 +28,10 @@ import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; /** * Event to record successful completion of a map attempt @@ -33,7 +39,7 @@ import org.apache.hadoop.mapreduce.TaskType; */ @InterfaceAudience.Private @InterfaceStability.Unstable -public class MapAttemptFinishedEvent implements HistoryEvent { +public class MapAttemptFinishedEvent implements HistoryEvent { private MapAttemptFinished datum = null; @@ -218,4 +224,28 @@ public class MapAttemptFinishedEvent implements HistoryEvent { return physMemKbytes; } + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("TASK_TYPE", getTaskType().toString()); + tEvent.addInfo("FINISH_TIME", getFinishTime()); + tEvent.addInfo("STATUS", getTaskStatus()); + tEvent.addInfo("STATE", getState()); + tEvent.addInfo("MAP_FINISH_TIME", getMapFinishTime()); + tEvent.addInfo("HOSTNAME", getHostname()); + tEvent.addInfo("PORT", getPort()); + tEvent.addInfo("RACK_NAME", getRackName()); + tEvent.addInfo("ATTEMPT_ID", getAttemptId() == null ? + "" : getAttemptId().toString()); + return tEvent; + } + + @Override + public Set<TimelineMetric> getTimelineMetrics() { + Set<TimelineMetric> metrics = JobHistoryEventUtils + .countersToTimelineMetric(getCounters(), finishTime); + return metrics; + } + }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java index eead9cf..7adae23 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java @@ -17,9 +17,14 @@ */ package org.apache.hadoop.mapreduce.jobhistory; +import java.util.Set; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; /** * Event to record the normalized map/reduce requirements. @@ -71,4 +76,18 @@ public class NormalizedResourceEvent implements HistoryEvent { public void setDatum(Object datum) { throw new UnsupportedOperationException("Not a seriable object"); } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("MEMORY", "" + getMemory()); + tEvent.addInfo("TASK_TYPE", getTaskType()); + return tEvent; + } + + @Override + public Set<TimelineMetric> getTimelineMetrics() { + return null; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java index 6644a48..9c0f09b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java @@ -18,6 +18,8 @@ package org.apache.hadoop.mapreduce.jobhistory; +import java.util.Set; + import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -26,6 +28,10 @@ import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; /** * Event to record successful completion of a reduce attempt @@ -33,7 +39,7 @@ import org.apache.hadoop.mapreduce.TaskType; */ @InterfaceAudience.Private @InterfaceStability.Unstable -public class ReduceAttemptFinishedEvent implements HistoryEvent { +public class ReduceAttemptFinishedEvent implements HistoryEvent { private ReduceAttemptFinished datum = null; @@ -223,4 +229,29 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent { return physMemKbytes; } + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("TASK_TYPE", getTaskType().toString()); + tEvent.addInfo("ATTEMPT_ID", getAttemptId() == null ? + "" : getAttemptId().toString()); + tEvent.addInfo("FINISH_TIME", getFinishTime()); + tEvent.addInfo("STATUS", getTaskStatus()); + tEvent.addInfo("STATE", getState()); + tEvent.addInfo("SHUFFLE_FINISH_TIME", getShuffleFinishTime()); + tEvent.addInfo("SORT_FINISH_TIME", getSortFinishTime()); + tEvent.addInfo("HOSTNAME", getHostname()); + tEvent.addInfo("PORT", getPort()); + tEvent.addInfo("RACK_NAME", getRackName()); + return tEvent; + } + + @Override + public Set<TimelineMetric> getTimelineMetrics() { + Set<TimelineMetric> metrics = JobHistoryEventUtils + .countersToTimelineMetric(getCounters(), finishTime); + return metrics; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java index bb7dbe0..a931ca2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java @@ -18,6 +18,8 @@ package org.apache.hadoop.mapreduce.jobhistory; +import java.util.Set; + import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -25,6 +27,10 @@ import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; /** * Event to record successful task completion @@ -136,4 +142,24 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { : EventType.REDUCE_ATTEMPT_FINISHED; } + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("TASK_TYPE", getTaskType().toString()); + tEvent.addInfo("ATTEMPT_ID", getAttemptId() == null ? + "" : getAttemptId().toString()); + tEvent.addInfo("FINISH_TIME", getFinishTime()); + tEvent.addInfo("STATUS", getTaskStatus()); + tEvent.addInfo("STATE", getState()); + tEvent.addInfo("HOSTNAME", getHostname()); + return tEvent; + } + + @Override + public Set<TimelineMetric> getTimelineMetrics() { + Set<TimelineMetric> metrics = JobHistoryEventUtils + .countersToTimelineMetric(getCounters(), finishTime); + return metrics; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java index 3073d5b..d09d5ca 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java @@ -18,16 +18,20 @@ package org.apache.hadoop.mapreduce.jobhistory; +import java.util.Set; + +import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.avro.util.Utf8; - /** * Event to record start of a task attempt * @@ -133,4 +137,25 @@ public class TaskAttemptStartedEvent implements HistoryEvent { return null; } + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("TASK_TYPE", getTaskType().toString()); + tEvent.addInfo("TASK_ATTEMPT_ID", + getTaskAttemptId().toString()); + tEvent.addInfo("START_TIME", getStartTime()); + tEvent.addInfo("HTTP_PORT", getHttpPort()); + tEvent.addInfo("TRACKER_NAME", getTrackerName()); + tEvent.addInfo("SHUFFLE_PORT", getShufflePort()); + tEvent.addInfo("CONTAINER_ID", getContainerId() == null ? + "" : getContainerId().toString()); + return tEvent; + } + + @Override + public Set<TimelineMetric> getTimelineMetrics() { + return null; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java index 77ee2a0..1732d91 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java @@ -18,17 +18,21 @@ package org.apache.hadoop.mapreduce.jobhistory; +import java.util.Set; + +import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.mapred.ProgressSplitsBlock; import org.apache.hadoop.mapred.TaskStatus; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; - -import org.apache.hadoop.mapred.ProgressSplitsBlock; - -import org.apache.avro.util.Utf8; +import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; /** * Event to record unsuccessful (Killed/Failed) completion of task attempts @@ -248,4 +252,29 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent { return physMemKbytes; } + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("TASK_TYPE", getTaskType().toString()); + tEvent.addInfo("TASK_ATTEMPT_ID", getTaskAttemptId() == null ? + "" : getTaskAttemptId().toString()); + tEvent.addInfo("FINISH_TIME", getFinishTime()); + tEvent.addInfo("ERROR", getError()); + tEvent.addInfo("STATUS", getTaskStatus()); + tEvent.addInfo("HOSTNAME", getHostname()); + tEvent.addInfo("PORT", getPort()); + tEvent.addInfo("RACK_NAME", getRackName()); + tEvent.addInfo("SHUFFLE_FINISH_TIME", getFinishTime()); + tEvent.addInfo("SORT_FINISH_TIME", getFinishTime()); + tEvent.addInfo("MAP_FINISH_TIME", getFinishTime()); + return tEvent; + } + + @Override + public Set<TimelineMetric> getTimelineMetrics() { + Set<TimelineMetric> metrics = JobHistoryEventUtils + .countersToTimelineMetric(getCounters(), finishTime); + return metrics; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java index 2838f08..d14350d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java @@ -18,14 +18,20 @@ package org.apache.hadoop.mapreduce.jobhistory; +import java.util.Set; + +import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.mapred.TaskStatus; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; - -import org.apache.avro.util.Utf8; +import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; /** * Event to record the failure of a task @@ -137,4 +143,23 @@ public class TaskFailedEvent implements HistoryEvent { return EventType.TASK_FAILED; } + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("TASK_TYPE", getTaskType().toString()); + tEvent.addInfo("STATUS", TaskStatus.State.FAILED.toString()); + tEvent.addInfo("FINISH_TIME", getFinishTime()); + tEvent.addInfo("ERROR", getError()); + tEvent.addInfo("FAILED_ATTEMPT_ID", + getFailedAttemptID() == null ? "" : getFailedAttemptID().toString()); + return tEvent; + } + + @Override + public Set<TimelineMetric> getTimelineMetrics() { + Set<TimelineMetric> metrics = JobHistoryEventUtils + .countersToTimelineMetric(getCounters(), finishTime); + return metrics; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java index d4ec74d..0bc4383 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java @@ -18,13 +18,20 @@ package org.apache.hadoop.mapreduce.jobhistory; +import java.util.Set; + import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.mapred.TaskStatus; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; /** * Event to record the successful completion of a task @@ -115,5 +122,23 @@ public class TaskFinishedEvent implements HistoryEvent { return EventType.TASK_FINISHED; } - + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("TASK_TYPE", getTaskType().toString()); + tEvent.addInfo("FINISH_TIME", getFinishTime()); + tEvent.addInfo("STATUS", TaskStatus.State.SUCCEEDED.toString()); + tEvent.addInfo("SUCCESSFUL_TASK_ATTEMPT_ID", + getSuccessfulTaskAttemptId() == null ? "" : + getSuccessfulTaskAttemptId().toString()); + return tEvent; + } + + @Override + public Set<TimelineMetric> getTimelineMetrics() { + Set<TimelineMetric> jobMetrics = JobHistoryEventUtils + .countersToTimelineMetric(getCounters(), finishTime); + return jobMetrics; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java index ed53b03..9d2fc0e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java @@ -18,11 +18,16 @@ package org.apache.hadoop.mapreduce.jobhistory; +import java.util.Set; + import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; /** * Event to record the start of a task @@ -72,4 +77,19 @@ public class TaskStartedEvent implements HistoryEvent { return EventType.TASK_STARTED; } + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("TASK_TYPE", getTaskType().toString()); + tEvent.addInfo("START_TIME", getStartTime()); + tEvent.addInfo("SPLIT_LOCATIONS", getSplitLocations()); + return tEvent; + } + + @Override + public Set<TimelineMetric> getTimelineMetrics() { + return null; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java index 58f4143..010129d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java @@ -18,13 +18,15 @@ package org.apache.hadoop.mapreduce.jobhistory; -import java.io.IOException; +import java.util.Set; +import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.TaskID; - -import org.apache.avro.util.Utf8; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; /** * Event to record updates to a task @@ -61,4 +63,17 @@ public class TaskUpdatedEvent implements HistoryEvent { return EventType.TASK_UPDATED; } + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("FINISH_TIME", getFinishTime()); + return tEvent; + } + + @Override + public Set<TimelineMetric> getTimelineMetrics() { + return null; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java new file mode 100644 index 0000000..89abf0e --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java @@ -0,0 +1,83 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.mapreduce.util; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.CounterGroup; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.node.ArrayNode; +import org.codehaus.jackson.node.ObjectNode; + +/** + * Class containing utility methods to be used by JobHistoryEventHandler. + */ +public final class JobHistoryEventUtils { + private JobHistoryEventUtils() { + } + + // Number of bytes of config which can be published in one shot to ATSv2. + public static final int ATS_CONFIG_PUBLISH_SIZE_BYTES = 10 * 1024; + + public static JsonNode countersToJSON(Counters counters) { + ObjectMapper mapper = new ObjectMapper(); + ArrayNode nodes = mapper.createArrayNode(); + if (counters != null) { + for (CounterGroup counterGroup : counters) { + ObjectNode groupNode = nodes.addObject(); + groupNode.put("NAME", counterGroup.getName()); + groupNode.put("DISPLAY_NAME", counterGroup.getDisplayName()); + ArrayNode countersNode = groupNode.putArray("COUNTERS"); + for (Counter counter : counterGroup) { + ObjectNode counterNode = countersNode.addObject(); + counterNode.put("NAME", counter.getName()); + counterNode.put("DISPLAY_NAME", counter.getDisplayName()); + counterNode.put("VALUE", counter.getValue()); + } + } + } + return nodes; + } + + public static Set<TimelineMetric> countersToTimelineMetric(Counters counters, + long timestamp) { + return countersToTimelineMetric(counters, timestamp, ""); + } + + public static Set<TimelineMetric> countersToTimelineMetric(Counters counters, + long timestamp, String groupNamePrefix) { + Set<TimelineMetric> entityMetrics = new HashSet<TimelineMetric>(); + for (CounterGroup g : counters) { + String groupName = g.getName(); + for (Counter c : g) { + String name = groupNamePrefix + groupName + ":" + c.getName(); + TimelineMetric metric = new TimelineMetric(); + metric.setId(name); + metric.addValue(timestamp, c.getValue()); + entityMetrics.add(metric); + } + } + return entityMetrics; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml index 263a3e7..0e87d3b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml @@ -100,6 +100,12 @@ <type>test-jar</type> </dependency> <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-timelineservice</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> <groupId>org.hsqldb</groupId> <artifactId>hsqldb</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java index 40ed9ad..90748a9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java @@ -18,23 +18,54 @@ package org.apache.hadoop.mapred; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.EnumSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.jobhistory.EventType; import org.apache.hadoop.mapreduce.jobhistory.TestJobHistoryEventHandler; import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.timeline.TimelineStore; - +import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.junit.Assert; import org.junit.Test; +import com.google.common.collect.Sets; + public class TestMRTimelineEventHandling { + private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector"; + private static final Log LOG = + LogFactory.getLog(TestMRTimelineEventHandling.class); + @Test public void testTimelineServiceStartInMiniCluster() throws Exception { Configuration conf = new YarnConfiguration(); @@ -48,7 +79,7 @@ public class TestMRTimelineEventHandling { MiniMRYarnCluster cluster = null; try { cluster = new MiniMRYarnCluster( - TestJobHistoryEventHandler.class.getSimpleName(), 1); + TestMRTimelineEventHandling.class.getSimpleName(), 1); cluster.init(conf); cluster.start(); @@ -89,7 +120,7 @@ public class TestMRTimelineEventHandling { MiniMRYarnCluster cluster = null; try { cluster = new MiniMRYarnCluster( - TestJobHistoryEventHandler.class.getSimpleName(), 1); + TestMRTimelineEventHandling.class.getSimpleName(), 1); cluster.init(conf); cluster.start(); conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, @@ -138,6 +169,280 @@ public class TestMRTimelineEventHandling { } } + @SuppressWarnings("deprecation") + @Test + public void testMRNewTimelineServiceEventHandling() throws Exception { + LOG.info("testMRNewTimelineServiceEventHandling start."); + Configuration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + // enable new timeline service + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS, + FileSystemTimelineWriterImpl.class, TimelineWriter.class); + conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true); + + // enable aux-service based timeline collectors + conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME); + conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME + + ".class", PerNodeTimelineCollectorsAuxService.class.getName()); + + conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true); + + MiniMRYarnCluster cluster = null; + try { + cluster = new MiniMRYarnCluster( + TestMRTimelineEventHandling.class.getSimpleName(), 1, true); + cluster.init(conf); + cluster.start(); + LOG.info("A MiniMRYarnCluster get start."); + + Path inDir = new Path("input"); + Path outDir = new Path("output"); + LOG.info("Run 1st job which should be successful."); + JobConf successConf = new JobConf(conf); + successConf.set("dummy_conf1", + UtilsForTests.createConfigValue(51 * 1024)); + successConf.set("dummy_conf2", + UtilsForTests.createConfigValue(51 * 1024)); + successConf.set("huge_dummy_conf1", + UtilsForTests.createConfigValue(101 * 1024)); + successConf.set("huge_dummy_conf2", + UtilsForTests.createConfigValue(101 * 1024)); + RunningJob job = + UtilsForTests.runJobSucceed(successConf, inDir, outDir); + Assert.assertEquals(JobStatus.SUCCEEDED, + job.getJobStatus().getState().getValue()); + + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(new Configuration(cluster.getConfig())); + yarnClient.start(); + EnumSet<YarnApplicationState> appStates = + EnumSet.allOf(YarnApplicationState.class); + + ApplicationId firstAppId = null; + List<ApplicationReport> apps = yarnClient.getApplications(appStates); + Assert.assertEquals(apps.size(), 1); + ApplicationReport appReport = apps.get(0); + firstAppId = appReport.getApplicationId(); + UtilsForTests.waitForAppFinished(job, cluster); + checkNewTimelineEvent(firstAppId, appReport); + + LOG.info("Run 2nd job which should be failed."); + job = UtilsForTests.runJobFail(new JobConf(conf), inDir, outDir); + Assert.assertEquals(JobStatus.FAILED, + job.getJobStatus().getState().getValue()); + + apps = yarnClient.getApplications(appStates); + Assert.assertEquals(apps.size(), 2); + + appReport = apps.get(0).getApplicationId().equals(firstAppId) ? + apps.get(0) : apps.get(1); + + checkNewTimelineEvent(firstAppId, appReport); + + } finally { + if (cluster != null) { + cluster.stop(); + } + // Cleanup test file + String testRoot = + FileSystemTimelineWriterImpl. + DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT; + File testRootFolder = new File(testRoot); + if(testRootFolder.isDirectory()) { + FileUtils.deleteDirectory(testRootFolder); + } + + } + } + + private void checkNewTimelineEvent(ApplicationId appId, + ApplicationReport appReport) throws IOException { + String tmpRoot = + FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT + + "/entities/"; + + File tmpRootFolder = new File(tmpRoot); + + Assert.assertTrue(tmpRootFolder.isDirectory()); + String basePath = tmpRoot + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + + "/" + UserGroupInformation.getCurrentUser().getShortUserName() + + "/" + appReport.getName() + + "/" + TimelineUtils.DEFAULT_FLOW_VERSION + + "/" + appReport.getStartTime() + + "/" + appId.toString(); + // for this test, we expect MAPREDUCE_JOB and MAPREDUCE_TASK dirs + String outputDirJob = basePath + "/MAPREDUCE_JOB/"; + + File entityFolder = new File(outputDirJob); + Assert.assertTrue("Job output directory: " + outputDirJob + + " does not exist.", + entityFolder.isDirectory()); + + // check for job event file + String jobEventFileName = appId.toString().replaceAll("application", "job") + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + + String jobEventFilePath = outputDirJob + jobEventFileName; + File jobEventFile = new File(jobEventFilePath); + Assert.assertTrue("jobEventFilePath: " + jobEventFilePath + + " does not exist.", + jobEventFile.exists()); + verifyEntity(jobEventFile, EventType.JOB_FINISHED.name(), + true, false, null); + Set<String> cfgsToCheck = Sets.newHashSet("dummy_conf1", "dummy_conf2", + "huge_dummy_conf1", "huge_dummy_conf2"); + verifyEntity(jobEventFile, null, false, true, cfgsToCheck); + + // for this test, we expect MR job metrics are published in YARN_APPLICATION + String outputAppDir = basePath + "/YARN_APPLICATION/"; + entityFolder = new File(outputAppDir); + Assert.assertTrue( + "Job output directory: " + outputAppDir + + " does not exist.", + entityFolder.isDirectory()); + + // check for job event file + String appEventFileName = appId.toString() + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + + String appEventFilePath = outputAppDir + appEventFileName; + File appEventFile = new File(appEventFilePath); + Assert.assertTrue( + "appEventFilePath: " + appEventFilePath + + " does not exist.", + appEventFile.exists()); + verifyEntity(appEventFile, null, true, false, null); + verifyEntity(appEventFile, null, false, true, cfgsToCheck); + + // check for task event file + String outputDirTask = basePath + "/MAPREDUCE_TASK/"; + File taskFolder = new File(outputDirTask); + Assert.assertTrue("Task output directory: " + outputDirTask + + " does not exist.", + taskFolder.isDirectory()); + + String taskEventFileName = + appId.toString().replaceAll("application", "task") + + "_m_000000" + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + + String taskEventFilePath = outputDirTask + taskEventFileName; + File taskEventFile = new File(taskEventFilePath); + Assert.assertTrue("taskEventFileName: " + taskEventFilePath + + " does not exist.", + taskEventFile.exists()); + verifyEntity(taskEventFile, EventType.TASK_FINISHED.name(), + true, false, null); + + // check for task attempt event file + String outputDirTaskAttempt = basePath + "/MAPREDUCE_TASK_ATTEMPT/"; + File taskAttemptFolder = new File(outputDirTaskAttempt); + Assert.assertTrue("TaskAttempt output directory: " + outputDirTaskAttempt + + " does not exist.", taskAttemptFolder.isDirectory()); + + String taskAttemptEventFileName = appId.toString().replaceAll( + "application", "attempt") + "_m_000000_0" + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + + String taskAttemptEventFilePath = outputDirTaskAttempt + + taskAttemptEventFileName; + File taskAttemptEventFile = new File(taskAttemptEventFilePath); + Assert.assertTrue("taskAttemptEventFileName: " + taskAttemptEventFilePath + + " does not exist.", taskAttemptEventFile.exists()); + verifyEntity(taskAttemptEventFile, EventType.MAP_ATTEMPT_FINISHED.name(), + true, false, null); + } + + /** + * Verifies entity by reading the entity file written via FS impl. + * @param entityFile File to be read. + * @param eventId Event to be checked. + * @param chkMetrics If event is not null, this flag determines if metrics + * exist when the event is encountered. If event is null, we merely check + * if metrics exist in the entity file. + * @param chkCfg If event is not null, this flag determines if configs + * exist when the event is encountered. If event is null, we merely check + * if configs exist in the entity file. + * @param cfgsToVerify a set of configs which should exist in the entity file. + * @throws IOException + */ + private void verifyEntity(File entityFile, String eventId, + boolean chkMetrics, boolean chkCfg, Set<String> cfgsToVerify) + throws IOException { + BufferedReader reader = null; + String strLine; + try { + reader = new BufferedReader(new FileReader(entityFile)); + while ((strLine = reader.readLine()) != null) { + if (strLine.trim().length() > 0) { + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity + entity = + FileSystemTimelineReaderImpl.getTimelineRecordFromJSON( + strLine.trim(), + org.apache.hadoop.yarn.api.records.timelineservice. + TimelineEntity.class); + if (eventId == null) { + // Job metrics are published without any events for + // ApplicationEntity. There is also possibility that some other + // ApplicationEntity is published without events, hence loop till + // its found. Same applies to configs. + if (chkMetrics && entity.getMetrics().size() > 0) { + return; + } + if (chkCfg && entity.getConfigs().size() > 0) { + if (cfgsToVerify == null) { + return; + } else { + // Have configs to verify. Keep on removing configs from the set + // of configs to verify as they are found. When the all the + // entities have been looped through, we will check if the set + // is empty or not(indicating if all configs have been found or + // not). + for (Iterator<String> itr = + cfgsToVerify.iterator(); itr.hasNext();) { + String config = itr.next(); + if (entity.getConfigs().containsKey(config)) { + itr.remove(); + } + } + // All the required configs have been verified, so return. + if (cfgsToVerify.isEmpty()) { + return; + } + } + } + } else { + for (TimelineEvent event : entity.getEvents()) { + if (event.getId().equals(eventId)) { + if (chkMetrics) { + assertTrue(entity.getMetrics().size() > 0); + } + if (chkCfg) { + assertTrue(entity.getConfigs().size() > 0); + if (cfgsToVerify != null) { + for (String cfg : cfgsToVerify) { + assertTrue(entity.getConfigs().containsKey(cfg)); + } + } + } + return; + } + } + } + } + } + if (cfgsToVerify != null) { + assertTrue(cfgsToVerify.isEmpty()); + return; + } + fail("Expected event : " + eventId + " not found in the file " + + entityFile); + } finally { + reader.close(); + } + } + @Test public void testMapreduceJobTimelineServiceEnabled() throws Exception { @@ -147,7 +452,7 @@ public class TestMRTimelineEventHandling { MiniMRYarnCluster cluster = null; try { cluster = new MiniMRYarnCluster( - TestJobHistoryEventHandler.class.getSimpleName(), 1); + TestMRTimelineEventHandling.class.getSimpleName(), 1); cluster.init(conf); cluster.start(); conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java index 972391c..ec6ad38 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java @@ -31,8 +31,10 @@ import java.util.Enumeration; import java.util.Iterator; import java.util.List; import java.util.Properties; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeoutException; +import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -52,10 +54,13 @@ import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSeq import org.apache.hadoop.mapred.lib.IdentityMapper; import org.apache.hadoop.mapred.lib.IdentityReducer; import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus; -import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; +import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.StringUtils; - -import org.apache.commons.logging.Log; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; +import com.google.common.base.Supplier; /** * Utilities used in unit test. @@ -150,6 +155,14 @@ public class UtilsForTests { return buf.toString(); } + public static String createConfigValue(int msgSize) { + StringBuilder sb = new StringBuilder(msgSize); + for (int i = 0; i < msgSize; i++) { + sb.append('a'); + } + return sb.toString(); + } + public static String safeGetCanonicalPath(File f) { try { String s = f.getCanonicalPath(); @@ -607,6 +620,29 @@ public class UtilsForTests { return job; } + public static void waitForAppFinished(RunningJob job, + MiniMRYarnCluster cluster) throws IOException { + ApplicationId appId = ApplicationId.newInstance( + Long.parseLong(job.getID().getJtIdentifier()), job.getID().getId()); + ConcurrentMap<ApplicationId, RMApp> rmApps = cluster.getResourceManager() + .getRMContext().getRMApps(); + if (!rmApps.containsKey(appId)) { + throw new IOException("Job not found"); + } + final RMApp rmApp = rmApps.get(appId); + try { + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + return RMAppImpl.isAppInFinalState(rmApp); + } + }, 1000, 1000 * 180); + } catch (TimeoutException | InterruptedException e1) { + throw new IOException("Yarn application with " + appId + " didn't finish " + + "did not reach finale State", e1); + } + } + // Run a job that will be succeeded and wait until it completes public static RunningJob runJobSucceed(JobConf conf, Path inDir, Path outDir) throws IOException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/EntityWriterV2.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/EntityWriterV2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/EntityWriterV2.java new file mode 100644 index 0000000..74d7b94 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/EntityWriterV2.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager; + +/** + * Base mapper for writing entities to the timeline service. Subclasses + * override {@link #writeEntities(Configuration, TimelineCollectorManager, + * org.apache.hadoop.mapreduce.Mapper.Context)} to create and write entities + * to the timeline service. + */ +abstract class EntityWriterV2 + extends org.apache.hadoop.mapreduce.Mapper + <IntWritable, IntWritable, Writable, Writable> { + @Override + public void map(IntWritable key, IntWritable val, Context context) + throws IOException { + + // create the timeline collector manager wired with the writer + Configuration tlConf = new YarnConfiguration(); + TimelineCollectorManager manager = new TimelineCollectorManager("test"); + manager.init(tlConf); + manager.start(); + try { + // invoke the method to have the subclass write entities + writeEntities(tlConf, manager, context); + } finally { + manager.close(); + } + } + + protected abstract void writeEntities(Configuration tlConf, + TimelineCollectorManager manager, Context context) throws IOException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileParser.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileParser.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileParser.java index c290cd6..5d9dc0b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileParser.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileParser.java @@ -28,6 +28,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; +/** + * Used to parse job history and configuration files. + */ class JobHistoryFileParser { private static final Log LOG = LogFactory.getLog(JobHistoryFileParser.class); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java index 5e10662..447ea4e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java @@ -20,33 +20,21 @@ package org.apache.hadoop.mapreduce; import java.io.IOException; import java.util.Collection; -import java.util.HashMap; -import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.Mapper.Context; -import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters; -import org.apache.hadoop.mapreduce.JobHistoryFileReplayHelper; import org.apache.hadoop.mapreduce.JobHistoryFileReplayHelper.JobFiles; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV2.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV2.java new file mode 100644 index 0000000..2ec4833 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV2.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapreduce.JobHistoryFileReplayHelper.JobFiles; +import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; +import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager; + +/** + * Mapper for TimelineServicePerformance that replays job history files to the + * timeline service v.2. + * + */ +class JobHistoryFileReplayMapperV2 extends EntityWriterV2 { + private static final Log LOG = + LogFactory.getLog(JobHistoryFileReplayMapperV2.class); + + @Override + protected void writeEntities(Configuration tlConf, + TimelineCollectorManager manager, Context context) throws IOException { + JobHistoryFileReplayHelper helper = new JobHistoryFileReplayHelper(context); + int replayMode = helper.getReplayMode(); + JobHistoryFileParser parser = helper.getParser(); + TimelineEntityConverterV2 converter = new TimelineEntityConverterV2(); + + // collect the apps it needs to process + Collection<JobFiles> jobs = helper.getJobFiles(); + if (jobs.isEmpty()) { + LOG.info(context.getTaskAttemptID().getTaskID() + + " will process no jobs"); + } else { + LOG.info(context.getTaskAttemptID().getTaskID() + " will process " + + jobs.size() + " jobs"); + } + for (JobFiles job: jobs) { + // process each job + String jobIdStr = job.getJobId(); + // skip if either of the file is missing + if (job.getJobConfFilePath() == null || + job.getJobHistoryFilePath() == null) { + LOG.info(jobIdStr + " missing either the job history file or the " + + "configuration file. Skipping."); + continue; + } + LOG.info("processing " + jobIdStr + "..."); + JobId jobId = TypeConverter.toYarn(JobID.forName(jobIdStr)); + ApplicationId appId = jobId.getAppId(); + + // create the app level timeline collector and start it + AppLevelTimelineCollector collector = + new AppLevelTimelineCollector(appId); + manager.putIfAbsent(appId, collector); + try { + // parse the job info and configuration + JobInfo jobInfo = + parser.parseHistoryFile(job.getJobHistoryFilePath()); + Configuration jobConf = + parser.parseConfiguration(job.getJobConfFilePath()); + LOG.info("parsed the job history file and the configuration file " + + "for job " + jobIdStr); + + // set the context + // flow id: job name, flow run id: timestamp, user id + TimelineCollectorContext tlContext = + collector.getTimelineEntityContext(); + tlContext.setFlowName(jobInfo.getJobname()); + tlContext.setFlowRunId(jobInfo.getSubmitTime()); + tlContext.setUserId(jobInfo.getUsername()); + + // create entities from job history and write them + long totalTime = 0; + List<TimelineEntity> entitySet = + converter.createTimelineEntities(jobInfo, jobConf); + LOG.info("converted them into timeline entities for job " + jobIdStr); + // use the current user for this purpose + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + long startWrite = System.nanoTime(); + try { + switch (replayMode) { + case JobHistoryFileReplayHelper.WRITE_ALL_AT_ONCE: + writeAllEntities(collector, entitySet, ugi); + break; + case JobHistoryFileReplayHelper.WRITE_PER_ENTITY: + writePerEntity(collector, entitySet, ugi); + break; + default: + break; + } + } catch (Exception e) { + context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES). + increment(1); + LOG.error("writing to the timeline service failed", e); + } + long endWrite = System.nanoTime(); + totalTime += TimeUnit.NANOSECONDS.toMillis(endWrite-startWrite); + int numEntities = entitySet.size(); + LOG.info("wrote " + numEntities + " entities in " + totalTime + " ms"); + + context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME). + increment(totalTime); + context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER). + increment(numEntities); + } finally { + manager.remove(appId); + context.progress(); // move it along + } + } + } + + private void writeAllEntities(AppLevelTimelineCollector collector, + List<TimelineEntity> entitySet, UserGroupInformation ugi) + throws IOException { + TimelineEntities entities = new TimelineEntities(); + entities.setEntities(entitySet); + collector.putEntities(entities, ugi); + } + + private void writePerEntity(AppLevelTimelineCollector collector, + List<TimelineEntity> entitySet, UserGroupInformation ugi) + throws IOException { + for (TimelineEntity entity : entitySet) { + TimelineEntities entities = new TimelineEntities(); + entities.addEntity(entity); + collector.putEntities(entities, ugi); + LOG.info("wrote entity " + entity.getId()); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterConstants.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterConstants.java new file mode 100644 index 0000000..d96ad76 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterConstants.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce; + +/** + * Constants for simple entity writers. + */ +interface SimpleEntityWriterConstants { + // constants for mtype = 1 + String KBS_SENT = "kbs sent"; + int KBS_SENT_DEFAULT = 1; + String TEST_TIMES = "testtimes"; + int TEST_TIMES_DEFAULT = 100; + String TIMELINE_SERVICE_PERFORMANCE_RUN_ID = + "timeline.server.performance.run.id"; + + /** + * To ensure that the compression really gets exercised, generate a + * random alphanumeric fixed length payload. + */ + char[] ALPHA_NUMS = new char[] {'a', 'b', 'c', 'd', 'e', 'f', + 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', + 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', + 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', + 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2', + '3', '4', '5', '6', '7', '8', '9', '0', ' '}; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java index 2c851e9..16d14a1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java @@ -27,45 +27,25 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters; -import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; -import org.apache.hadoop.yarn.conf.YarnConfiguration; /** * Adds simple entities with random string payload, events, metrics, and * configuration. */ -class SimpleEntityWriterV1 extends - org.apache.hadoop.mapreduce.Mapper<IntWritable,IntWritable,Writable,Writable> { +class SimpleEntityWriterV1 + extends org.apache.hadoop.mapreduce.Mapper + <IntWritable, IntWritable, Writable, Writable> + implements SimpleEntityWriterConstants { private static final Log LOG = LogFactory.getLog(SimpleEntityWriterV1.class); - // constants for mtype = 1 - static final String KBS_SENT = "kbs sent"; - static final int KBS_SENT_DEFAULT = 1; - static final String TEST_TIMES = "testtimes"; - static final int TEST_TIMES_DEFAULT = 100; - static final String TIMELINE_SERVICE_PERFORMANCE_RUN_ID = - "timeline.server.performance.run.id"; - /** - * To ensure that the compression really gets exercised, generate a - * random alphanumeric fixed length payload - */ - private static char[] ALPHA_NUMS = new char[] { 'a', 'b', 'c', 'd', 'e', 'f', - 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', - 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', - 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', - 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2', - '3', '4', '5', '6', '7', '8', '9', '0', ' ' }; - - public void map(IntWritable key, IntWritable val, Context context) throws IOException { + public void map(IntWritable key, IntWritable val, Context context) + throws IOException { TimelineClient tlc = new TimelineClientImpl(); Configuration conf = context.getConfiguration(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV2.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV2.java new file mode 100644 index 0000000..d66deb0 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV2.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce; + +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager; + +/** + * Adds simple entities with random string payload, events, metrics, and + * configuration. + */ +class SimpleEntityWriterV2 extends EntityWriterV2 + implements SimpleEntityWriterConstants { + private static final Log LOG = LogFactory.getLog(SimpleEntityWriterV2.class); + + protected void writeEntities(Configuration tlConf, + TimelineCollectorManager manager, Context context) throws IOException { + Configuration conf = context.getConfiguration(); + // simulate the app id with the task id + int taskId = context.getTaskAttemptID().getTaskID().getId(); + long timestamp = conf.getLong(TIMELINE_SERVICE_PERFORMANCE_RUN_ID, 0); + ApplicationId appId = ApplicationId.newInstance(timestamp, taskId); + + // create the app level timeline collector + AppLevelTimelineCollector collector = + new AppLevelTimelineCollector(appId); + manager.putIfAbsent(appId, collector); + + try { + // set the context + // flow id: job name, flow run id: timestamp, user id + TimelineCollectorContext tlContext = + collector.getTimelineEntityContext(); + tlContext.setFlowName(context.getJobName()); + tlContext.setFlowRunId(timestamp); + tlContext.setUserId(context.getUser()); + + final int kbs = conf.getInt(KBS_SENT, KBS_SENT_DEFAULT); + + long totalTime = 0; + final int testtimes = conf.getInt(TEST_TIMES, TEST_TIMES_DEFAULT); + final Random rand = new Random(); + final TaskAttemptID taskAttemptId = context.getTaskAttemptID(); + final char[] payLoad = new char[kbs * 1024]; + + for (int i = 0; i < testtimes; i++) { + // Generate a fixed length random payload + for (int xx = 0; xx < kbs * 1024; xx++) { + int alphaNumIdx = + rand.nextInt(ALPHA_NUMS.length); + payLoad[xx] = ALPHA_NUMS[alphaNumIdx]; + } + String entId = taskAttemptId + "_" + Integer.toString(i); + final TimelineEntity entity = new TimelineEntity(); + entity.setId(entId); + entity.setType("FOO_ATTEMPT"); + entity.addInfo("PERF_TEST", payLoad); + // add an event + TimelineEvent event = new TimelineEvent(); + event.setId("foo_event_id"); + event.setTimestamp(System.currentTimeMillis()); + event.addInfo("foo_event", "test"); + entity.addEvent(event); + // add a metric + TimelineMetric metric = new TimelineMetric(); + metric.setId("foo_metric"); + metric.addValue(System.currentTimeMillis(), 123456789L); + entity.addMetric(metric); + // add a config + entity.addConfig("foo", "bar"); + + TimelineEntities entities = new TimelineEntities(); + entities.addEntity(entity); + // use the current user for this purpose + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + long startWrite = System.nanoTime(); + try { + collector.putEntities(entities, ugi); + } catch (Exception e) { + context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES). + increment(1); + LOG.error("writing to the timeline service failed", e); + } + long endWrite = System.nanoTime(); + totalTime += TimeUnit.NANOSECONDS.toMillis(endWrite-startWrite); + } + LOG.info("wrote " + testtimes + " entities (" + kbs*testtimes + + " kB) in " + totalTime + " ms"); + context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME). + increment(totalTime); + context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER). + increment(testtimes); + context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS). + increment(kbs*testtimes); + } finally { + // clean up + manager.remove(appId); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV1.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV1.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV1.java index 79d123e..dcc3ce0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV1.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV1.java @@ -25,11 +25,6 @@ import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.Counter; -import org.apache.hadoop.mapreduce.CounterGroup; -import org.apache.hadoop.mapreduce.Counters; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; @@ -95,9 +90,10 @@ class TimelineEntityConverterV1 { return job; } - private Set<TimelineEntity> createTaskAndTaskAttemptEntities(JobInfo jobInfo) { + private Set<TimelineEntity> + createTaskAndTaskAttemptEntities(JobInfo jobInfo) { Set<TimelineEntity> entities = new HashSet<>(); - Map<TaskID,TaskInfo> taskInfoMap = jobInfo.getAllTasks(); + Map<TaskID, TaskInfo> taskInfoMap = jobInfo.getAllTasks(); LOG.info("job " + jobInfo.getJobId()+ " has " + taskInfoMap.size() + " tasks"); for (TaskInfo taskInfo: taskInfoMap.values()) { @@ -129,7 +125,7 @@ class TimelineEntityConverterV1 { private Set<TimelineEntity> createTaskAttemptEntities(TaskInfo taskInfo) { Set<TimelineEntity> taskAttempts = new HashSet<TimelineEntity>(); - Map<TaskAttemptID,TaskAttemptInfo> taskAttemptInfoMap = + Map<TaskAttemptID, TaskAttemptInfo> taskAttemptInfoMap = taskInfo.getAllTaskAttempts(); LOG.info("task " + taskInfo.getTaskId() + " has " + taskAttemptInfoMap.size() + " task attempts"); @@ -140,7 +136,8 @@ class TimelineEntityConverterV1 { return taskAttempts; } - private TimelineEntity createTaskAttemptEntity(TaskAttemptInfo taskAttemptInfo) { + private TimelineEntity + createTaskAttemptEntity(TaskAttemptInfo taskAttemptInfo) { TimelineEntity taskAttempt = new TimelineEntity(); taskAttempt.setEntityType(TASK_ATTEMPT); taskAttempt.setEntityId(taskAttemptInfo.getAttemptId().toString()); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org