Author: omalley
Date: Fri Aug 10 13:25:50 2007
New Revision: 564751

URL: http://svn.apache.org/viewvc?view=rev&rev=564751
Log:
Merge -r 564740:564744 from trunk to 0.14 branch. Fixes HADOOP-71.
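Every code change in this merge replaces a FileSystem.get(conf) lookup, which always returns the configured default filesystem, with Path.getFileSystem(conf), which returns the filesystem that owns the path. For illustration only (not part of this change-set), a minimal sketch of the difference, assuming a deployment whose fs.default.name points at HDFS; the input path is a placeholder:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PerPathFsSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();

        // A fully qualified path can name a filesystem other than the
        // configured default, e.g. a file: path while fs.default.name
        // points at HDFS.
        Path input = new Path("file:/tmp/wc/input/part-0");  // placeholder

        // Old pattern: always the default filesystem, so the file: path
        // above would be resolved against HDFS and never found.
        FileSystem defaultFs = FileSystem.get(conf);

        // New pattern: the filesystem that actually owns the path.
        FileSystem fs = input.getFileSystem(conf);
        System.out.println("input exists: " + fs.exists(input));
      }
    }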
Modified:
    lucene/hadoop/branches/branch-0.14/CHANGES.txt
    lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/LineRecordReader.java
    lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java
    lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/Task.java
    lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java

Modified: lucene/hadoop/branches/branch-0.14/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/CHANGES.txt?view=diff&rev=564751&r1=564750&r2=564751
==============================================================================
--- lucene/hadoop/branches/branch-0.14/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.14/CHANGES.txt Fri Aug 10 13:25:50 2007
@@ -463,6 +463,9 @@
 148. HADOOP-1680.  Improvements to Block CRC upgrade messages.
      (Raghu Angadi via dhruba)
 
+149. HADOOP-71.  Allow Text and SequenceFile Map/Reduce inputs from non-default
+     filesystems. (omalley)
+
 Release 0.13.0 - 2007-06-08
 
  1. HADOOP-1047.  Fix TestReplication to succeed more reliably.

Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/LineRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/LineRecordReader.java?view=diff&rev=564751&r1=564750&r2=564751
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/LineRecordReader.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/LineRecordReader.java Fri Aug 10 13:25:50 2007
@@ -69,7 +69,7 @@
     final CompressionCodec codec = compressionCodecs.getCodec(file);
 
     // open the file and seek to the start of the split
-    FileSystem fs = FileSystem.get(job);
+    FileSystem fs = file.getFileSystem(job);
     FSDataInputStream fileIn = fs.open(split.getPath());
     InputStream in = fileIn;
     boolean skipFirstLine = false;

Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=564751&r1=564750&r2=564751
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/MapTask.java Fri Aug 10 13:25:50 2007
@@ -210,14 +210,11 @@
 
     private Reporter reporter = null;
 
-    private JobConf job;
-
     public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
                                     JobConf job, Reporter reporter) throws IOException {
-      this.job = job;
       this.reporter = reporter;
       String finalName = getOutputName(getPartition());
-      FileSystem fs = FileSystem.get(this.job);
+      FileSystem fs = FileSystem.get(job);
       out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
     }
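The record-reader changes above and below follow one recipe: derive the filesystem from the split's path before opening it. For illustration only, a stripped-down reader constructor in that style; the class name and seek-to-start detail are illustrative, not code from this change-set:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hypothetical reader skeleton; only the filesystem-resolution line
    // mirrors the real change.
    class SplitReaderSketch {
      private final FSDataInputStream in;

      SplitReaderSketch(Configuration conf, Path file, long start)
          throws IOException {
        // Resolve the filesystem from the split's path, not the default.
        FileSystem fs = file.getFileSystem(conf);
        in = fs.open(file);
        in.seek(start);  // position the stream at the start of the split
      }
    }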
Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java?view=diff&rev=564751&r1=564750&r2=564751
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java Fri Aug 10 13:25:50 2007
@@ -20,10 +20,11 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.*;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.*;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /** An {@link RecordReader} for {@link SequenceFile}s. */
@@ -36,8 +37,9 @@
 
   public SequenceFileRecordReader(Configuration conf, FileSplit split)
     throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    this.in = new SequenceFile.Reader(fs, split.getPath(), conf);
+    Path path = split.getPath();
+    FileSystem fs = path.getFileSystem(conf);
+    this.in = new SequenceFile.Reader(fs, path, conf);
     this.end = split.getStart() + split.getLength();
     this.conf = conf;

Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=564751&r1=564750&r2=564751
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/Task.java Fri Aug 10 13:25:50 2007
@@ -180,7 +180,8 @@
   private Path getTaskOutputPath(JobConf conf) {
     Path p = new Path(conf.getOutputPath(), ("_" + taskId));
     try {
-      return p.makeQualified(FileSystem.get(conf));
+      FileSystem fs = p.getFileSystem(conf);
+      return p.makeQualified(fs);
     } catch (IOException ie) {
       LOG.warn(StringUtils.stringifyException(ie));
       return p;
@@ -390,21 +391,23 @@
    * @throws IOException
    */
   void saveTaskOutput() throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    if (taskOutputPath != null && fs.exists(taskOutputPath)) {
-      Path jobOutputPath = taskOutputPath.getParent();
-
-      // Move the task outputs to their final place
-      moveTaskOutputs(fs, jobOutputPath, taskOutputPath);
-
-      // Delete the temporary task-specific output directory
-      if (!fs.delete(taskOutputPath)) {
-        LOG.info("Failed to delete the temporary output directory of task: " +
-                 getTaskId() + " - " + taskOutputPath);
+    if (taskOutputPath != null) {
+      FileSystem fs = taskOutputPath.getFileSystem(conf);
+      if (fs.exists(taskOutputPath)) {
+        Path jobOutputPath = taskOutputPath.getParent();
+
+        // Move the task outputs to their final place
+        moveTaskOutputs(fs, jobOutputPath, taskOutputPath);
+
+        // Delete the temporary task-specific output directory
+        if (!fs.delete(taskOutputPath)) {
+          LOG.info("Failed to delete the temporary output directory of task: " +
+                   getTaskId() + " - " + taskOutputPath);
+        }
+
+        LOG.info("Saved output of task '" + getTaskId() + "' to " + jobOutputPath);
       }
-
-      LOG.info("Saved output of task '" + getTaskId() + "' to " + jobOutputPath);
     }
   }
@@ -439,13 +442,14 @@
    * @throws IOException
    */
   void discardTaskOutput() throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-
-    if (taskOutputPath != null && fs.exists(taskOutputPath)) {
-      // Delete the temporary task-specific output directory
-      FileUtil.fullyDelete(fs, taskOutputPath);
-      LOG.info("Discarded output of task '" + getTaskId() + "' - "
-               + taskOutputPath);
+    if (taskOutputPath != null) {
+      FileSystem fs = taskOutputPath.getFileSystem(conf);
+      if (fs.exists(taskOutputPath)) {
+        // Delete the temporary task-specific output directory
+        FileUtil.fullyDelete(fs, taskOutputPath);
+        LOG.info("Discarded output of task '" + getTaskId() + "' - "
+                 + taskOutputPath);
+      }
     }
   }
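Note why both Task.java methods were restructured rather than just edited: the path must be known before a filesystem can be chosen, so the null check has to come first. For illustration only, the shape of that guard with hypothetical field names:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class OutputCleanupSketch {
      private Path taskOutputPath;  // hypothetical stand-ins for the
      private Configuration conf;   // fields used in Task.java

      void cleanup() throws IOException {
        // The path picks the filesystem, so test it for null before
        // any filesystem lookup can happen.
        if (taskOutputPath != null) {
          FileSystem fs = taskOutputPath.getFileSystem(conf);
          if (fs.exists(taskOutputPath)) {
            fs.delete(taskOutputPath);
          }
        }
      }
    }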
Modified: lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?view=diff&rev=564751&r1=564750&r2=564751
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Fri Aug 10 13:25:50 2007
@@ -51,18 +51,19 @@
     }
   }
   public static TestResult launchWordCount(JobConf conf,
+                                           Path inDir,
+                                           Path outDir,
                                            String input,
                                            int numMaps,
                                            int numReduces) throws IOException {
-    final Path inDir = new Path("/testing/wc/input");
-    final Path outDir = new Path("/testing/wc/output");
-    FileSystem fs = FileSystem.get(conf);
-    fs.delete(outDir);
-    if (!fs.mkdirs(inDir)) {
+    FileSystem inFs = inDir.getFileSystem(conf);
+    FileSystem outFs = outDir.getFileSystem(conf);
+    outFs.delete(outDir);
+    if (!inFs.mkdirs(inDir)) {
      throw new IOException("Mkdirs failed to create " + inDir.toString());
    }
    {
-      DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+      DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
      file.writeBytes(input);
      file.close();
    }
@@ -177,7 +178,9 @@
       // Keeping tasks that match this pattern
       jobConf.setKeepTaskFilesPattern("task_[^_]*_[0-9]*_m_000001_.*");
       TestResult result;
-      result = launchWordCount(jobConf,
+      final Path inDir = new Path("/testing/wc/input");
+      final Path outDir = new Path("/testing/wc/output");
+      result = launchWordCount(jobConf, inDir, outDir,
                                "The quick brown fox\nhas many silly\n" +
                                "red fox sox\n",
                                3, 1);
@@ -188,8 +191,25 @@
       checkTaskDirectories(mr, new String[]{jobid}, new String[]{taskid});
       // test with maps=0
       jobConf = mr.createJobConf();
-      result = launchWordCount(jobConf, "owen is oom", 0, 1);
+      result = launchWordCount(jobConf, inDir, outDir, "owen is oom", 0, 1);
       assertEquals("is\t1\noom\t1\nowen\t1\n", result.output);
+      // Run a job with input and output going to localfs even though the
+      // default fs is hdfs.
+      {
+        FileSystem localfs = FileSystem.getLocal(jobConf);
+        String TEST_ROOT_DIR =
+          new File(System.getProperty("test.build.data","/tmp"))
+          .toString().replace(' ', '+');
+        Path localIn = localfs.makeQualified
+                          (new Path(TEST_ROOT_DIR + "/local/in"));
+        Path localOut = localfs.makeQualified
+                          (new Path(TEST_ROOT_DIR + "/local/out"));
+        result = launchWordCount(jobConf, localIn, localOut,
+                                 "all your base belong to us", 1, 1);
+        assertEquals("all\t1\nbase\t1\nbelong\t1\nto\t1\nus\t1\nyour\t1\n",
+                     result.output);
+        assertTrue("outputs on localfs", localfs.exists(localOut));
+      }
     } finally {
       if (fileSys != null) { fileSys.close(); }
       if (dfs != null) { dfs.shutdown(); }
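With this fix in place, a job can name input and output paths on a filesystem other than the default, as the new test case exercises. For illustration only, a client-side sketch in that style; the paths are placeholders and the JobConf setters are the 0.14-era API as best understood, so treat this as an assumption rather than code from the change-set:

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    public class LocalPathsJobSketch {
      public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf();  // default fs may be hdfs:

        // Qualify the paths against the local filesystem so each Path
        // carries its own scheme, as the new test case does above.
        FileSystem localfs = FileSystem.getLocal(conf);
        Path in = localfs.makeQualified(new Path("/tmp/wc/in"));    // placeholder
        Path out = localfs.makeQualified(new Path("/tmp/wc/out"));  // placeholder

        conf.setInputPath(in);    // assumed 0.14 JobConf API
        conf.setOutputPath(out);  // assumed 0.14 JobConf API
        // ... configure mapper/reducer and run via JobClient.runJob(conf)
      }
    }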