Author: ddas
Date: Thu Jul 10 05:27:05 2008
New Revision: 675546
URL: http://svn.apache.org/viewvc?rev=675546&view=rev
Log:
HADOOP-3149. Adds a way in which map/reducetasks can create multiple outputs.
Contributed by Alejandro Abdelnur.
Added:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/MultipleOutputs.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileOutputFormat.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=675546&r1=675545&r2=675546&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Jul 10 05:27:05 2008
@@ -45,6 +45,9 @@
naming convention,such as, hadoop.rm.queue.queue-name.property-name.
(Hemanth Yamijala via ddas)
+ HADOOP-3149. Adds a way in which map/reducetasks can create multiple
+ outputs. (Alejandro Abdelnur via ddas)
+
IMPROVEMENTS
HADOOP-3577. Tools to inject blocks into name node and simulated
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java?rev=675546&r1=675545&r2=675546&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java
(original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java
Thu Jul 10 05:27:05 2008
@@ -19,6 +19,7 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
+import java.text.NumberFormat;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -230,5 +231,56 @@
// ${mapred.out.dir}/_temporary/_${taskid}/${name}
return new Path(taskTmpDir, name);
}
+
+  /**
+   * Helper function to generate a name that is unique for the task.
+   *
+   * <p>The generated name can be used to create custom files from within the
+   * different tasks for the job, the names for different tasks will not
+   * collide with each other.</p>
+   *
+   * <p>The given name is postfixed with the task type, 'm' for maps, 'r' for
+   * reduces and the task partition number. For example, given a name 'test'
+   * running on the first map of the job the generated name will be
+   * 'test-m-00000'.</p>
+   *
+   * @param conf the configuration for the job.
+   * @param name the name to make unique.
+   * @return a unique name across all tasks of the job.
+   * @throws IllegalArgumentException if 'mapred.task.partition' is not set in
+   *         the given configuration.
+   */
+  public static String getUniqueName(JobConf conf, String name) {
+    int partition = conf.getInt("mapred.task.partition", -1);
+    if (partition == -1) {
+      throw new IllegalArgumentException(
+        "This method can only be called from within a Job");
+    }
+
+    String taskType = conf.getBoolean("mapred.task.is.map", true) ? "m" : "r";
+
+    // Pin the locale: a formatter for the default locale may emit non-ASCII
+    // digits, which would make the generated names differ between nodes.
+    NumberFormat numberFormat = NumberFormat.getInstance(java.util.Locale.US);
+    numberFormat.setMinimumIntegerDigits(5);
+    numberFormat.setGroupingUsed(false);
+
+    return name + "-" + taskType + "-" + numberFormat.format(partition);
+  }
+
+ /**
+ * Helper function to generate a {@link Path} for a file that is unique for
+ * the task within the job output directory.
+ *
+ * <p>The path can be used to create custom files from within the map and
+ * reduce tasks. The path name will be unique for each task. The path parent
+ * is the directory returned by <code>getWorkOutputPath(conf)</code>.</p>
+ *
+ * <p>This method uses the {@link #getUniqueName} method to make the file name
+ * unique for the task.</p>
+ *
+ * @param conf the configuration for the job.
+ * @param name the name for the file.
+ * @return a unique path across all tasks of the job.
+ */
+ public static Path getPathForCustomFile(JobConf conf, String name) {
+ return new Path(getWorkOutputPath(conf), getUniqueName(conf, name));
+ }
}
Added:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/MultipleOutputs.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/MultipleOutputs.java?rev=675546&view=auto
==============================================================================
---
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/MultipleOutputs.java
(added)
+++
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/MultipleOutputs.java
Thu Jul 10 05:27:05 2008
@@ -0,0 +1,470 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.lib;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * The MultipleOutputs class simplifies writing to additional outputs other
+ * than the job default output via the <code>OutputCollector</code> passed to
+ * the <code>map()</code> and <code>reduce()</code> methods of the
+ * <code>Mapper</code> and <code>Reducer</code> implementations.
+ * <p/>
+ * Each additional output, or named output, may be configured with its own
+ * <code>OutputFormat</code>, with its own key class and with its own value
+ * class.
+ * <p/>
+ * A named output can be a single file or a multi file. The latter is referred
+ * to as a multi named output.
+ * <p/>
+ * A multi named output is an unbounded set of files all sharing the same
+ * <code>OutputFormat</code>, key class and value class configuration.
+ * <p/>
+ * When named outputs are used within a <code>Mapper</code> implementation,
+ * key/values written to a named output are not part of the reduce phase, only
+ * key/values written to the job <code>OutputCollector</code> are part of the
+ * reduce phase.
+ * <p/>
+ * Job configuration usage pattern is:
+ * <pre>
+ *
+ * JobConf conf = new JobConf();
+ *
+ * conf.setInputPath(inDir);
+ * FileOutputFormat.setOutputPath(conf, outDir);
+ *
+ * conf.setMapperClass(MOMap.class);
+ * conf.setReducerClass(MOReduce.class);
+ * ...
+ *
+ * // Defines additional single text based output 'text' for the job
+ * MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class,
+ * LongWritable.class, Text.class);
+ *
+ * // Defines additional multi sequencefile based output 'seq' for the
+ * // job
+ * MultipleOutputs.addMultiNamedOutput(conf, "seq",
+ * SequenceFileOutputFormat.class,
+ * LongWritable.class, Text.class);
+ * ...
+ *
+ * JobClient jc = new JobClient();
+ * RunningJob job = jc.submitJob(conf);
+ *
+ * ...
+ * </pre>
+ * <p/>
+ * Mapper/Reducer usage pattern is:
+ * <pre>
+ *
+ * public class MOReduce implements
+ * Reducer<WritableComparable, Writable> {
+ * private MultipleOutputs mos;
+ *
+ * public void configure(JobConf conf) {
+ * ...
+ * mos = new MultipleOutputs(conf);
+ * }
+ *
+ * public void reduce(WritableComparable key, Iterator<Writable> values,
+ * OutputCollector output, Reporter reporter)
+ * throws IOException {
+ * ...
+ * mos.getCollector("text", reporter).collect(key, new Text("Hello"));
+ * mos.getCollector("seq", "A", reporter).collect(key, new Text("Bye"));
+ * mos.getCollector("seq", "B", reporter).collect(key, new Text("Chau"));
+ * ...
+ * }
+ *
+ * public void close() throws IOException {
+ * mos.close();
+ * ...
+ * }
+ *
+ * }
+ * </pre>
+ */
+public class MultipleOutputs {
+
+ private static final String NAMED_OUTPUTS = "mo.namedOutputs";
+
+ private static final String MO_PREFIX = "mo.namedOutput.";
+
+ private static final String FORMAT = ".format";
+ private static final String KEY = ".key";
+ private static final String VALUE = ".value";
+ private static final String MULTI = ".multi";
+
+  /**
+   * Verifies the definition state of a named output.
+   *
+   * @param conf           job conf
+   * @param namedOutput    named output name
+   * @param alreadyDefined selects the failing condition: if <code>true</code>
+   *                       the check fails when the named output is already
+   *                       defined, if <code>false</code> it fails when the
+   *                       named output is not defined
+   * @throws IllegalArgumentException if the named output is defined and
+   *                                  'alreadyDefined' is <code>true</code>,
+   *                                  or if it is not defined and
+   *                                  'alreadyDefined' is <code>false</code>
+   */
+  private static void checkNamedOutput(JobConf conf, String namedOutput,
+                                       boolean alreadyDefined) {
+    // look the name up once, both failing conditions depend on it
+    boolean defined = getNamedOutputsList(conf).contains(namedOutput);
+    if (alreadyDefined && defined) {
+      throw new IllegalArgumentException("Named output '" + namedOutput +
+        "' already defined");
+    }
+    if (!alreadyDefined && !defined) {
+      throw new IllegalArgumentException("Named output '" + namedOutput +
+        "' not defined");
+    }
+  }
+
+  /**
+   * Checks that a name is a valid token: not null, not empty and made only
+   * of the characters 'A'-'Z', 'a'-'z' and '0'-'9'.
+   *
+   * @param namedOutput name to check
+   * @throws IllegalArgumentException if the name is null, empty or contains
+   *                                  any other character.
+   */
+  private static void checkTokenName(String namedOutput) {
+    if (namedOutput == null || namedOutput.length() == 0) {
+      throw new IllegalArgumentException(
+        "Name cannot be NULL or empty");
+    }
+    for (char ch : namedOutput.toCharArray()) {
+      boolean letter = (ch >= 'A' && ch <= 'Z') || (ch >= 'a' && ch <= 'z');
+      boolean digit = ch >= '0' && ch <= '9';
+      if (!letter && !digit) {
+        throw new IllegalArgumentException(
+          "Name cannot have a '" + ch + "' char");
+      }
+    }
+  }
+
+  /**
+   * Validates the name of a named output: it must be a valid token and it
+   * must not be 'part', the name used for the default output.
+   *
+   * @param namedOutput named output name
+   * @throws IllegalArgumentException if the output name is not valid.
+   */
+  private static void checkNamedOutputName(String namedOutput) {
+    // rejects null as well, so the comparison below never sees a null name
+    checkTokenName(namedOutput);
+    boolean clashesWithDefault = "part".equals(namedOutput);
+    if (clashesWithDefault) {
+      throw new IllegalArgumentException(
+        "Named output name cannot be 'part'");
+    }
+  }
+
+  /**
+   * Lists the names of the named outputs registered in a job configuration.
+   * <p/>
+   * The names are read from a single, space separated, configuration
+   * property.
+   *
+   * @param conf job conf
+   * @return a new list with the named output names, empty if none has been
+   *         registered
+   */
+  public static List<String> getNamedOutputsList(JobConf conf) {
+    String registered = conf.get(NAMED_OUTPUTS, "");
+    List<String> result = new ArrayList<String>();
+    for (StringTokenizer tokens = new StringTokenizer(registered, " ");
+         tokens.hasMoreTokens();) {
+      result.add(tokens.nextToken());
+    }
+    return result;
+  }
+
+
+ /**
+ * Returns if a named output is multiple.
+ *
+ * @param conf job conf
+ * @param namedOutput named output
+ * @return <code>true</code> if the name output is multi, <code>false</code>
+ * if it is single. If the name output is not defined it returns
+ * <code>false</code>
+ */
+ public static boolean isMultiNamedOutput(JobConf conf, String namedOutput) {
+ checkNamedOutput(conf, namedOutput, false);
+ return conf.getBoolean(MO_PREFIX + namedOutput + MULTI, false);
+ }
+
+ /**
+ * Returns the named output OutputFormat.
+ *
+ * @param conf job conf
+ * @param namedOutput named output
+ * @return namedOutput OutputFormat
+ */
+ public static Class<? extends OutputFormat> getNamedOutputFormatClass(
+ JobConf conf, String namedOutput) {
+ checkNamedOutput(conf, namedOutput, false);
+ return conf.getClass(MO_PREFIX + namedOutput + FORMAT, null,
+ OutputFormat.class);
+ }
+
+ /**
+ * Returns the key class for a named output.
+ *
+ * @param conf job conf
+ * @param namedOutput named output
+ * @return class for the named output key
+ */
+ public static Class<?> getNamedOutputKeyClass(JobConf conf,
+ String namedOutput) {
+ checkNamedOutput(conf, namedOutput, false);
+ return conf.getClass(MO_PREFIX + namedOutput + KEY, null,
+ Object.class);
+ }
+
+ /**
+ * Returns the value class for a named output.
+ *
+ * @param conf job conf
+ * @param namedOutput named output
+ * @return class of named output value
+ */
+ public static Class<?> getNamedOutputValueClass(JobConf conf,
+ String namedOutput) {
+ checkNamedOutput(conf, namedOutput, false);
+ return conf.getClass(MO_PREFIX + namedOutput + VALUE, null,
+ Object.class);
+ }
+
+  /**
+   * Registers a single (non multi) named output in the job conf.
+   * <p/>
+   *
+   * @param conf              job conf to add the named output
+   * @param namedOutput       named output name, it has to be a word, letters
+   *                          and numbers only, cannot be the word 'part' as
+   *                          that is reserved for the default output.
+   * @param outputFormatClass OutputFormat class.
+   * @param keyClass          key class
+   * @param valueClass        value class
+   */
+  public static void addNamedOutput(JobConf conf, String namedOutput,
+                                Class<? extends OutputFormat> outputFormatClass,
+                                Class<?> keyClass, Class<?> valueClass) {
+    final boolean multi = false;
+    addNamedOutput(conf, namedOutput, multi, outputFormatClass, keyClass,
+      valueClass);
+  }
+
+  /**
+   * Registers a multi named output in the job conf.
+   * <p/>
+   *
+   * @param conf              job conf to add the named output
+   * @param namedOutput       named output name, it has to be a word, letters
+   *                          and numbers only, cannot be the word 'part' as
+   *                          that is reserved for the default output.
+   * @param outputFormatClass OutputFormat class.
+   * @param keyClass          key class
+   * @param valueClass        value class
+   */
+  public static void addMultiNamedOutput(JobConf conf, String namedOutput,
+                                Class<? extends OutputFormat> outputFormatClass,
+                                Class<?> keyClass, Class<?> valueClass) {
+    final boolean multi = true;
+    addNamedOutput(conf, namedOutput, multi, outputFormatClass, keyClass,
+      valueClass);
+  }
+
+  /**
+   * Registers a named output, single or multi, in the job conf.
+   * <p/>
+   * The name is validated and must not be registered yet. It is then appended
+   * to the list of named outputs and its output format, key class, value
+   * class and multi flag are stored under properties derived from the name.
+   *
+   * @param conf              job conf to add the named output
+   * @param namedOutput       named output name, it has to be a word, letters
+   *                          and numbers only, cannot be the word 'part' as
+   *                          that is reserved for the default output.
+   * @param multi             indicates if the named output is multi
+   * @param outputFormatClass OutputFormat class.
+   * @param keyClass          key class
+   * @param valueClass        value class
+   */
+  private static void addNamedOutput(JobConf conf, String namedOutput,
+                                boolean multi,
+                                Class<? extends OutputFormat> outputFormatClass,
+                                Class<?> keyClass, Class<?> valueClass) {
+    checkNamedOutputName(namedOutput);
+    checkNamedOutput(conf, namedOutput, true);
+
+    String registered = conf.get(NAMED_OUTPUTS, "");
+    conf.set(NAMED_OUTPUTS, registered + " " + namedOutput);
+
+    // every setting of the named output shares the same property prefix
+    String prefix = MO_PREFIX + namedOutput;
+    conf.setClass(prefix + FORMAT, outputFormatClass, OutputFormat.class);
+    conf.setClass(prefix + KEY, keyClass, Object.class);
+    conf.setClass(prefix + VALUE, valueClass, Object.class);
+    conf.setBoolean(prefix + MULTI, multi);
+  }
+
+  // instance code, to be used from Mapper/Reducer code
+
+  // job configuration this instance was created with
+  private JobConf conf;
+  // output format used to create the record writers of the named outputs
+  private OutputFormat outputFormat;
+  // read-only snapshot of the named output names defined in the job conf
+  private Set<String> namedOutputs;
+  // record writers created so far, keyed by base file name
+  private Map<String, RecordWriter> recordWriters;
+
+  /**
+   * Creates and initializes multiple named outputs support, it should be
+   * instantiated in the Mapper/Reducer configure method.
+   * <p/>
+   * The named outputs defined in the job configuration are read once, here.
+   *
+   * @param job the job configuration object
+   */
+  public MultipleOutputs(JobConf job) {
+    this.conf = job;
+    this.outputFormat = new InternalFileOutputFormat();
+    Set<String> definedNames = new HashSet<String>(getNamedOutputsList(job));
+    this.namedOutputs = Collections.unmodifiableSet(definedNames);
+    this.recordWriters = new HashMap<String, RecordWriter>();
+  }
+
+  /**
+   * Gives access to the names of the defined named outputs.
+   *
+   * @return iterator over the defined named output names
+   */
+  public Iterator<String> getNamedOutputs() {
+    final Set<String> definedNames = namedOutputs;
+    return definedNames.iterator();
+  }
+
+
+  // Synchronized so that this class can be used with a multithreaded map
+  // runner: the writer cache is looked up and populated under the instance
+  // lock. Writers are cached by base file name and created on first use.
+  private synchronized RecordWriter getRecordWriter(String namedOutput,
+                                                    String baseFileName,
+                                                    Reporter reporter)
+    throws IOException {
+    RecordWriter cached = recordWriters.get(baseFileName);
+    if (cached != null) {
+      return cached;
+    }
+
+    // the named output is handed over to the output format via a conf copy
+    JobConf jobConf = new JobConf(conf);
+    jobConf.set(InternalFileOutputFormat.CONFIG_NAMED_OUTPUT, namedOutput);
+    FileSystem fs = FileSystem.get(conf);
+    RecordWriter created =
+      outputFormat.getRecordWriter(fs, jobConf, baseFileName, reporter);
+    recordWriters.put(baseFileName, created);
+    return created;
+  }
+
+  /**
+   * Gets the output collector for a single (non multi) named output.
+   * <p/>
+   * Delegates to the three argument variant with no multi name.
+   *
+   * @param namedOutput the named output name
+   * @param reporter    the reporter
+   * @return the output collector for the given named output
+   * @throws IOException thrown if output collector could not be created
+   */
+  @SuppressWarnings({"unchecked"})
+  public OutputCollector getCollector(String namedOutput, Reporter reporter)
+    throws IOException {
+    final String noMultiName = null;
+    return getCollector(namedOutput, noMultiName, reporter);
+  }
+
+  /**
+   * Gets the output collector for a named output, single or multi.
+   * <p/>
+   * For a multi named output the records go to a file whose base name is the
+   * named output name, an underscore and the multi name. For a single named
+   * output the base name is the named output name.
+   *
+   * @param namedOutput the named output name
+   * @param multiName   the multi name part, must be <code>null</code> for a
+   *                    single named output and a valid token for a multi one
+   * @param reporter    the reporter
+   * @return the output collector for the given named output
+   * @throws IOException thrown if output collector could not be created
+   * @throws IllegalArgumentException if the named output is invalid or not
+   *                                  defined, or if the multi name does not
+   *                                  match the kind of named output
+   */
+  @SuppressWarnings({"unchecked"})
+  public OutputCollector getCollector(String namedOutput, String multiName,
+                                      Reporter reporter)
+    throws IOException {
+
+    checkNamedOutputName(namedOutput);
+    if (!namedOutputs.contains(namedOutput)) {
+      throw new IllegalArgumentException("Undefined named output '" +
+        namedOutput + "'");
+    }
+
+    final String baseFileName;
+    if (isMultiNamedOutput(conf, namedOutput)) {
+      checkTokenName(multiName);
+      baseFileName = namedOutput + "_" + multiName;
+    } else {
+      if (multiName != null) {
+        throw new IllegalArgumentException("Name output '" + namedOutput +
+          "' has not been defined as multi");
+      }
+      baseFileName = namedOutput;
+    }
+
+    final RecordWriter target =
+      getRecordWriter(namedOutput, baseFileName, reporter);
+
+    // thin adapter that forwards every collected pair to the record writer
+    return new OutputCollector() {
+
+      @SuppressWarnings({"unchecked"})
+      public void collect(Object key, Object value) throws IOException {
+        target.write(key, value);
+      }
+
+    };
+  }
+
+ /**
+ * Closes all the opened named outputs.
+ * <p/>
+ * If overriden subclasses must invoke <code>super.close()</code> at the
+ * end of their <code>close()</code>
+ *
+ * @throws java.io.IOException thrown if any of the MultipleOutput files
+ * could not be closed properly.
+ */
+ public void close() throws IOException {
+ for (RecordWriter writer : recordWriters.values()) {
+ writer.close(null);
+ }
+ }
+
+ private static class InternalFileOutputFormat extends
+ FileOutputFormat<Object, Object> {
+
+ public static final String CONFIG_NAMED_OUTPUT = "mo.config.namedOutput";
+
+ @SuppressWarnings({"unchecked"})
+ public RecordWriter<Object, Object> getRecordWriter(
+ FileSystem fs, JobConf job, String baseFileName, Progressable progress)
+ throws IOException {
+
+ String nameOutput = job.get(CONFIG_NAMED_OUTPUT, null);
+ String fileName = getUniqueName(job, baseFileName);
+
+ // The following trick leverages the instantiation of a record writer via
+ // the job conf thus supporting arbitrary output formats.
+ JobConf outputConf = new JobConf(job);
+ outputConf.setOutputFormat(getNamedOutputFormatClass(job, nameOutput));
+ outputConf.setOutputKeyClass(getNamedOutputKeyClass(job, nameOutput));
+ outputConf.setOutputValueClass(getNamedOutputValueClass(job,
nameOutput));
+ OutputFormat outputFormat = outputConf.getOutputFormat();
+ return outputFormat.getRecordWriter(fs, outputConf, fileName, progress);
+ }
+ }
+
+}
Added:
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileOutputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileOutputFormat.java?rev=675546&view=auto
==============================================================================
---
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileOutputFormat.java
(added)
+++
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileOutputFormat.java
Thu Jul 10 05:27:05 2008
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Iterator;
+
+public class TestFileOutputFormat extends HadoopTestCase {
+
+ /**
+ * Sets the test case up by passing <code>CLUSTER_MR</code>,
+ * <code>LOCAL_FS</code> and the counts 1 and 1 to the
+ * <code>HadoopTestCase</code> constructor.
+ * NOTE(review): presumably this selects a mini Map/Reduce cluster over the
+ * local file system with a single node of each kind; confirm against
+ * HadoopTestCase.
+ *
+ * @throws IOException declared for the superclass constructor call
+ */
+ public TestFileOutputFormat() throws IOException {
+ super(HadoopTestCase.CLUSTER_MR, HadoopTestCase.LOCAL_FS, 1, 1);
+ }
+
+  /**
+   * Runs a job with two input files whose mapper and reducer each create a
+   * custom file named 'test' via
+   * <code>FileOutputFormat.getPathForCustomFile</code>, then checks that the
+   * job output directory contains 'test-m-00000', 'test-m-00001' and
+   * 'test-r-00000'.
+   */
+  public void testCustomFile() throws Exception {
+    Path inDir = new Path("testing/fileoutputformat/input");
+    Path outDir = new Path("testing/fileoutputformat/output");
+
+    // Hack for local FS that does not have the concept of a 'mounting point'
+    if (isLocalFS()) {
+      String localPathRoot = System.getProperty("test.build.data", "/tmp")
+        .replace(' ', '+');
+      inDir = new Path(localPathRoot, inDir);
+      outDir = new Path(localPathRoot, outDir);
+    }
+
+    JobConf conf = createJobConf();
+    FileSystem fs = FileSystem.get(conf);
+
+    fs.delete(outDir, true);
+    if (!fs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+
+    // two input files with the same content
+    for (String inputName : new String[]{"part-0", "part-1"}) {
+      DataOutputStream file = fs.create(new Path(inDir, inputName));
+      file.writeBytes("a\nb\n\nc\nd\ne");
+      file.close();
+    }
+
+    conf.setJobName("fof");
+    conf.setInputFormat(TextInputFormat.class);
+
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+
+    conf.setMapOutputKeyClass(LongWritable.class);
+    conf.setMapOutputValueClass(Text.class);
+
+    conf.setOutputFormat(TextOutputFormat.class);
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+
+    conf.setMapperClass(TestMap.class);
+    conf.setReducerClass(TestReduce.class);
+
+    FileInputFormat.setInputPaths(conf, inDir);
+    FileOutputFormat.setOutputPath(conf, outDir);
+
+    JobClient jc = new JobClient(conf);
+    RunningJob job = jc.submitJob(conf);
+    while (!job.isComplete()) {
+      Thread.sleep(100);
+    }
+    assertTrue(job.isSuccessful());
+
+    // look for the custom files of both maps and of the reduce
+    boolean map0 = false;
+    boolean map1 = false;
+    boolean reduce = false;
+    for (FileStatus status : fs.listStatus(outDir)) {
+      String fileName = status.getPath().getName();
+      if (fileName.equals("test-m-00000")) {
+        map0 = true;
+      } else if (fileName.equals("test-m-00001")) {
+        map1 = true;
+      } else if (fileName.equals("test-r-00000")) {
+        reduce = true;
+      }
+    }
+
+    assertTrue(map0);
+    assertTrue(map1);
+    assertTrue(reduce);
+  }
+
+  /**
+   * Identity mapper that, when configured, writes one byte to a custom file
+   * named 'test'.
+   */
+  public static class TestMap implements Mapper<LongWritable, Text,
+    LongWritable, Text> {
+
+    public void configure(JobConf conf) {
+      try {
+        FileSystem fs = FileSystem.get(conf);
+        Path customFile = FileOutputFormat.getPathForCustomFile(conf, "test");
+        OutputStream os = fs.create(customFile);
+        os.write(1);
+        os.close();
+      } catch (IOException ex) {
+        // configure() cannot throw checked exceptions
+        throw new RuntimeException(ex);
+      }
+    }
+
+    public void map(LongWritable key, Text value,
+                    OutputCollector<LongWritable, Text> output,
+                    Reporter reporter) throws IOException {
+      // pass the record through unchanged
+      output.collect(key, value);
+    }
+
+    public void close() throws IOException {
+    }
+  }
+
+  /**
+   * Identity reducer that, when configured, writes one byte to a custom file
+   * named 'test'.
+   */
+  public static class TestReduce implements Reducer<LongWritable, Text,
+    LongWritable, Text> {
+
+    public void configure(JobConf conf) {
+      try {
+        FileSystem fs = FileSystem.get(conf);
+        Path customFile = FileOutputFormat.getPathForCustomFile(conf, "test");
+        OutputStream os = fs.create(customFile);
+        os.write(1);
+        os.close();
+      } catch (IOException ex) {
+        // configure() cannot throw checked exceptions
+        throw new RuntimeException(ex);
+      }
+    }
+
+    public void reduce(LongWritable key, Iterator<Text> values,
+                       OutputCollector<LongWritable, Text> output,
+                       Reporter reporter) throws IOException {
+      // pass every value through unchanged
+      while (values.hasNext()) {
+        output.collect(key, values.next());
+      }
+    }
+
+    public void close() throws IOException {
+    }
+  }
+
+}
Added:
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java?rev=675546&view=auto
==============================================================================
---
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java
(added)
+++
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java
Thu Jul 10 05:27:05 2008
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.lib;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.*;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+
+public class TestMultipleOutputs extends HadoopTestCase {
+
+ /**
+ * Sets the test case up by passing <code>LOCAL_MR</code>,
+ * <code>LOCAL_FS</code> and the counts 1 and 1 to the
+ * <code>HadoopTestCase</code> constructor.
+ * NOTE(review): presumably this selects the local Map/Reduce runner over the
+ * local file system; confirm against HadoopTestCase.
+ *
+ * @throws IOException declared for the superclass constructor call
+ */
+ public TestMultipleOutputs() throws IOException {
+ super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
+ }
+
+  /**
+   * Runs a job that writes to a single named output ('text') and to a multi
+   * named output ('sequence') from both the mapper and the reducer, then
+   * checks that the job succeeded, that the nine expected named output files
+   * exist and that one text file and one sequence file have the expected
+   * content.
+   */
+  @SuppressWarnings({"unchecked"})
+  public void testMultipleOutputs() throws Exception {
+    Path inDir = new Path("testing/mo/input");
+    Path outDir = new Path("testing/mo/output");
+
+    // Hack for local FS that does not have the concept of a 'mounting point'
+    if (isLocalFS()) {
+      String localPathRoot = System.getProperty("test.build.data", "/tmp")
+        .replace(' ', '+');
+      inDir = new Path(localPathRoot, inDir);
+      outDir = new Path(localPathRoot, outDir);
+    }
+
+    JobConf conf = createJobConf();
+    FileSystem fs = FileSystem.get(conf);
+
+    fs.delete(outDir, true);
+    if (!fs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+
+    DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+    file.writeBytes("a\nb\n\nc\nd\ne");
+    file.close();
+
+    file = fs.create(new Path(inDir, "part-1"));
+    file.writeBytes("a\nb\n\nc\nd\ne");
+    file.close();
+
+    conf.setJobName("mo");
+    conf.setInputFormat(TextInputFormat.class);
+
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+
+    conf.setMapOutputKeyClass(LongWritable.class);
+    conf.setMapOutputValueClass(Text.class);
+
+    conf.setOutputFormat(TextOutputFormat.class);
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+
+    MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class,
+      LongWritable.class, Text.class);
+    MultipleOutputs.addMultiNamedOutput(conf, "sequence",
+      SequenceFileOutputFormat.class, LongWritable.class, Text.class);
+
+    conf.setMapperClass(MOMap.class);
+    conf.setReducerClass(MOReduce.class);
+
+    FileInputFormat.setInputPaths(conf, inDir);
+    FileOutputFormat.setOutputPath(conf, outDir);
+
+    JobClient jc = new JobClient(conf);
+    RunningJob job = jc.submitJob(conf);
+    while (!job.isComplete()) {
+      Thread.sleep(100);
+    }
+    // a failed job should be reported as such, not as a missing output file
+    assertTrue(job.isSuccessful());
+
+    // assert number of named output part files
+    int namedOutputCount = 0;
+    FileStatus[] statuses = fs.listStatus(outDir);
+    for (FileStatus status : statuses) {
+      String fileName = status.getPath().getName();
+      if (fileName.equals("text-m-00000") ||
+        fileName.equals("text-m-00001") ||
+        fileName.equals("text-r-00000") ||
+        fileName.equals("sequence_A-m-00000") ||
+        fileName.equals("sequence_A-m-00001") ||
+        fileName.equals("sequence_B-m-00000") ||
+        fileName.equals("sequence_B-m-00001") ||
+        fileName.equals("sequence_B-r-00000") ||
+        fileName.equals("sequence_C-r-00000")) {
+        namedOutputCount++;
+      }
+    }
+    assertEquals(9, namedOutputCount);
+
+    // assert TextOutputFormat files correctness
+    int count = 0;
+    BufferedReader reader = new BufferedReader(
+      new InputStreamReader(fs.open(
+        new Path(FileOutputFormat.getOutputPath(conf), "text-r-00000"))));
+    try {
+      String line = reader.readLine();
+      while (line != null) {
+        assertTrue(line.endsWith("text"));
+        line = reader.readLine();
+        count++;
+      }
+    } finally {
+      reader.close();
+    }
+    assertFalse(count == 0);
+
+    // assert SequenceOutputFormat files correctness
+    count = 0;
+    SequenceFile.Reader seqReader =
+      new SequenceFile.Reader(fs,
+        new Path(FileOutputFormat.getOutputPath(conf), "sequence_B-r-00000"),
+        conf);
+    try {
+      assertEquals(LongWritable.class, seqReader.getKeyClass());
+      assertEquals(Text.class, seqReader.getValueClass());
+
+      LongWritable key = new LongWritable();
+      Text value = new Text();
+      while (seqReader.next(key, value)) {
+        assertEquals("sequence", value.toString());
+        count++;
+      }
+    } finally {
+      // close the sequence file reader, the text reader is already closed
+      seqReader.close();
+    }
+    assertFalse(count == 0);
+
+  }
+
+  /**
+   * Mapper that diverts the records with value 'a' to the named outputs
+   * 'text', 'sequence'/'A' and 'sequence'/'B' and passes every other record
+   * on to the job output collector.
+   */
+  @SuppressWarnings({"unchecked"})
+  public static class MOMap implements Mapper<LongWritable, Text, LongWritable,
+    Text> {
+
+    private MultipleOutputs mos;
+
+    public void configure(JobConf conf) {
+      mos = new MultipleOutputs(conf);
+    }
+
+    public void map(LongWritable key, Text value,
+                    OutputCollector<LongWritable, Text> output,
+                    Reporter reporter)
+      throws IOException {
+      if (value.toString().equals("a")) {
+        mos.getCollector("text", reporter).collect(key, new Text("text"));
+        mos.getCollector("sequence", "A", reporter).collect(key,
+          new Text("sequence"));
+        mos.getCollector("sequence", "B", reporter).collect(key,
+          new Text("sequence"));
+      } else {
+        output.collect(key, value);
+      }
+    }
+
+    public void close() throws IOException {
+      // closes the record writers opened for the named outputs
+      mos.close();
+    }
+  }
+
+  /**
+   * Reducer that diverts the values equal to 'b' to the named outputs 'text',
+   * 'sequence'/'B' and 'sequence'/'C' and passes every other value on to the
+   * job output collector.
+   */
+  @SuppressWarnings({"unchecked"})
+  public static class MOReduce implements Reducer<LongWritable, Text,
+    LongWritable, Text> {
+
+    private MultipleOutputs mos;
+
+    public void configure(JobConf conf) {
+      mos = new MultipleOutputs(conf);
+    }
+
+    public void reduce(LongWritable key, Iterator<Text> values,
+                       OutputCollector<LongWritable, Text> output,
+                       Reporter reporter)
+      throws IOException {
+      while (values.hasNext()) {
+        Text value = values.next();
+        if (value.toString().equals("b")) {
+          mos.getCollector("text", reporter).collect(key, new Text("text"));
+          mos.getCollector("sequence", "B", reporter).collect(key,
+            new Text("sequence"));
+          mos.getCollector("sequence", "C", reporter).collect(key,
+            new Text("sequence"));
+        } else {
+          output.collect(key, value);
+        }
+      }
+    }
+
+    public void close() throws IOException {
+      // closes the record writers opened for the named outputs
+      mos.close();
+    }
+  }
+
+}