Author: ddas
Date: Thu Feb 14 05:41:01 2008
New Revision: 627741
URL: http://svn.apache.org/viewvc?rev=627741&view=rev
Log:
HADOOP-2391. Cleanup job output directory before declaring a job as SUCCESSFUL.
Contributed by Amareshwari Sri Ramadasu.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=627741&r1=627740&r2=627741&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Feb 14 05:41:01 2008
@@ -76,6 +76,9 @@
the destination after encountering an error. (Tsz Wo (Nicholas), SZE
via cdouglas)
+ HADOOP-2391. Cleanup job output directory before declaring a job as
+ SUCCESSFUL. (Amareshwari Sri Ramadasu via ddas)
+
Release 0.16.0 - 2008-02-07
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=627741&r1=627740&r2=627741&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Thu Feb 14 05:41:01 2008
@@ -30,6 +30,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobHistory.Values;
@@ -275,6 +276,18 @@
jobtracker, conf, this);
}
+ // create job specific temporary directory in output path
+ Path outputPath = conf.getOutputPath();
+ if (outputPath != null) {
+ Path tmpDir = new Path(outputPath, "_temporary");
+ FileSystem fileSys = tmpDir.getFileSystem(conf);
+ if (!fileSys.mkdirs(tmpDir)) {
+ LOG.error("Mkdirs failed to create " + tmpDir.toString());
+ }
+ } else {
+ LOG.error("Null Output path");
+ }
+
this.status = new JobStatus(status.getJobId(), 0.0f, 0.0f,
JobStatus.RUNNING);
tasksInited = true;
@@ -1129,6 +1142,15 @@
Path tempDir = new Path(conf.getSystemDir(), jobId);
fs.delete(tempDir);
+ // delete the temporary directory in output directory
+ Path outputPath = conf.getOutputPath();
+ if (outputPath != null) {
+ Path tmpDir = new Path(outputPath, "_temporary");
+ FileSystem fileSys = tmpDir.getFileSystem(conf);
+ if (fileSys.exists(tmpDir)) {
+ FileUtil.fullyDelete(fileSys, tmpDir);
+ }
+ }
} catch (IOException e) {
LOG.warn("Error cleaning up "+profile.getJobId()+": "+e);
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=627741&r1=627740&r2=627741&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Thu Feb 14 05:41:01 2008
@@ -27,6 +27,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -112,6 +113,13 @@
numReduceTasks = 1;
job.setNumReduceTasks(1);
}
+ // create job specific temp directory in output path
+ Path tmpDir = new Path(job.getOutputPath(), "_temporary");
+ FileSystem fileSys = tmpDir.getFileSystem(job);
+ if (!fileSys.mkdirs(tmpDir)) {
+ LOG.error("Mkdirs failed to create " + tmpDir.toString());
+ }
+
DataOutputBuffer buffer = new DataOutputBuffer();
for (int i = 0; i < splits.length; i++) {
String mapId = jobId + "_map_" + idFormat.format(i);
@@ -125,6 +133,16 @@
splits[i].getClass().getName(),
split);
JobConf localConf = new JobConf(job);
+ if (fileSys.exists(tmpDir)) {
+ Path taskTmpDir = new Path(tmpDir, "_" + mapId);
+ if (!fileSys.mkdirs(taskTmpDir)) {
+ throw new IOException("Mkdirs failed to create "
+ + taskTmpDir.toString());
+ }
+ } else {
+ throw new IOException("The directory " + tmpDir.toString()
+ + " doesnt exist " );
+ }
map.localizeConfiguration(localConf);
map.setConf(localConf);
map_tasks += 1;
@@ -157,6 +175,16 @@
"tip_r_0001",
reduceId, 0, mapIds.size());
JobConf localConf = new JobConf(job);
+ if (fileSys.exists(tmpDir)) {
+ Path taskTmpDir = new Path(tmpDir, "_" + reduceId);
+ if (!fileSys.mkdirs(taskTmpDir)) {
+ throw new IOException("Mkdirs failed to create "
+ + taskTmpDir.toString());
+ }
+ } else {
+ throw new IOException("The directory " + tmpDir.toString()
+ + " doesnt exist ");
+ }
reduce.localizeConfiguration(localConf);
reduce.setConf(localConf);
reduce_tasks += 1;
@@ -177,6 +205,15 @@
this.mapoutputFile.removeAll(reduceId);
}
}
+ // delete the temporary directory in output directory
+ try {
+ if (fileSys.exists(tmpDir)) {
+ FileUtil.fullyDelete(fileSys, tmpDir);
+ }
+ } catch (IOException e) {
+ LOG.error("Exception in deleting " + tmpDir.toString());
+ }
+
this.status.setRunState(JobStatus.SUCCEEDED);
JobEndNotifier.localRunnerNotification(job, status);
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java?rev=627741&r1=627740&r2=627741&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java Thu Feb 14 05:41:01 2008
@@ -42,7 +42,12 @@
String name, Progressable progress)
throws IOException {
- Path file = new Path(job.getOutputPath(), name);
+ Path outputPath = job.getOutputPath();
+ FileSystem fs = outputPath.getFileSystem(job);
+ if (!fs.exists(outputPath)) {
+ throw new IOException("Output directory doesnt exist");
+ }
+ Path file = new Path(outputPath, name);
CompressionCodec codec = null;
CompressionType compressionType = CompressionType.NONE;
@@ -58,7 +63,7 @@
// ignore the progress parameter, since MapFile is local
final MapFile.Writer out =
- new MapFile.Writer(job, file.getFileSystem(job), file.toString(),
+ new MapFile.Writer(job, fs, file.toString(),
job.getOutputKeyClass(),
job.getOutputValueClass(),
compressionType, codec,
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java?rev=627741&r1=627740&r2=627741&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java Thu Feb 14 05:41:01 2008
@@ -40,8 +40,12 @@
String name, Progressable progress)
throws IOException {
- Path file = new Path(job.getOutputPath(), name);
- FileSystem fs = file.getFileSystem(job);
+ Path outputPath = job.getOutputPath();
+ FileSystem fs = outputPath.getFileSystem(job);
+ if (!fs.exists(outputPath)) {
+ throw new IOException("Output directory doesnt exist");
+ }
+ Path file = new Path(outputPath, name);
CompressionCodec codec = null;
CompressionType compressionType = CompressionType.NONE;
if (getCompressOutput(job)) {
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=627741&r1=627740&r2=627741&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java Thu Feb 14 05:41:01 2008
@@ -190,7 +190,8 @@
public String toString() { return taskId; }
private Path getTaskOutputPath(JobConf conf) {
- Path p = new Path(conf.getOutputPath(), ("_" + taskId));
+ Path p = new Path(conf.getOutputPath(), ("_temporary"
+ + Path.SEPARATOR + "_" + taskId));
try {
FileSystem fs = p.getFileSystem(conf);
return p.makeQualified(fs);
@@ -420,7 +421,7 @@
if (taskOutputPath != null) {
FileSystem fs = taskOutputPath.getFileSystem(conf);
if (fs.exists(taskOutputPath)) {
- Path jobOutputPath = taskOutputPath.getParent();
+ Path jobOutputPath = taskOutputPath.getParent().getParent();
// Move the task outputs to their final place
moveTaskOutputs(fs, jobOutputPath, taskOutputPath);
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=627741&r1=627740&r2=627741&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Feb 14 05:41:01 2008
@@ -1423,7 +1423,25 @@
localJobConf.set("mapred.task.id", task.getTaskId());
keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
+
+ // create _taskid directory in output path temporary directory.
+ Path outputPath = localJobConf.getOutputPath();
+ if (outputPath != null) {
+ Path jobTmpDir = new Path(outputPath, "_temporary");
+ FileSystem fs = jobTmpDir.getFileSystem(localJobConf);
+ if (fs.exists(jobTmpDir)) {
+ Path taskTmpDir = new Path(jobTmpDir, "_" + task.getTaskId());
+ if (!fs.mkdirs(taskTmpDir)) {
+ throw new IOException("Mkdirs failed to create "
+ + taskTmpDir.toString());
+ }
+ } else {
+ throw new IOException("The directory " + jobTmpDir.toString()
+ + " doesnt exist ");
+ }
+ }
task.localizeConfiguration(localJobConf);
+
OutputStream out = localFs.create(localTaskFile);
try {
localJobConf.write(out);
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java?rev=627741&r1=627740&r2=627741&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java Thu Feb 14 05:41:01 2008
@@ -108,6 +108,9 @@
Path dir = job.getOutputPath();
FileSystem fs = dir.getFileSystem(job);
+ if (!fs.exists(dir)) {
+ throw new IOException("Output directory doesnt exist");
+ }
boolean isCompressed = getCompressOutput(job);
if (!isCompressed) {
FSDataOutputStream fileOut = fs.create(new Path(dir, name), progress);
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java?rev=627741&r1=627740&r2=627741&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java Thu Feb 14 05:41:01 2008
@@ -44,6 +44,10 @@
public void testFormat() throws Exception {
JobConf job = new JobConf();
job.setOutputPath(workDir);
+ FileSystem fs = workDir.getFileSystem(job);
+ if (!fs.mkdirs(workDir)) {
+ fail("Failed to create output directory");
+ }
String file = "test.txt";
// A reporter that does nothing