Author: omalley
Date: Fri Mar 4 03:26:16 2011
New Revision: 1076962
URL: http://svn.apache.org/viewvc?rev=1076962&view=rev
Log:
commit 2a423584ebf75afbb08b49c9f8267be6973a9e26
Author: Lee Tucker <[email protected]>
Date: Thu Jul 30 17:40:48 2009 -0700
Applying patch 2899836.mr740.patch
Modified:
hadoop/common/branches/branch-0.20-security-patches/conf/log4j.properties
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/conf/log4j.properties
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/conf/log4j.properties?rev=1076962&r1=1076961&r2=1076962&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/conf/log4j.properties
(original)
+++ hadoop/common/branches/branch-0.20-security-patches/conf/log4j.properties
Fri Mar 4 03:26:16 2011
@@ -3,6 +3,16 @@ hadoop.root.logger=INFO,console
hadoop.log.dir=.
hadoop.log.file=hadoop.log
+#
+# Job Summary Appender
+#
+# Use following logger to send summary to separate file defined by
+# hadoop.mapreduce.jobsummary.log.file rolled daily:
+# hadoop.mapreduce.jobsummary.logger=INFO,JSA
+#
+hadoop.mapreduce.jobsummary.logger=${hadoop.root.logger}
+hadoop.mapreduce.jobsummary.log.file=hadoop-mapreduce.jobsummary.log
+
# Define the root logger to the system property "hadoop.root.logger".
log4j.rootLogger=${hadoop.root.logger}, EventCounter
@@ -92,3 +102,14 @@ log4j.logger.org.jets3t.service.impl.res
# Sends counts of logging messages at different severity levels to Hadoop Metrics.
#
log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter
+
+#
+# Job Summary Appender
+#
+log4j.appender.JSA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.JSA.File=${hadoop.log.dir}/${hadoop.mapreduce.jobsummary.log.file}
+log4j.appender.JSA.layout=org.apache.log4j.PatternLayout
+log4j.appender.JSA.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+log4j.appender.JSA.DatePattern=.yyyy-MM-dd
+log4j.logger.org.apache.hadoop.mapred.JobInProgress$JobSummary=${hadoop.mapreduce.jobsummary.logger}
+log4j.additivity.org.apache.hadoop.mapred.JobInProgress$JobSummary=false
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1076962&r1=1076961&r2=1076962&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
Fri Mar 4 03:26:16 2011
@@ -203,6 +203,8 @@ class JobInProgress {
OTHER_LOCAL_MAPS,
DATA_LOCAL_MAPS,
RACK_LOCAL_MAPS,
+ SLOTS_MILLIS_MAPS,
+ SLOTS_MILLIS_REDUCES,
FALLOW_SLOTS_MILLIS_MAPS,
FALLOW_SLOTS_MILLIS_REDUCES
}
@@ -2129,8 +2131,25 @@ class JobInProgress {
}
return true;
}
+
/**
+ * Charges the job's slot-time counter for one finished task attempt.
+ * The charge is the TIP's slot count multiplied by the attempt's
+ * wall-clock duration (finish time minus start time), booked against
+ * SLOTS_MILLIS_MAPS for map TIPs and SLOTS_MILLIS_REDUCES otherwise.
+ * NOTE(review): no guard here -- a finish time earlier than the start
+ * time would produce a negative charge.
+ * @param tip {@link TaskInProgress} owning the attempt that just finished,
+ *            cannot be <code>null</code>
+ * @param status {@link TaskStatus} of that attempt, cannot be
+ *               <code>null</code>
+ */
+ private void meterTaskAttempt(TaskInProgress tip, TaskStatus status) {
+   long attemptMillis = status.getFinishTime() - status.getStartTime();
+   long occupiedSlotMillis = tip.getNumSlotsRequired() * attemptMillis;
+   if (tip.isMapTask()) {
+     jobCounters.incrCounter(Counter.SLOTS_MILLIS_MAPS, occupiedSlotMillis);
+   } else {
+     jobCounters.incrCounter(Counter.SLOTS_MILLIS_REDUCES,
+                             occupiedSlotMillis);
+   }
+ }
+
+ /**
* A taskid assigned to this JobInProgress has reported in successfully.
*/
public synchronized boolean completedTask(TaskInProgress tip,
@@ -2140,6 +2159,9 @@ class JobInProgress {
int oldNumAttempts = tip.getActiveTasks().size();
final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
+ // Metering
+ meterTaskAttempt(tip, status);
+
// Sanity check: is the TIP already complete?
// It _is_ safe to not decrement running{Map|Reduce}Tasks and
// finished{Map|Reduce}Tasks variables here because one and only
@@ -2281,6 +2303,12 @@ class JobInProgress {
this.finishTime = System.currentTimeMillis();
LOG.info("Job " + this.status.getJobID() +
" has completed successfully.");
+
+ // Log the job summary (this should be done prior to logging to
+ // job-history to ensure job-counters are in-sync)
+ JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
+
+ // Log job-history
JobHistory.JobInfo.logFinished(this.status.getJobID(), finishTime,
this.finishedMapTasks,
this.finishedReduceTasks, failedMapTasks,
@@ -2296,6 +2324,9 @@ class JobInProgress {
private synchronized void terminateJob(int jobTerminationState) {
if ((status.getRunState() == JobStatus.RUNNING) ||
(status.getRunState() == JobStatus.PREP)) {
+ // Log the job summary
+ JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
+
if (jobTerminationState == JobStatus.FAILED) {
this.status = new JobStatus(status.getJobID(),
1.0f, 1.0f, 1.0f, JobStatus.FAILED,
@@ -2314,6 +2345,7 @@ class JobInProgress {
this.finishedReduceTasks);
}
garbageCollect();
+
jobtracker.getInstrumentation().terminateJob(
this.conf, this.status.getJobID());
}
@@ -2474,9 +2506,13 @@ class JobInProgress {
failReduce(tip);
}
}
+
+ // Metering
+ meterTaskAttempt(tip, status);
}
- // the case when the map was complete but the task tracker went down.
+ // The case when the map was complete but the task tracker went down.
+ // However, we don't need to do any metering here...
if (wasComplete && !isComplete) {
if (tip.isMapTask()) {
// Put the task back in the cache. This will help locality for cases
@@ -2667,8 +2703,9 @@ class JobInProgress {
* from the various tables.
*/
synchronized void garbageCollect() {
- //Cancel task tracker reservation
+ // Cancel task tracker reservation
cancelReservedSlots();
+
// Let the JobTracker know that a job is complete
jobtracker.getInstrumentation().decWaitingMaps(getJobID(), pendingMaps());
jobtracker.getInstrumentation().decWaitingReduces(getJobID(),
pendingReduces());
@@ -2853,4 +2890,64 @@ class JobInProgress {
void setClusterSize(int clusterSize) {
this.clusterSize = clusterSize;
}
+
+ /**
+  * Emits a single-line job summary record of the form
+  * {@code jobId=...,submitTime=...,...,clusterReduceCapacity=...} through
+  * its own logger, so log4j can route it independently of the main log.
+  */
+ static class JobSummary {
+   static final Log LOG = LogFactory.getLog(JobSummary.class);
+
+   // Separators of the "key=value,key=value" record. Free-form values
+   // (user, queue) have these characters escaped so they cannot split a
+   // field or forge a key.
+   static final char EQUALS = '=';
+   static final char[] charsToEscape =
+     {StringUtils.COMMA, EQUALS, StringUtils.ESCAPE_CHAR};
+
+   /**
+    * Log a summary of the job's runtime.
+    *
+    * <p>The record reports the job's current run state and finish time as
+    * read from {@code job}, so it should be emitted only after both have
+    * been set to their final values; otherwise it reports a non-final
+    * state.
+    *
+    * <p>Slot usage is reported in whole seconds: the SLOTS_MILLIS_* and
+    * FALLOW_SLOTS_MILLIS_* counters are summed per task type and divided
+    * by 1000 (integer division, so fractions are truncated).
+    *
+    * @param job {@link JobInProgress} whose summary is to be logged, cannot
+    *            be <code>null</code>.
+    * @param cluster {@link ClusterStatus} of the cluster on which the job
+    *                was run, cannot be <code>null</code>
+    */
+   public static void logJobSummary(JobInProgress job, ClusterStatus cluster) {
+     JobStatus status = job.getStatus();
+     JobProfile profile = job.getProfile();
+     String user = escape(profile.getUser());
+     String queue = escape(profile.getQueueName());
+
+     Counters jobCounters = job.getJobCounters();
+     long mapSlotSeconds =
+       (jobCounters.getCounter(Counter.SLOTS_MILLIS_MAPS) +
+        jobCounters.getCounter(Counter.FALLOW_SLOTS_MILLIS_MAPS)) / 1000;
+     long reduceSlotSeconds =
+       (jobCounters.getCounter(Counter.SLOTS_MILLIS_REDUCES) +
+        jobCounters.getCounter(Counter.FALLOW_SLOTS_MILLIS_REDUCES)) / 1000;
+
+     StringBuilder record = new StringBuilder("jobId=").append(job.getJobID());
+     appendField(record, "submitTime", job.getStartTime());
+     appendField(record, "launchTime", job.getLaunchTime());
+     appendField(record, "finishTime", job.getFinishTime());
+     appendField(record, "numMaps", job.getMapTasks().length);
+     appendField(record, "numSlotsPerMap", job.getNumSlotsPerMap());
+     appendField(record, "numReduces", job.getReduceTasks().length);
+     appendField(record, "numSlotsPerReduce", job.getNumSlotsPerReduce());
+     appendField(record, "user", user);
+     appendField(record, "queue", queue);
+     appendField(record, "status",
+                 JobStatus.getJobRunState(status.getRunState()));
+     appendField(record, "mapSlotSeconds", mapSlotSeconds);
+     // NOTE(review): this key is "reduceSlotsSeconds" (unlike
+     // "mapSlotSeconds"); kept verbatim since summary consumers may match it.
+     appendField(record, "reduceSlotsSeconds", reduceSlotSeconds);
+     appendField(record, "clusterMapCapacity", cluster.getMaxMapTasks());
+     appendField(record, "clusterReduceCapacity", cluster.getMaxReduceTasks());
+
+     LOG.info(record.toString());
+   }
+
+   /** Escapes the record's separator characters inside a free-form value. */
+   private static String escape(String value) {
+     return StringUtils.escapeString(value, StringUtils.ESCAPE_CHAR,
+                                     charsToEscape);
+   }
+
+   /** Appends {@code ,key=value} to the summary record being built. */
+   private static void appendField(StringBuilder record, String key,
+                                   Object value) {
+     record.append(StringUtils.COMMA).append(key).append(EQUALS)
+           .append(value);
+   }
+ }
}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java?rev=1076962&r1=1076961&r2=1076962&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java
Fri Mar 4 03:26:16 2011
@@ -48,6 +48,22 @@ public class JobStatus implements Writab
public static final int PREP = 4;
public static final int KILLED = 5;
+ private static final String UNKNOWN = "UNKNOWN";
+ // Display names positioned so that the array index equals the numeric
+ // run-state constant (e.g. PREP = 4 -> "PREP", KILLED = 5 -> "KILLED").
+ // Index 0 is never returned: getJobRunState rejects state values below 1.
+ private static final String[] runStates =
+   {UNKNOWN, "RUNNING", "SUCCEEDED", "FAILED", "PREP", "KILLED"};
+
+ /**
+  * Converts a numeric job run state into a human-readable name.
+  *
+  * @param state job run state, expected to be one of this class's
+  *              run-state constants
+  * @return the state's name, or {@code "UNKNOWN"} when {@code state} lies
+  *         outside 1..{@code runStates.length - 1}
+  */
+ public static String getJobRunState(int state) {
+   boolean inRange = state >= 1 && state < runStates.length;
+   return inRange ? runStates[state] : UNKNOWN;
+ }
+
private JobID jobid;
private float mapProgress;
private float reduceProgress;
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=1076962&r1=1076961&r2=1076962&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
Fri Mar 4 03:26:16 2011
@@ -1186,4 +1186,8 @@ class TaskInProgress {
TreeMap<TaskAttemptID, String> getActiveTasks() {
return activeTasks;
}
+
+ /**
+  * @return this TIP's {@code numSlotsRequired} value; JobInProgress uses it
+  *         as the slot multiplier when metering attempt durations
+  */
+ int getNumSlotsRequired() {
+   return this.numSlotsRequired;
+ }
}