Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileRecordReader.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileRecordReader.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileRecordReader.java Mon Dec 15 14:21:32 2008
@@ -35,6 +35,8 @@
   private long start;
   private long end;
   private boolean more = true;
+  private K key = null;
+  private V value = null;
   protected Configuration conf;
 
   @Override
@@ -58,23 +60,30 @@
 
   @Override
   @SuppressWarnings("unchecked")
-  public K nextKey(K key) throws IOException, InterruptedException {
+  public boolean nextKeyValue() throws IOException, InterruptedException {
     if (!more) {
-      return null;
+      return false;
     }
     long pos = in.getPosition();
-    K result = (K) in.next(key);
-    if (result == null || (pos >= end && in.syncSeen())) {
+    key = (K) in.next(key);
+    if (key == null || (pos >= end && in.syncSeen())) {
       more = false;
-      result = null;
+      key = null;
+      value = null;
+    } else {
+      value = (V) in.getCurrentValue(value);
     }
-    return result;
+    return more;
   }
 
   @Override
-  @SuppressWarnings("unchecked")
-  public V nextValue(V value) throws IOException, InterruptedException {
-    return (V) in.getCurrentValue(value);
+  public K getCurrentKey() {
+    return key;
+  }
+
+  @Override
+  public V getCurrentValue() {
+    return value;
   }
 
   /**
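
[Note, not part of this commit: the change above replaces the pull-style nextKey(K)/nextValue(V) pair with nextKeyValue()/getCurrentKey()/getCurrentValue(). A minimal sketch of how a caller drives the reworked contract follows; the method name dumpAll and the println are illustrative placeholders only, under the assumption that the reader has already been initialized.]

  // Sketch only: drains any org.apache.hadoop.mapreduce.RecordReader<K,V>
  // under the new contract shown above.
  static <K, V> void dumpAll(org.apache.hadoop.mapreduce.RecordReader<K, V> reader)
      throws java.io.IOException, InterruptedException {
    while (reader.nextKeyValue()) {            // advance; returns false at end of split
      K key = reader.getCurrentKey();          // valid until the next nextKeyValue() call
      V value = reader.getCurrentValue();
      System.out.println(key + "\t" + value);  // placeholder for real record handling
    }
    reader.close();
  }

[The reader caches the current key/value and exposes them through getters, which is exactly what the modified SequenceFileRecordReader does with its new key and value fields.]
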
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/InverseMapper.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/InverseMapper.java?rev=726850&r1=726849&r2=726850&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/InverseMapper.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/InverseMapper.java Mon Dec 15 14:21:32 2008 @@ -26,9 +26,10 @@ public class InverseMapper<K, V> extends Mapper<K,V,V,K> { /** The inverse function. Input keys and values are swapped.*/ + @Override public void map(K key, V value, Context context ) throws IOException, InterruptedException { - context.collect(value, key); + context.write(value, key); } } Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java?rev=726850&r1=726849&r2=726850&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java Mon Dec 15 14:21:32 2008 @@ -23,10 +23,16 @@ import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.StatusReporter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; /** * Multithreaded implementation for @link org.apache.hadoop.mapreduce.Mapper. 
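
[Note, not part of this commit: the mapper-side changes here are mechanical — the draft context.collect(key, value) call becomes context.write(key, value), and map() gains @Override, as in InverseMapper above and TokenCounterMapper below. A minimal sketch of a Mapper written against that convention, with illustrative class and field names:]

  // Sketch only: a Mapper using the context.write(...) convention adopted in this commit.
  import java.io.IOException;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Mapper;

  public class UpperCaseMapper extends Mapper<Object, Text, Text, Text> {
    private final Text upper = new Text();

    @Override
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      upper.set(value.toString().toUpperCase());
      context.write(value, upper);   // replaces the old context.collect(value, upper)
    }
  }
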
@@ -50,7 +56,7 @@ private static final Log LOG = LogFactory.getLog(MultithreadedMapper.class); private Class<Mapper<K1,V1,K2,V2>> mapClass; private Context outer; - private MapRunner[] runners; + private List<MapRunner> runners; public static int getNumberOfThreads(Configuration conf) { return conf.getInt("mapred.map.multithreadedrunner.threads", 10); @@ -78,6 +84,7 @@ conf.setClass("mapred.map.multithreadedrunner.class", cls, Mapper.class); } + @Override public void run(Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); outer = context; @@ -88,14 +95,16 @@ " threads"); } - runners = (MapRunner[]) new Object[numberOfThreads]; + runners = new ArrayList<MapRunner>(numberOfThreads); for(int i=0; i < numberOfThreads; ++i) { - runners[i] = new MapRunner(); - runners[i].start(); + MapRunner thread = new MapRunner(context); + thread.start(); + runners.set(i, thread); } for(int i=0; i < numberOfThreads; ++i) { - runners[i].join(); - Throwable th = runners[i].throwable; + MapRunner thread = runners.get(i); + thread.join(); + Throwable th = thread.throwable; if (th != null) { if (th instanceof IOException) { throw (IOException) th; @@ -108,85 +117,116 @@ } } - private class SubMapContext extends Context { + private class SubMapRecordReader extends RecordReader<K1,V1> { private K1 key; private V1 value; - - SubMapContext() { - super(outer.getConfiguration(), outer.getTaskAttemptId()); + private Configuration conf; + + @Override + public void close() throws IOException { } @Override - public InputSplit getInputSplit() { - synchronized (outer) { - return outer.getInputSplit(); - } + public float getProgress() throws IOException, InterruptedException { + return 0; } @Override - public Counter getCounter(Enum<?> counterName) { - synchronized (outer) { - return outer.getCounter(counterName); - } + public void initialize(InputSplit split, + TaskAttemptContext context + ) throws IOException, InterruptedException { + conf = context.getConfiguration(); } + @Override - public Counter getCounter(String groupName, String counterName) { + public boolean nextKeyValue() throws IOException, InterruptedException { synchronized (outer) { - return outer.getCounter(groupName, counterName); + if (!outer.nextKeyValue()) { + return false; + } + key = ReflectionUtils.copy(outer.getConfiguration(), + outer.getCurrentKey(), key); + value = ReflectionUtils.copy(conf, outer.getCurrentValue(), value); + return true; } } + public K1 getCurrentKey() { + return key; + } + @Override - public void progress() { - synchronized (outer) { - outer.progress(); - } + public V1 getCurrentValue() { + return value; } + } + + private class SubMapRecordWriter extends RecordWriter<K2,V2> { @Override - public void collect(K2 key, V2 value) throws IOException, - InterruptedException { - synchronized (outer) { - outer.collect(key, value); - } + public void close(TaskAttemptContext context) throws IOException, + InterruptedException { } @Override - public K1 nextKey(K1 k) throws IOException, InterruptedException { + public void write(K2 key, V2 value) throws IOException, + InterruptedException { synchronized (outer) { - key = outer.nextKey(key); - if (key != null) { - value = outer.nextValue(value); - } - return key; + outer.write(key, value); } + } + } + + private class SubMapStatusReporter extends StatusReporter { + + @Override + public Counter getCounter(Enum<?> name) { + return outer.getCounter(name); } - - public V1 nextValue(V1 v) throws IOException, InterruptedException { - return 
value; + + @Override + public Counter getCounter(String group, String name) { + return outer.getCounter(group, name); } + + @Override + public void progress() { + outer.progress(); + } + + @Override + public void setStatus(String status) { + outer.setStatus(status); + } + } private class MapRunner extends Thread { private Mapper<K1,V1,K2,V2> mapper; - private Context context; + private Context subcontext; private Throwable throwable; - @SuppressWarnings("unchecked") - MapRunner() { - mapper = (Mapper<K1,V1,K2,V2>) - ReflectionUtils.newInstance(mapClass, context.getConfiguration()); - context = new SubMapContext(); + MapRunner(Context context) throws IOException, InterruptedException { + mapper = ReflectionUtils.newInstance(mapClass, + context.getConfiguration()); + subcontext = new Context(outer.getConfiguration(), + outer.getTaskAttemptID(), + new SubMapRecordReader(), + new SubMapRecordWriter(), + context.getOutputCommitter(), + new SubMapStatusReporter(), + outer.getInputSplit()); } public Throwable getThrowable() { return throwable; } + @Override public void run() { try { - mapper.run(context); + mapper.run(subcontext); } catch (Throwable ie) { throwable = ie; } Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/TokenCounterMapper.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/TokenCounterMapper.java?rev=726850&r1=726849&r2=726850&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/TokenCounterMapper.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/TokenCounterMapper.java Mon Dec 15 14:21:32 2008 @@ -29,13 +29,14 @@ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); - + + @Override public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); - context.collect(word, one); + context.write(word, one); } } } Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java?rev=726850&view=auto ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java (added) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java Mon Dec 15 14:21:32 2008 @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce.lib.output; + +import java.io.IOException; +import java.net.URI; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.util.StringUtils; + +/** An {...@link OutputCommitter} that commits files specified + * in job output directory i.e. ${mapred.output.dir}. + **/ +public class FileOutputCommitter extends OutputCommitter { + + private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class); + + /** + * Temporary directory name + */ + protected static final String TEMP_DIR_NAME = "_temporary"; + private FileSystem outputFileSystem = null; + private Path outputPath = null; + private Path workPath = null; + + public FileOutputCommitter(Path outputPath, + TaskAttemptContext context) throws IOException { + if (outputPath != null) { + this.outputPath = outputPath; + outputFileSystem = outputPath.getFileSystem(context.getConfiguration()); + workPath = new Path(outputPath, + (FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR + + "_" + context.getTaskAttemptID().toString() + )).makeQualified(outputFileSystem); + } + } + + public void setupJob(JobContext context) throws IOException { + if (outputPath != null) { + Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME); + FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration()); + if (!fileSys.mkdirs(tmpDir)) { + LOG.error("Mkdirs failed to create " + tmpDir.toString()); + } + } + } + + public void cleanupJob(JobContext context) throws IOException { + if (outputPath != null) { + Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME); + FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration()); + if (fileSys.exists(tmpDir)) { + fileSys.delete(tmpDir, true); + } + } + } + + public void setupTask(TaskAttemptContext context) throws IOException { + // FileOutputCommitter's setupTask doesn't do anything. Because the + // temporary task directory is created on demand when the + // task is writing. 
+ } + + public void commitTask(TaskAttemptContext context) + throws IOException { + TaskAttemptID attemptId = context.getTaskAttemptID(); + if (workPath != null) { + context.progress(); + if (outputFileSystem.exists(workPath)) { + // Move the task outputs to their final place + moveTaskOutputs(context, outputFileSystem, outputPath, workPath); + // Delete the temporary task-specific output directory + if (!outputFileSystem.delete(workPath, true)) { + LOG.warn("Failed to delete the temporary output" + + " directory of task: " + attemptId + " - " + workPath); + } + LOG.info("Saved output of task '" + attemptId + "' to " + + outputPath); + } + } + } + + private void moveTaskOutputs(TaskAttemptContext context, + FileSystem fs, + Path jobOutputDir, + Path taskOutput) + throws IOException { + TaskAttemptID attemptId = context.getTaskAttemptID(); + context.progress(); + if (fs.isFile(taskOutput)) { + Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, + workPath); + if (!fs.rename(taskOutput, finalOutputPath)) { + if (!fs.delete(finalOutputPath, true)) { + throw new IOException("Failed to delete earlier output of task: " + + attemptId); + } + if (!fs.rename(taskOutput, finalOutputPath)) { + throw new IOException("Failed to save output of task: " + + attemptId); + } + } + LOG.debug("Moved " + taskOutput + " to " + finalOutputPath); + } else if(fs.getFileStatus(taskOutput).isDir()) { + FileStatus[] paths = fs.listStatus(taskOutput); + Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, workPath); + fs.mkdirs(finalOutputPath); + if (paths != null) { + for (FileStatus path : paths) { + moveTaskOutputs(context, fs, jobOutputDir, path.getPath()); + } + } + } + } + + public void abortTask(TaskAttemptContext context) { + try { + context.progress(); + outputFileSystem.delete(workPath, true); + } catch (IOException ie) { + LOG.warn("Error discarding output" + StringUtils.stringifyException(ie)); + } + } + + private Path getFinalPath(Path jobOutputDir, Path taskOutput, + Path taskOutputPath) throws IOException { + URI taskOutputUri = taskOutput.toUri(); + URI relativePath = taskOutputPath.toUri().relativize(taskOutputUri); + if (taskOutputUri == relativePath) {//taskOutputPath is not a parent of taskOutput + throw new IOException("Can not get the relative path: base = " + + taskOutputPath + " child = " + taskOutput); + } + if (relativePath.getPath().length() > 0) { + return new Path(jobOutputDir, relativePath.getPath()); + } else { + return jobOutputDir; + } + } + + public boolean needsTaskCommit(TaskAttemptContext context + ) throws IOException { + return workPath != null && outputFileSystem.exists(workPath); + } + + /** + * Get the directory that the task should write results into + * @return the work directory + * @throws IOException + */ + public Path getWorkPath() throws IOException { + return workPath; + } +} Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java?rev=726850&r1=726849&r2=726850&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java Mon Dec 15 14:21:32 2008 @@ -27,22 +27,34 @@ import org.apache.hadoop.io.compress.CompressionCodec; import 
org.apache.hadoop.mapred.FileAlreadyExistsException; import org.apache.hadoop.mapred.InvalidJobConfException; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; /** A base class for {...@link OutputFormat}s that read from {...@link FileSystem}s.*/ public abstract class FileOutputFormat<K, V> extends OutputFormat<K, V> { - private static final String TEMP_DIR_NAME = "_temp"; + /** Construct output file names so that, when an output directory listing is + * sorted lexicographically, positions correspond to output partitions.*/ + private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance(); + static { + NUMBER_FORMAT.setMinimumIntegerDigits(5); + NUMBER_FORMAT.setGroupingUsed(false); + } + private FileOutputCommitter committer = null; + /** * Set whether the output of the job is compressed. - * @param conf the {...@link Configuration} to modify + * @param job the job to modify * @param compress should the output of the job be compressed? */ - public static void setCompressOutput(Configuration conf, boolean compress) { - conf.setBoolean("mapred.output.compress", compress); + public static void setCompressOutput(Job job, boolean compress) { + job.getConfiguration().setBoolean("mapred.output.compress", compress); } /** @@ -57,16 +69,17 @@ /** * Set the {...@link CompressionCodec} to be used to compress job outputs. - * @param conf the {...@link Configuration} to modify + * @param job the job to modify * @param codecClass the {...@link CompressionCodec} to be used to * compress the job outputs */ public static void - setOutputCompressorClass(Configuration conf, + setOutputCompressorClass(Job job, Class<? extends CompressionCodec> codecClass) { - setCompressOutput(conf, true); - conf.setClass("mapred.output.compression.codec", codecClass, - CompressionCodec.class); + setCompressOutput(job, true); + job.getConfiguration().setClass("mapred.output.compression.codec", + codecClass, + CompressionCodec.class); } /** @@ -95,20 +108,19 @@ return codecClass; } - public abstract - RecordWriter<K, V> getRecordWriter(TaskAttemptContext context - ) throws IOException; - - public void checkOutputSpecs(JobContext context) - throws FileAlreadyExistsException, - InvalidJobConfException, IOException { + public abstract RecordWriter<K, V> + getRecordWriter(TaskAttemptContext context + ) throws IOException, InterruptedException; + + public void checkOutputSpecs(JobContext context + ) throws FileAlreadyExistsException, IOException{ // Ensure that the output directory is set and not already there Configuration job = context.getConfiguration(); Path outDir = getOutputPath(job); - if (outDir == null && context.getNumReduceTasks() != 0) { - throw new InvalidJobConfException("Output directory not set in JobConf."); + if (outDir == null) { + throw new InvalidJobConfException("Output directory not set."); } - if (outDir != null && outDir.getFileSystem(job).exists(outDir)) { + if (outDir.getFileSystem(job).exists(outDir)) { throw new FileAlreadyExistsException("Output directory " + outDir + " already exists"); } @@ -117,19 +129,19 @@ /** * Set the {...@link Path} of the output directory for the map-reduce job. * - * @param conf The configuration of the job. 
+ * @param job The job to modify * @param outputDir the {...@link Path} of the output directory for * the map-reduce job. */ - public static void setOutputPath(Configuration conf, Path outputDir) { - conf.set("mapred.output.dir", outputDir.toString()); + public static void setOutputPath(Job job, Path outputDir) { + job.getConfiguration().set("mapred.output.dir", outputDir.toString()); } /** * Get the {...@link Path} to the output directory for the map-reduce job. * * @return the {...@link Path} to the output directory for the map-reduce job. - * @see FileOutputFormat#getWorkOutputPath(Configuration) + * @see FileOutputFormat#getWorkOutputPath(TaskInputOutputContext) */ public static Path getOutputPath(Configuration conf) { String name = conf.get("mapred.output.dir"); @@ -162,18 +174,12 @@ * is completely transparent to the application.</p> * * <p>The application-writer can take advantage of this by creating any - * side-files required in <tt>${mapred.work.output.dir}</tt> during execution - * of his reduce-task i.e. via {...@link #getWorkOutputPath(Configuration)}, and + * side-files required in a work directory during execution + * of his task i.e. via + * {...@link #getWorkOutputPath(TaskInputOutputContext)}, and * the framework will move them out similarly - thus she doesn't have to pick * unique paths per task-attempt.</p> * - * <p><i>Note</i>: the value of <tt>${mapred.work.output.dir}</tt> during - * execution of a particular task-attempt is actually - * <tt>${mapred.output.dir}/_temporary/_{$taskid}</tt>, and this value is - * set by the map-reduce framework. So, just create any side-files in the - * path returned by {...@link #getWorkOutputPath(Configuration)} from map/reduce - * task to take advantage of this feature.</p> - * * <p>The entire discussion holds true for maps of jobs with * reducer=NONE (i.e. 0 reduces) since output of the map, in that case, * goes directly to HDFS.</p> @@ -181,77 +187,12 @@ * @return the {...@link Path} to the task's temporary output directory * for the map-reduce job. */ - public static Path getWorkOutputPath(Configuration conf) { - String name = conf.get("mapred.work.output.dir"); - return name == null ? null: new Path(name); - } - - /** - * Helper function to create the task's temporary output directory and - * return the path to the task's output file. - * - * @param context the task's context - * @return path to the task's temporary output file - * @throws IOException - */ - protected static Path getTaskOutputPath(TaskAttemptContext context - ) throws IOException { - // ${mapred.job.dir} - Configuration conf = context.getConfiguration(); - Path outputPath = getOutputPath(conf); - if (outputPath == null) { - throw new IOException("Undefined job output-path"); - } - - // ${mapred.out.dir}/_temporary - Path jobTmpDir = new Path(outputPath, TEMP_DIR_NAME); - FileSystem fs = jobTmpDir.getFileSystem(conf); - if (!fs.exists(jobTmpDir)) { - throw new IOException("The temporary job-output directory " + - jobTmpDir.toString() + " doesn't exist!"); - } - - // ${mapred.out.dir}/_temporary/_${taskid} - Path taskTmpDir = getWorkOutputPath(conf); - if (!fs.mkdirs(taskTmpDir)) { - throw new IOException("Mkdirs failed to create " - + taskTmpDir.toString()); - } - - // ${mapred.out.dir}/_temporary/_${taskid}/${name} - return new Path(taskTmpDir, getOutputName(context)); - } - - /** - * Helper function to generate a name that is unique for the task. 
- * - * <p>The generated name can be used to create custom files from within the - * different tasks for the job, the names for different tasks will not collide - * with each other.</p> - * - * <p>The given name is postfixed with the task type, 'm' for maps, 'r' for - * reduces and the task partition number. For example, give a name 'test' - * running on the first map o the job the generated name will be - * 'test-m-00000'.</p> - * - * @param conf the configuration for the job. - * @param name the name to make unique. - * @return a unique name accross all tasks of the job. - */ - public static String getUniqueName(Configuration conf, String name) { - int partition = conf.getInt("mapred.task.partition", -1); - if (partition == -1) { - throw new IllegalArgumentException( - "This method can only be called from within a Job"); - } - - String taskType = (conf.getBoolean("mapred.task.is.map", true)) ? "m" : "r"; - - NumberFormat numberFormat = NumberFormat.getInstance(); - numberFormat.setMinimumIntegerDigits(5); - numberFormat.setGroupingUsed(false); - - return name + "-" + taskType + "-" + numberFormat.format(partition); + public static Path getWorkOutputPath(TaskInputOutputContext<?,?,?,?> context + ) throws IOException, + InterruptedException { + FileOutputCommitter committer = (FileOutputCommitter) + context.getOutputCommitter(); + return committer.getWorkPath(); } /** @@ -262,28 +203,68 @@ * reduce tasks. The path name will be unique for each task. The path parent * will be the job output directory.</p>ls * - * <p>This method uses the {...@link #getUniqueName} method to make the file name + * <p>This method uses the {...@link #getUniqueFile} method to make the file name * unique for the task.</p> * - * @param conf the configuration for the job. + * @param context the context for the task. * @param name the name for the file. + * @param extension the extension for the file * @return a unique path accross all tasks of the job. */ - public static Path getPathForCustomFile(Configuration conf, String name) { - return new Path(getWorkOutputPath(conf), getUniqueName(conf, name)); + public + static Path getPathForWorkFile(TaskInputOutputContext<?,?,?,?> context, + String name, + String extension + ) throws IOException, InterruptedException { + return new Path(getWorkOutputPath(context), + getUniqueFile(context, name, extension)); } - /** Construct output file names so that, when an output directory listing is - * sorted lexicographically, positions correspond to output partitions.*/ - private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance(); - static { - NUMBER_FORMAT.setMinimumIntegerDigits(5); - NUMBER_FORMAT.setGroupingUsed(false); + /** + * Generate a unique filename, based on the task id, name, and extension + * @param context the task that is calling this + * @param name the base filename + * @param extension the filename extension + * @return a string like $name-[mr]-$id$extension + */ + public synchronized static String getUniqueFile(TaskAttemptContext context, + String name, + String extension) { + TaskID taskId = context.getTaskAttemptID().getTaskID(); + int partition = taskId.getId(); + StringBuilder result = new StringBuilder(); + result.append(name); + result.append('-'); + result.append(taskId.isMap() ? 
'm' : 'r'); + result.append('-'); + result.append(NUMBER_FORMAT.format(partition)); + result.append(extension); + return result.toString(); } - protected static synchronized - String getOutputName(TaskAttemptContext context) { - return "part-" + NUMBER_FORMAT.format(context.getTaskAttemptId().getId()); + /** + * Get the default path and filename for the output format. + * @param context the task context + * @param extension an extension to add to the filename + * @return a full path $output/_temporary/$taskid/part-[mr]-$id + * @throws IOException + */ + public Path getDefaultWorkFile(TaskAttemptContext context, + String extension) throws IOException{ + FileOutputCommitter committer = + (FileOutputCommitter) getOutputCommitter(context); + return new Path(committer.getWorkPath(), getUniqueFile(context, "part", + extension)); + } + + public synchronized + OutputCommitter getOutputCommitter(TaskAttemptContext context + ) throws IOException { + if (committer == null) { + Path output = getOutputPath(context.getConfiguration()); + committer = new FileOutputCommitter(output, context); + } + return committer; } } Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java?rev=726850&r1=726849&r2=726850&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java Mon Dec 15 14:21:32 2008 @@ -19,6 +19,7 @@ package org.apache.hadoop.mapreduce.lib.output; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -28,12 +29,29 @@ */ public class NullOutputFormat<K, V> extends OutputFormat<K, V> { - public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) { + @Override + public RecordWriter<K, V> + getRecordWriter(TaskAttemptContext context) { return new RecordWriter<K, V>(){ public void write(K key, V value) { } public void close(TaskAttemptContext context) { } }; } + @Override public void checkOutputSpecs(JobContext context) { } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) { + return new OutputCommitter() { + public void abortTask(TaskAttemptContext taskContext) { } + public void cleanupJob(JobContext jobContext) { } + public void commitTask(TaskAttemptContext taskContext) { } + public boolean needsTaskCommit(TaskAttemptContext taskContext) { + return false; + } + public void setupJob(JobContext jobContext) { } + public void setupTask(TaskAttemptContext taskContext) { } + }; + } } Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java?rev=726850&r1=726849&r2=726850&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java Mon Dec 15 14:21:32 2008 @@ 
-27,6 +27,7 @@ import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.DefaultCodec; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -36,13 +37,11 @@ /** An {...@link OutputFormat} that writes {...@link SequenceFile}s. */ public class SequenceFileOutputFormat <K,V> extends FileOutputFormat<K, V> { - public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) - throws IOException { - // get the path of the temporary output file - Path file = FileOutputFormat.getTaskOutputPath(context); + public RecordWriter<K, V> + getRecordWriter(TaskAttemptContext context + ) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); - FileSystem fs = file.getFileSystem(conf); CompressionCodec codec = null; CompressionType compressionType = CompressionType.NONE; if (getCompressOutput(conf)) { @@ -54,6 +53,9 @@ codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf); } + // get the path of the temporary output file + Path file = getDefaultWorkFile(context, ""); + FileSystem fs = file.getFileSystem(conf); final SequenceFile.Writer out = SequenceFile.createWriter(fs, conf, file, context.getOutputKeyClass(), @@ -90,14 +92,15 @@ /** * Set the {...@link CompressionType} for the output {...@link SequenceFile}. - * @param conf the {...@link Configuration} to modify + * @param job the {...@link Job} to modify * @param style the {...@link CompressionType} for the output * {...@link SequenceFile} */ - public static void setOutputCompressionType(Configuration conf, + public static void setOutputCompressionType(Job job, CompressionType style) { - setCompressOutput(conf, true); - conf.set("mapred.output.compression.type", style.toString()); + setCompressOutput(job, true); + job.getConfiguration().set("mapred.output.compression.type", + style.toString()); } } Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java?rev=726850&r1=726849&r2=726850&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java Mon Dec 15 14:21:32 2008 @@ -38,9 +38,8 @@ /** An {...@link OutputFormat} that writes plain text files. 
*/ public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> { - protected static class LineRecordWriter<K, V> - implements RecordWriter<K, V> { + extends RecordWriter<K, V> { private static final String utf8 = "UTF-8"; private static final byte[] newline; static { @@ -108,26 +107,26 @@ } } - public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) - throws IOException { + public RecordWriter<K, V> + getRecordWriter(TaskAttemptContext context + ) throws IOException, InterruptedException { Configuration job = context.getConfiguration(); boolean isCompressed = getCompressOutput(job); - String keyValueSeparator = job.get("mapred.textoutputformat.separator", - "\t"); - Path file = FileOutputFormat.getTaskOutputPath(context); + String keyValueSeparator= job.get("mapred.textoutputformat.separator","\t"); + CompressionCodec codec = null; + String extension = ""; + if (isCompressed) { + Class<? extends CompressionCodec> codecClass = + getOutputCompressorClass(job, GzipCodec.class); + codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, job); + extension = codec.getDefaultExtension(); + } + Path file = getDefaultWorkFile(context, extension); + FileSystem fs = file.getFileSystem(job); if (!isCompressed) { - FileSystem fs = file.getFileSystem(job); FSDataOutputStream fileOut = fs.create(file, context); return new LineRecordWriter<K, V>(fileOut, keyValueSeparator); } else { - Class<? extends CompressionCodec> codecClass = - getOutputCompressorClass(job, GzipCodec.class); - // create the named codec - CompressionCodec codec = (CompressionCodec) - ReflectionUtils.newInstance(codecClass, job); - // build the filename including the extension - file = new Path(file + codec.getDefaultExtension()); - FileSystem fs = file.getFileSystem(job); FSDataOutputStream fileOut = fs.create(file, context); return new LineRecordWriter<K, V>(new DataOutputStream (codec.createOutputStream(fileOut)), Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/IntSumReducer.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/IntSumReducer.java?rev=726850&r1=726849&r2=726850&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/IntSumReducer.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/IntSumReducer.java Mon Dec 15 14:21:32 2008 @@ -34,7 +34,7 @@ sum += val.get(); } result.set(sum); - context.collect(key, result); + context.write(key, result); } } \ No newline at end of file Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/LongSumReducer.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/LongSumReducer.java?rev=726850&r1=726849&r2=726850&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/LongSumReducer.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/LongSumReducer.java Mon Dec 15 14:21:32 2008 @@ -34,7 +34,7 @@ sum += val.get(); } result.set(sum); - context.collect(key, result); + context.write(key, result); } } \ No newline at end of file Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/NotificationTestCase.java URL: 
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/NotificationTestCase.java?rev=726850&r1=726849&r2=726850&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/NotificationTestCase.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/NotificationTestCase.java Mon Dec 15 14:21:32 2008 @@ -25,7 +25,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.examples.WordCount; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=726850&r1=726849&r2=726850&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java Mon Dec 15 14:21:32 2008 @@ -37,9 +37,9 @@ JobConf job = new JobConf(); job.set("mapred.task.id", attempt); job.setOutputCommitter(FileOutputCommitter.class); - JobContext jContext = new JobContext(job); - TaskAttemptContext tContext = new TaskAttemptContext(job, taskID); FileOutputFormat.setOutputPath(job, outDir); + JobContext jContext = new JobContext(job, taskID.getJobID()); + TaskAttemptContext tContext = new TaskAttemptContext(job, taskID); FileOutputCommitter committer = new FileOutputCommitter(); FileOutputFormat.setWorkOutputPath(job, committer.getTempTaskOutputPath(tContext)); Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java?rev=726850&r1=726849&r2=726850&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java Mon Dec 15 14:21:32 2008 @@ -26,7 +26,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.examples.WordCount; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKillCompletedJob.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKillCompletedJob.java?rev=726850&r1=726849&r2=726850&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKillCompletedJob.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKillCompletedJob.java Mon Dec 15 14:21:32 2008 @@ -28,7 +28,6 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.examples.WordCount; /** Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=726850&r1=726849&r2=726850&view=diff 
============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Mon Dec 15 14:21:32 2008 @@ -34,7 +34,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.apache.hadoop.examples.WordCount; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java?rev=726850&r1=726849&r2=726850&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java Mon Dec 15 14:21:32 2008 @@ -27,7 +27,6 @@ import junit.framework.TestCase; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.examples.WordCount; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java?rev=726850&r1=726849&r2=726850&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java Mon Dec 15 14:21:32 2008 @@ -108,7 +108,7 @@ for (TaskCompletionEvent tce : taskComplEvents) { String[] diagnostics = - jClient.jobSubmitClient.getTaskDiagnostics(tce.getTaskAttemptId()); + rj.getTaskDiagnostics(tce.getTaskAttemptId()); if (diagnostics != null) { for (String str : diagnostics) { @@ -304,7 +304,7 @@ .getTaskStatus() == TaskCompletionEvent.Status.FAILED); String[] diagnostics = - jClient.jobSubmitClient.getTaskDiagnostics(tce.getTaskAttemptId()); + rj.getTaskDiagnostics(tce.getTaskAttemptId()); // Every task HAS to spit out the out-of-memory errors assert (diagnostics != null); Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/WordCount.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/WordCount.java?rev=726850&view=auto ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/WordCount.java (added) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/WordCount.java Mon Dec 15 14:21:32 2008 @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.StringTokenizer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/** + * This is an example Hadoop Map/Reduce application. + * It reads the text input files, breaks each line into words + * and counts them. The output is a locally sorted list of words and the + * count of how often they occurred. + * + * To run: bin/hadoop jar build/hadoop-examples.jar wordcount + * [-m <i>maps</i>] [-r <i>reduces</i>] <i>in-dir</i> <i>out-dir</i> + */ +public class WordCount extends Configured implements Tool { + + /** + * Counts the words in each line. + * For each line of input, break the line into words and emit them as + * (<b>word</b>, <b>1</b>). + */ + public static class MapClass extends MapReduceBase + implements Mapper<LongWritable, Text, Text, IntWritable> { + + private final static IntWritable one = new IntWritable(1); + private Text word = new Text(); + + public void map(LongWritable key, Text value, + OutputCollector<Text, IntWritable> output, + Reporter reporter) throws IOException { + String line = value.toString(); + StringTokenizer itr = new StringTokenizer(line); + while (itr.hasMoreTokens()) { + word.set(itr.nextToken()); + output.collect(word, one); + } + } + } + + /** + * A reducer class that just emits the sum of the input values. + */ + public static class Reduce extends MapReduceBase + implements Reducer<Text, IntWritable, Text, IntWritable> { + + public void reduce(Text key, Iterator<IntWritable> values, + OutputCollector<Text, IntWritable> output, + Reporter reporter) throws IOException { + int sum = 0; + while (values.hasNext()) { + sum += values.next().get(); + } + output.collect(key, new IntWritable(sum)); + } + } + + static int printUsage() { + System.out.println("wordcount [-m <maps>] [-r <reduces>] <input> <output>"); + ToolRunner.printGenericCommandUsage(System.out); + return -1; + } + + /** + * The main driver for word count map/reduce program. + * Invoke this method to submit the map/reduce job. + * @throws IOException When there is communication problems with the + * job tracker. 
+ */ + public int run(String[] args) throws Exception { + JobConf conf = new JobConf(getConf(), WordCount.class); + conf.setJobName("wordcount"); + + // the keys are words (strings) + conf.setOutputKeyClass(Text.class); + // the values are counts (ints) + conf.setOutputValueClass(IntWritable.class); + + conf.setMapperClass(MapClass.class); + conf.setCombinerClass(Reduce.class); + conf.setReducerClass(Reduce.class); + + List<String> other_args = new ArrayList<String>(); + for(int i=0; i < args.length; ++i) { + try { + if ("-m".equals(args[i])) { + conf.setNumMapTasks(Integer.parseInt(args[++i])); + } else if ("-r".equals(args[i])) { + conf.setNumReduceTasks(Integer.parseInt(args[++i])); + } else { + other_args.add(args[i]); + } + } catch (NumberFormatException except) { + System.out.println("ERROR: Integer expected instead of " + args[i]); + return printUsage(); + } catch (ArrayIndexOutOfBoundsException except) { + System.out.println("ERROR: Required parameter missing from " + + args[i-1]); + return printUsage(); + } + } + // Make sure there are exactly 2 parameters left. + if (other_args.size() != 2) { + System.out.println("ERROR: Wrong number of parameters: " + + other_args.size() + " instead of 2."); + return printUsage(); + } + FileInputFormat.setInputPaths(conf, other_args.get(0)); + FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1))); + + JobClient.runJob(conf); + return 0; + } + + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(new Configuration(), new WordCount(), args); + System.exit(res); + } + +} Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=726850&view=auto ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (added) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Mon Dec 15 14:21:32 2008 @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce; + +import java.io.BufferedReader; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.examples.WordCount; +import org.apache.hadoop.examples.WordCount.IntSumReducer; +import org.apache.hadoop.examples.WordCount.TokenizerMapper; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.MiniMRCluster; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; + +/** + * A JUnit test to test min map-reduce cluster with local file system. + */ +public class TestMapReduceLocal extends TestCase { + private static Path TEST_ROOT_DIR = + new Path(System.getProperty("test.build.data","/tmp")); + private static Configuration conf = new Configuration(); + private static FileSystem localFs; + static { + try { + localFs = FileSystem.getLocal(conf); + } catch (IOException io) { + throw new RuntimeException("problem getting local fs", io); + } + } + + public Path writeFile(String name, String data) throws IOException { + Path file = new Path(TEST_ROOT_DIR + "/" + name); + localFs.delete(file, false); + DataOutputStream f = localFs.create(file); + f.write(data.getBytes()); + f.close(); + return file; + } + + public String readFile(String name) throws IOException { + DataInputStream f = localFs.open(new Path(TEST_ROOT_DIR + "/" + name)); + BufferedReader b = new BufferedReader(new InputStreamReader(f)); + StringBuilder result = new StringBuilder(); + String line = b.readLine(); + while (line != null) { + result.append(line); + result.append('\n'); + line = b.readLine(); + } + return result.toString(); + } + + public void testWithLocal() throws Exception { + MiniMRCluster mr = null; + try { + mr = new MiniMRCluster(2, "file:///", 3); + Configuration conf = mr.createJobConf(); + writeFile("in/part1", "this is a test\nof word count\n"); + writeFile("in/part2", "more test"); + Job job = new Job(conf, "word count"); + job.setJarByClass(WordCount.class); + job.setMapperClass(TokenizerMapper.class); + job.setCombinerClass(IntSumReducer.class); + job.setReducerClass(IntSumReducer.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in")); + FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out")); + assertTrue(job.waitForCompletion()); + String out = readFile("out/part-r-00000"); + System.out.println(out); + assertEquals("a\t1\ncount\t1\nis\t1\nmore\t1\nof\t1\ntest\t2\nthis\t1\nword\t1\n", + out); + } finally { + if (mr != null) { mr.shutdown(); } + } + } + +} Modified: hadoop/core/trunk/src/test/testjar/ClassWordCount.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/testjar/ClassWordCount.java?rev=726850&r1=726849&r2=726850&view=diff ============================================================================== --- hadoop/core/trunk/src/test/testjar/ClassWordCount.java (original) +++ hadoop/core/trunk/src/test/testjar/ClassWordCount.java Mon Dec 15 14:21:32 2008 @@ -34,7 +34,7 @@ import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.examples.WordCount; +import 
org.apache.hadoop.mapred.WordCount; /** * This is an example Hadoop Map/Reduce application being used for
