Author: ddas
Date: Thu Dec 11 08:18:18 2008
New Revision: 725729
URL: http://svn.apache.org/viewvc?rev=725729&view=rev
Log:
HADOOP-4737. Adds the KILLED notification when jobs get killed. Contributed by
Amareshwari Sriramadasu.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/NotificationTestCase.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=725729&r1=725728&r2=725729&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Dec 11 08:18:18 2008
@@ -218,6 +218,9 @@
HADOOP-4688. Modify the MiniMRDFSSort unit test to spill multiple times,
exercising the map-side merge code. (cdouglas)
+ HADOOP-4737. Adds the KILLED notification when jobs get killed.
+ (Amareshwari Sriramadasu via ddas)
+
OPTIMIZATIONS
HADOOP-3293. Fixes FileInputFormat to do provide locations for splits
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java?rev=725729&r1=725728&r2=725729&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java
(original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java
Thu Dec 11 08:18:18 2008
@@ -105,7 +105,8 @@
}
if (uri.contains("$jobStatus")) {
String statusStr =
- (status.getRunState() == JobStatus.SUCCEEDED) ? "SUCCEEDED" : "FAILED";
+ (status.getRunState() == JobStatus.SUCCEEDED) ? "SUCCEEDED" :
+ (status.getRunState() == JobStatus.FAILED) ? "FAILED" : "KILLED";
uri = uri.replace("$jobStatus", statusStr);
}
notification = new JobEndStatusInfo(uri, retryAttempts, retryInterval);
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=725729&r1=725728&r2=725729&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
(original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
Thu Dec 11 08:18:18 2008
@@ -62,6 +62,7 @@
private JobProfile profile;
private Path localFile;
private FileSystem localFs;
+ boolean killed = false;
// Counters summed over all the map/reduce tasks which
// have successfully completed
@@ -100,6 +101,8 @@
@Override
public void run() {
+ JobContext jContext = new JobContext(conf);
+ OutputCommitter outputCommitter = job.getOutputCommitter();
try {
// split input into minimum number of splits
InputSplit[] splits;
@@ -112,33 +115,35 @@
numReduceTasks = 1;
job.setNumReduceTasks(1);
}
- JobContext jContext = new JobContext(conf);
- OutputCommitter outputCommitter = job.getOutputCommitter();
outputCommitter.setupJob(jContext);
status.setSetupProgress(1.0f);
DataOutputBuffer buffer = new DataOutputBuffer();
for (int i = 0; i < splits.length; i++) {
- TaskAttemptID mapId = new TaskAttemptID(new TaskID(jobId, true, i),0);
- mapIds.add(mapId);
- buffer.reset();
- splits[i].write(buffer);
- BytesWritable split = new BytesWritable();
- split.set(buffer.getData(), 0, buffer.getLength());
- MapTask map = new MapTask(file.toString(),
- mapId, i,
- splits[i].getClass().getName(),
- split);
- JobConf localConf = new JobConf(job);
- map.setJobFile(localFile.toString());
- map.localizeConfiguration(localConf);
- map.setConf(localConf);
- map_tasks += 1;
- myMetrics.launchMap(mapId);
- map.run(localConf, this);
- myMetrics.completeMap(mapId);
- map_tasks -= 1;
- updateCounters(map);
+ if (!this.isInterrupted()) {
+ TaskAttemptID mapId = new TaskAttemptID(new TaskID(jobId, true,
i),0);
+ mapIds.add(mapId);
+ buffer.reset();
+ splits[i].write(buffer);
+ BytesWritable split = new BytesWritable();
+ split.set(buffer.getData(), 0, buffer.getLength());
+ MapTask map = new MapTask(file.toString(),
+ mapId, i,
+ splits[i].getClass().getName(),
+ split);
+ JobConf localConf = new JobConf(job);
+ map.setJobFile(localFile.toString());
+ map.localizeConfiguration(localConf);
+ map.setConf(localConf);
+ map_tasks += 1;
+ myMetrics.launchMap(mapId);
+ map.run(localConf, this);
+ myMetrics.completeMap(mapId);
+ map_tasks -= 1;
+ updateCounters(map);
+ } else {
+ throw new InterruptedException();
+ }
}
TaskAttemptID reduceId =
new TaskAttemptID(new TaskID(jobId, false, 0), 0);
@@ -146,19 +151,23 @@
if (numReduceTasks > 0) {
// move map output to reduce input
for (int i = 0; i < mapIds.size(); i++) {
- TaskAttemptID mapId = mapIds.get(i);
- Path mapOut = this.mapoutputFile.getOutputFile(mapId);
- Path reduceIn = this.mapoutputFile.getInputFileForWrite(mapId.getTaskID(),reduceId,
- localFs.getLength(mapOut));
- if (!localFs.mkdirs(reduceIn.getParent())) {
- throw new IOException("Mkdirs failed to create "
- + reduceIn.getParent().toString());
+ if (!this.isInterrupted()) {
+ TaskAttemptID mapId = mapIds.get(i);
+ Path mapOut = this.mapoutputFile.getOutputFile(mapId);
+ Path reduceIn = this.mapoutputFile.getInputFileForWrite(
+ mapId.getTaskID(),reduceId,
+ localFs.getLength(mapOut));
+ if (!localFs.mkdirs(reduceIn.getParent())) {
+ throw new IOException("Mkdirs failed to create "
+ + reduceIn.getParent().toString());
+ }
+ if (!localFs.rename(mapOut, reduceIn))
+ throw new IOException("Couldn't rename " + mapOut);
+ } else {
+ throw new InterruptedException();
}
- if (!localFs.rename(mapOut, reduceIn))
- throw new IOException("Couldn't rename " + mapOut);
}
-
- {
+ if (!this.isInterrupted()) {
ReduceTask reduce = new ReduceTask(file.toString(),
reduceId, 0, mapIds.size());
JobConf localConf = new JobConf(job);
@@ -171,6 +180,8 @@
myMetrics.completeReduce(reduce.getTaskID());
reduce_tasks -= 1;
updateCounters(reduce);
+ } else {
+ throw new InterruptedException();
}
}
} finally {
@@ -185,12 +196,26 @@
outputCommitter.cleanupJob(jContext);
status.setCleanupProgress(1.0f);
- this.status.setRunState(JobStatus.SUCCEEDED);
+ if (killed) {
+ this.status.setRunState(JobStatus.KILLED);
+ } else {
+ this.status.setRunState(JobStatus.SUCCEEDED);
+ }
JobEndNotifier.localRunnerNotification(job, status);
} catch (Throwable t) {
- this.status.setRunState(JobStatus.FAILED);
+ try {
+ outputCommitter.cleanupJob(jContext);
+ } catch (IOException ioe) {
+ LOG.info("Error cleaning up job:" + id);
+ }
+ status.setCleanupProgress(1.0f);
+ if (killed) {
+ this.status.setRunState(JobStatus.KILLED);
+ } else {
+ this.status.setRunState(JobStatus.FAILED);
+ }
LOG.warn(id, t);
JobEndNotifier.localRunnerNotification(job, status);
@@ -307,7 +332,8 @@
}
public void killJob(JobID id) {
- jobs.get(id).stop();
+ jobs.get(id).killed = true;
+ jobs.get(id).interrupt();
}
public void setJobPriority(JobID id, String jp) throws IOException {
Modified:
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/NotificationTestCase.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/NotificationTestCase.java?rev=725729&r1=725728&r2=725729&view=diff
==============================================================================
---
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/NotificationTestCase.java
(original)
+++
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/NotificationTestCase.java
Thu Dec 11 08:18:18 2008
@@ -100,7 +100,24 @@
protected void doGet(HttpServletRequest req, HttpServletResponse res)
throws ServletException, IOException {
- if (counter == 0) {
+ switch (counter) {
+ case 0:
+ {
+ assertTrue(req.getQueryString().contains("SUCCEEDED"));
+ }
+ break;
+ case 2:
+ {
+ assertTrue(req.getQueryString().contains("KILLED"));
+ }
+ break;
+ case 4:
+ {
+ assertTrue(req.getQueryString().contains("FAILED"));
+ }
+ break;
+ }
+ if (counter % 2 == 0) {
stdPrintln((new Date()).toString() +
"Receiving First notification for [" + req.getQueryString() +
"], returning error");
@@ -148,6 +165,22 @@
Thread.currentThread().sleep(2000);
}
assertEquals(2, NotificationServlet.counter);
+
+ // run a job with KILLED status
+ System.out.println(TestJobKillAndFail.runJobKill(this.createJobConf()));
+ synchronized(Thread.currentThread()) {
+ stdPrintln("Sleeping for 2 seconds to give time for retry");
+ Thread.currentThread().sleep(2000);
+ }
+ assertEquals(4, NotificationServlet.counter);
+
+ // run a job with FAILED status
+ System.out.println(TestJobKillAndFail.runJobFail(this.createJobConf()));
+ synchronized(Thread.currentThread()) {
+ stdPrintln("Sleeping for 2 seconds to give time for retry");
+ Thread.currentThread().sleep(2000);
+ }
+ assertEquals(6, NotificationServlet.counter);
}
private String launchWordCount(JobConf conf,