Author: cutting
Date: Tue Jun 12 19:46:29 2012
New Revision: 1349492
URL: http://svn.apache.org/viewvc?rev=1349492&view=rev
Log:
AVRO-1106. Java: Add AvroMultipleOutputs for newer mapreduce API. Contributed
by Ashish Nagavaram.
Added:
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java
(with props)
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputs.java
(with props)
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
Modified: avro/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1349492&r1=1349491&r2=1349492&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Jun 12 19:46:29 2012
@@ -4,6 +4,9 @@ Avro 1.7.1 (unreleased)
NEW FEATURES
+ AVRO-1106. Java: Add AvroMultipleOutputs for newer mapreduce API.
+ (Ashish Nagavaram via cutting)
+
IMPROVEMENTS
BUG FIXES
Added:
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java?rev=1349492&view=auto
==============================================================================
---
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java
(added)
+++
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java
Tue Jun 12 19:46:29 2012
@@ -0,0 +1,522 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.mapreduce;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.List;
+import java.util.Set;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Collections;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.avro.Schema;
+
+/**
+ * The AvroMultipleOutputs class simplifies writing Avro output data
+ * to multiple outputs
+ *
+ * <p>
+ * Case one: writing to additional outputs other than the job default output.
+ *
+ * Each additional output, or named output, may be configured with its own
+ * <code>Schema</code> and <code>OutputFormat</code>.
+ * </p>
+ * <p>
+ * Case two: to write data to different files provided by user
+ * </p>
+ *
+ * <p>
+ * AvroMultipleOutputs supports counters, by default they are disabled. The
+ * counters group is the {@link AvroMultipleOutputs} class name. The names of
the
+ * counters are the same as the output name. These count the number of records
+ * written to each output name.
+ * </p>
+ *
+ * Usage pattern for job submission:
+ * <pre>
+ *
+ * Job job = new Job();
+ *
+ * FileInputFormat.setInputPath(job, inDir);
+ * FileOutputFormat.setOutputPath(job, outDir);
+ *
+ * job.setMapperClass(MyAvroMapper.class);
+ * job.setReducerClass(MyAvroReducer.class);
+ * ...
+ *
+ * Schema schema;
+ * ...
+ * // Defines additional single output 'avro1' for the job
+ * AvroMultipleOutputs.addNamedOutput(job, "avro1",
AvroKeyValueOutputFormat.class,
+ * keyschema, valueSchema); // valueSchema can be set to null if there only
Key to be written
+ to file in the RecordWriter
+ *
+ * // Defines additional output 'avro2' with different schema for the job
+ * AvroMultipleOutputs.addNamedOutput(job, "avro2",
+ * AvroKeyOutputFormat.class,
+ * schema,null);
+ * ...
+ *
+ * job.waitForCompletion(true);
+ * ...
+ * </pre>
+ * <p>
+ * Usage in Reducer:
+ * <pre>
+ *
+ * public class MyAvroReducer extends
+ * Reducer<K, V, T, NullWritable> {
+ * private AvroMultipleOutputs amos;
+ *
+ *
+ * public void setup(Context context) {
+ * ...
+ * amos = new AvroMultipleOutputs(context);
+ * }
+ *
+ * public void reduce(K key, Iterator&lt;V&gt; values, Context context)
+ * throws IOException {
+ * ...
+ * amos.write("avro1",datum,NullWritable.get());
+ * amos.write("avro2",datum,NullWritable.get());
+ * amos.write("avro3",datum); // value defaults to NullWritable
+ * ...
+ * }
+ *
+ * public void cleanup(Context context) throws IOException {
+ * amos.close();
+ * ...
+ * }
+ *
+ * }
+ * </pre>
+ */
+
+
+public class AvroMultipleOutputs{
+
+ private static final String MULTIPLE_OUTPUTS =
"avro.mapreduce.multipleoutputs";
+
+ private static final String MO_PREFIX =
+ "avro.mapreduce.multipleoutputs.namedOutput.";
+
+ private static final String FORMAT = ".format";
+ private static final String COUNTERS_ENABLED =
+ "avro.mapreduce.multipleoutputs.counters";
+
+ /**
+ * Counters group used by the counters of MultipleOutputs.
+ */
+ private static final String COUNTERS_GROUP =
AvroMultipleOutputs.class.getName();
+
+ /**
+ * Cache for the taskContexts
+ */
+ private Map<String, TaskAttemptContext> taskContexts = new HashMap<String,
TaskAttemptContext>();
+
+ /**
+ * Cache for the Key Schemas
+ */
+ private static Map<String, Schema> keySchemas = new HashMap<String,
Schema>();
+
+ /**
+ * Cache for the Value Schemas
+ */
+ private static Map<String, Schema> valSchemas = new HashMap<String,
Schema>();
+
+ /**
+ * Checks if a named output name is valid token.
+ *
+ * @param namedOutput named output Name
+ * @throws IllegalArgumentException if the output name is not valid.
+ */
+ private static void checkTokenName(String namedOutput) {
+ if (namedOutput == null || namedOutput.length() == 0) {
+ throw new IllegalArgumentException(
+ "Name cannot be NULL or emtpy");
+ }
+ for (char ch : namedOutput.toCharArray()) {
+ if ((ch >= 'A') && (ch <= 'Z')) {
+ continue;
+ }
+ if ((ch >= 'a') && (ch <= 'z')) {
+ continue;
+ }
+ if ((ch >= '0') && (ch <= '9')) {
+ continue;
+ }
+ throw new IllegalArgumentException(
+ "Name cannot be have a '" + ch + "' char");
+ }
+ }
+
+ /**
+ * Checks if output name is valid.
+ *
+ * name cannot be the name used for the default output
+ * @param outputPath base output Name
+ * @throws IllegalArgumentException if the output name is not valid.
+ */
+ private static void checkBaseOutputPath(String outputPath) {
+ if (outputPath.equals("part")) {
+ throw new IllegalArgumentException("output name cannot be 'part'");
+ }
+ }
+
+ /**
+ * Checks if a named output name is valid.
+ *
+ * @param namedOutput named output Name
+ * @throws IllegalArgumentException if the output name is not valid.
+ */
+ private static void checkNamedOutputName(JobContext job,
+ String namedOutput, boolean alreadyDefined) {
+ checkTokenName(namedOutput);
+ checkBaseOutputPath(namedOutput);
+ List<String> definedChannels = getNamedOutputsList(job);
+ if (alreadyDefined && definedChannels.contains(namedOutput)) {
+ throw new IllegalArgumentException("Named output '" + namedOutput +
+ "' already alreadyDefined");
+ } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
+ throw new IllegalArgumentException("Named output '" + namedOutput +
+ "' not defined");
+ }
+ }
+
+ // Returns list of channel names.
+ private static List<String> getNamedOutputsList(JobContext job) {
+ List<String> names = new ArrayList<String>();
+ StringTokenizer st = new StringTokenizer(
+ job.getConfiguration().get(MULTIPLE_OUTPUTS, ""), " ");
+ while (st.hasMoreTokens()) {
+ names.add(st.nextToken());
+ }
+ return names;
+ }
+
+ // Returns the named output OutputFormat.
+ @SuppressWarnings("unchecked")
+ private static Class<? extends OutputFormat<?, ?>> getNamedOutputFormatClass(
+ JobContext job, String namedOutput) {
+ return (Class<? extends OutputFormat<?, ?>>)
+ job.getConfiguration().getClass(MO_PREFIX + namedOutput + FORMAT, null,
+ OutputFormat.class);
+ }
+
+ /**
+ * Adds a named output for the job.
+ * <p/>
+ *
+ * @param job job to add the named output
+ * @param namedOutput named output name, it has to be a word, letters
+ * and numbers only, cannot be the word 'part' as
+ * that is reserved for the default output.
+ * @param outputFormatClass OutputFormat class.
+ * @param keySchema Schema for the Key
+ */
+ @SuppressWarnings("unchecked")
+ public static void addNamedOutput(Job job, String namedOutput,
+ Class<? extends OutputFormat> outputFormatClass,
+ Schema keySchema) {
+ addNamedOutput(job,namedOutput,outputFormatClass,keySchema,null);
+ }
+
+ /**
+ * Adds a named output for the job.
+ * <p/>
+ *
+ * @param job job to add the named output
+ * @param namedOutput named output name, it has to be a word, letters
+ * and numbers only, cannot be the word 'part' as
+ * that is reserved for the default output.
+ * @param outputFormatClass OutputFormat class.
+ * @param keySchema Schema for the Key
+ * @param valueSchema Schema for the Value (used in case of
AvroKeyValueOutputFormat or null)
+ */
+ @SuppressWarnings("unchecked")
+ public static void addNamedOutput(Job job, String namedOutput,
+ Class<? extends OutputFormat> outputFormatClass,
+ Schema keySchema, Schema valueSchema) {
+ checkNamedOutputName(job, namedOutput, true);
+ Configuration conf = job.getConfiguration();
+ conf.set(MULTIPLE_OUTPUTS,
+ conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput);
+ conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
+ OutputFormat.class);
+ keySchemas.put(namedOutput+"_KEYSCHEMA",keySchema);
+ valSchemas.put(namedOutput+"_VALSCHEMA",valueSchema);
+
+ }
+
+ /**
+ * Enables or disables counters for the named outputs.
+ *
+ * The counters group is the {@link MultipleOutputs} class name.
+ * The names of the counters are the same as the named outputs. These
+ * counters count the number records written to each output name.
+ * By default these counters are disabled.
+ *
+ * @param job job to enable counters
+ * @param enabled indicates if the counters will be enabled or not.
+ */
+ public static void setCountersEnabled(Job job, boolean enabled) {
+ job.getConfiguration().setBoolean(COUNTERS_ENABLED, enabled);
+ }
+
+ /**
+ * Returns if the counters for the named outputs are enabled or not.
+ * By default these counters are disabled.
+ *
+ * @param job the job
+ * @return TRUE if the counters are enabled, FALSE if they are disabled.
+ */
+ public static boolean getCountersEnabled(JobContext job) {
+ return job.getConfiguration().getBoolean(COUNTERS_ENABLED, false);
+ }
+
+ /**
+ * Wraps RecordWriter to increment counters.
+ */
+ @SuppressWarnings("unchecked")
+ private static class RecordWriterWithCounter extends RecordWriter {
+ private RecordWriter writer;
+ private String counterName;
+ private TaskInputOutputContext context;
+
+ public RecordWriterWithCounter(RecordWriter writer, String counterName,
+ TaskInputOutputContext context) {
+ this.writer = writer;
+ this.counterName = counterName;
+ this.context = context;
+ }
+
+ @SuppressWarnings({"unchecked"})
+ public void write(Object key, Object value)
+ throws IOException, InterruptedException {
+ context.getCounter(COUNTERS_GROUP, counterName).increment(1);
+ writer.write(key, value);
+ }
+
+ public void close(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ writer.close(context);
+ }
+ }
+
+ // instance code, to be used from Mapper/Reducer code
+
+ private TaskInputOutputContext<?, ?, ?, ?> context;
+ private Set<String> namedOutputs;
+ private Map<String, RecordWriter<?, ?>> recordWriters;
+ private boolean countersEnabled;
+
+ /**
+ * Creates and initializes multiple outputs support,
+ * it should be instantiated in the Mapper/Reducer setup method.
+ *
+ * @param context the TaskInputOutputContext object
+ */
+ public AvroMultipleOutputs(
+ TaskInputOutputContext<?, ?, ?, ?> context) {
+ this.context = context;
+ namedOutputs = Collections.unmodifiableSet(
+ new HashSet<String>(AvroMultipleOutputs.getNamedOutputsList(context)));
+ recordWriters = new HashMap<String, RecordWriter<?, ?>>();
+ countersEnabled = getCountersEnabled(context);
+ }
+
+ /**
+ * Write key and value to the namedOutput.
+ *
+ * Output path is a unique file generated for the namedOutput.
+ * For example, {namedOutput}-(m|r)-{part-number}
+ *
+ * @param namedOutput the named output name
+ * @param key the key , value is NullWritable
+ */
+ @SuppressWarnings("unchecked")
+ public void write(String namedOutput, Object key)
+ throws IOException, InterruptedException {
+ write(namedOutput, key, NullWritable.get(), namedOutput);
+ }
+
+
+
+ /**
+ * Write key and value to the namedOutput.
+ *
+ * Output path is a unique file generated for the namedOutput.
+ * For example, {namedOutput}-(m|r)-{part-number}
+ *
+ * @param namedOutput the named output name
+ * @param key the key
+ * @param value the value
+ */
+ @SuppressWarnings("unchecked")
+ public void write(String namedOutput, Object key, Object value)
+ throws IOException, InterruptedException {
+ write(namedOutput, key, value, namedOutput);
+ }
+
+ /**
+ * Write key and value to baseOutputPath using the namedOutput.
+ *
+ * @param namedOutput the named output name
+ * @param key the key
+ * @param value the value
+ * @param baseOutputPath base-output path to write the record to.
+ * Note: Framework will generate unique filename for the baseOutputPath
+ */
+ @SuppressWarnings("unchecked")
+ public void write(String namedOutput, Object key, Object value,
+ String baseOutputPath) throws IOException, InterruptedException {
+ checkNamedOutputName(context, namedOutput, false);
+ checkBaseOutputPath(baseOutputPath);
+ if (!namedOutputs.contains(namedOutput)) {
+ throw new IllegalArgumentException("Undefined named output '" +
+ namedOutput + "'");
+ }
+ TaskAttemptContext taskContext = getContext(namedOutput);
+ getRecordWriter(taskContext, baseOutputPath).write(key, value);
+ }
+
+ /**
+ * Write key value to an output file name.
+ *
+ * Gets the record writer from job's output format.
+ * Job's output format should be a FileOutputFormat.
+ *
+ * @param key the key
+ * @param value the value
+ * @param baseOutputPath base-output path to write the record to.
+ * Note: Framework will generate unique filename for the baseOutputPath
+ */
+ @SuppressWarnings("unchecked")
+ public void write(Object key, Object value, String baseOutputPath)
+ throws IOException, InterruptedException {
+ checkBaseOutputPath(baseOutputPath);
+ TaskAttemptContext taskContext = new TaskAttemptContext(
+ context.getConfiguration(), context.getTaskAttemptID());
+ getRecordWriter(taskContext, baseOutputPath).write(key, value);
+ }
+
+ // by being synchronized MultipleOutputTask can be use with a
+ // MultithreadedMapper.
+ @SuppressWarnings("unchecked")
+ private synchronized RecordWriter getRecordWriter(
+ TaskAttemptContext taskContext, String baseFileName)
+ throws IOException, InterruptedException {
+
+ // look for record-writer in the cache
+ RecordWriter writer = recordWriters.get(baseFileName);
+
+ // If not in cache, create a new one
+ if (writer == null) {
+ // get the record writer from context output format
+ //FileOutputFormat.setOutputName(taskContext, baseFileName);
+ try {
+ writer = ((OutputFormat) ReflectionUtils.newInstance(
+ taskContext.getOutputFormatClass(), taskContext.getConfiguration()))
+ .getRecordWriter(taskContext);
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+
+ // if counters are enabled, wrap the writer with context
+ // to increment counters
+ if (countersEnabled) {
+ writer = new RecordWriterWithCounter(writer, baseFileName, context);
+ }
+
+ // add the record-writer to the cache
+ recordWriters.put(baseFileName, writer);
+ }
+ return writer;
+ }
+
+ // Create a taskAttemptContext for the named output with
+ // output format and output key/value types put in the context
+ private TaskAttemptContext getContext(String nameOutput) throws IOException {
+
+ TaskAttemptContext taskContext = taskContexts.get(nameOutput);
+
+ if (taskContext != null) {
+ return taskContext;
+ }
+
+ // The following trick leverages the instantiation of a record writer via
+ // the job thus supporting arbitrary output formats.
+ context.getConfiguration().set("avro.mo.config.namedOutput",nameOutput);
+ Job job = new Job(context.getConfiguration());
+ job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
+ Schema keySchema = keySchemas.get(nameOutput+"_KEYSCHEMA");
+ Schema valSchema = valSchemas.get(nameOutput+"_VALSCHEMA");
+
+ boolean isMaponly=job.getNumReduceTasks() == 0;
+
+ if(keySchema!=null)
+ {
+ if(isMaponly)
+ AvroJob.setMapOutputKeySchema(job,keySchema);
+ else
+ AvroJob.setOutputKeySchema(job,keySchema);
+ }
+ if(valSchema!=null)
+ {
+ if(isMaponly)
+ AvroJob.setMapOutputValueSchema(job,valSchema);
+ else
+ AvroJob.setOutputValueSchema(job,valSchema);
+ }
+ taskContext = new TaskAttemptContext(
+ job.getConfiguration(), context.getTaskAttemptID());
+
+ taskContexts.put(nameOutput, taskContext);
+
+ return taskContext;
+ }
+
+ /**
+ * Closes all the opened outputs.
+ *
+ * This should be called from cleanup method of map/reduce task.
+ * If overridden subclasses must invoke <code>super.close()</code> at the
+ * end of their <code>close()</code>
+ *
+ */
+ @SuppressWarnings("unchecked")
+ public void close() throws IOException, InterruptedException {
+ for (RecordWriter writer : recordWriters.values()) {
+ writer.close(context);
+ }
+ }
+}
+
+
Propchange:
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java?rev=1349492&r1=1349491&r2=1349492&view=diff
==============================================================================
---
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
(original)
+++
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
Tue Jun 12 19:46:29 2012
@@ -20,6 +20,7 @@ package org.apache.avro.mapreduce;
import java.io.IOException;
import java.io.OutputStream;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.avro.file.CodecFactory;
import org.apache.hadoop.fs.Path;
@@ -59,7 +60,10 @@ public abstract class AvroOutputFormatBa
* @return The target output stream.
*/
protected OutputStream getAvroFileOutputStream(TaskAttemptContext context)
throws IOException {
- Path path = getDefaultWorkFile(context,
org.apache.avro.mapred.AvroOutputFormat.EXT);
+ Path path = new
Path(((FileOutputCommitter)getOutputCommitter(context)).getWorkPath(),
+
getUniqueFile(context,context.getConfiguration().get("avro.mo.config.namedOutput","part"),org.apache.avro.mapred.AvroOutputFormat.EXT));
return path.getFileSystem(context.getConfiguration()).create(path);
}
+
+
}
Added:
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputs.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputs.java?rev=1349492&view=auto
==============================================================================
---
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputs.java
(added)
+++
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputs.java
Tue Jun 12 19:46:29 2012
@@ -0,0 +1,369 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.avro.mapreduce;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.reflect.ReflectDatumReader;
+
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.FsInput;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.apache.avro.mapred.Pair;
+
+public class TestAvroMultipleOutputs {
+ @Rule
+ public TemporaryFolder tmpFolder = new TemporaryFolder();
+ public static final Schema STATS_SCHEMA =
+ Schema.parse("{\"name\":\"stats\",\"type\":\"record\","
+ + "\"fields\":[{\"name\":\"count\",\"type\":\"int\"},"
+ + "{\"name\":\"name\",\"type\":\"string\"}]}");
+ public static final Schema STATS_SCHEMA_2 =
+ Schema.parse("{\"name\":\"stats\",\"type\":\"record\","
+ + "\"fields\":[{\"name\":\"count1\",\"type\":\"int\"},"
+ + "{\"name\":\"name1\",\"type\":\"string\"}]}");
+
+ private static class LineCountMapper extends Mapper<LongWritable, Text,
Text, IntWritable> {
+ private IntWritable mOne;
+
+ @Override
+ protected void setup(Context context) {
+ mOne = new IntWritable(1);
+ }
+
+ @Override
+ protected void map(LongWritable fileByteOffset, Text line, Context context)
+ throws IOException, InterruptedException {
+ context.write(line, mOne);
+ }
+ }
+
+ private static class StatCountMapper
+ extends Mapper<AvroKey<TextStats>, NullWritable, Text, IntWritable> {
+ private IntWritable mCount;
+ private Text mText;
+
+ @Override
+ protected void setup(Context context) {
+ mCount = new IntWritable(0);
+ mText = new Text("");
+ }
+
+ @Override
+ protected void map(AvroKey<TextStats> record, NullWritable ignore, Context
context)
+ throws IOException, InterruptedException {
+ mCount.set(record.datum().count);
+ mText.set(record.datum().name.toString());
+ context.write(mText, mCount);
+ }
+ }
+
+ private static class GenericStatsReducer
+ extends Reducer<Text, IntWritable, AvroKey<GenericData.Record>,
NullWritable> {
+ private AvroKey<GenericData.Record> mStats;
+ private AvroMultipleOutputs amos;
+
+ @Override
+ protected void setup(Context context) {
+ mStats = new AvroKey<GenericData.Record>(null);
+ amos = new AvroMultipleOutputs(context);
+ }
+
+ @Override
+ protected void reduce(Text line, Iterable<IntWritable> counts, Context
context)
+ throws IOException, InterruptedException {
+ GenericData.Record record = new GenericData.Record(STATS_SCHEMA);
+ GenericData.Record record2 = new GenericData.Record(STATS_SCHEMA_2);
+ int sum = 0;
+ for (IntWritable count : counts) {
+ sum += count.get();
+ }
+ record.put("name", new Utf8(line.toString()));
+ record.put("count", new Integer(sum));
+ mStats.datum(record);
+ context.write(mStats, NullWritable.get());
+ amos.write("myavro",mStats,NullWritable.get());
+ record2.put("name1", new Utf8(line.toString()));
+ record2.put("count1", new Integer(sum));
+ mStats.datum(record2);
+ amos.write("myavro1",mStats);
+ }
+
+ @Override
+ protected void cleanup(Context context) throws
IOException,InterruptedException
+ {
+ amos.close();
+ }
+ }
+
+ private static class SpecificStatsReducer
+ extends Reducer<Text, IntWritable, AvroKey<TextStats>, NullWritable> {
+ private AvroKey<TextStats> mStats;
+ private AvroMultipleOutputs amos;
+ @Override
+ protected void setup(Context context) {
+ mStats = new AvroKey<TextStats>(null);
+ amos = new AvroMultipleOutputs(context);
+ }
+
+ @Override
+ protected void reduce(Text line, Iterable<IntWritable> counts, Context
context)
+ throws IOException, InterruptedException {
+ TextStats record = new TextStats();
+ record.count = 0;
+ for (IntWritable count : counts) {
+ record.count += count.get();
+ }
+ record.name = line.toString();
+ mStats.datum(record);
+ context.write(mStats, NullWritable.get());
+ amos.write("myavro3",mStats,NullWritable.get());
+ }
+ @Override
+ protected void cleanup(Context context) throws
IOException,InterruptedException
+ {
+ amos.close();
+ }
+ }
+
+ private static class SortMapper
+ extends Mapper<AvroKey<TextStats>, NullWritable, AvroKey<TextStats>,
NullWritable> {
+ @Override
+ protected void map(AvroKey<TextStats> key, NullWritable value, Context
context)
+ throws IOException, InterruptedException {
+ context.write(key, value);
+ }
+ }
+
+ private static class SortReducer
+ extends Reducer<AvroKey<TextStats>, NullWritable, AvroKey<TextStats>,
NullWritable> {
+ @Override
+ protected void reduce(AvroKey<TextStats> key, Iterable<NullWritable>
ignore, Context context)
+ throws IOException, InterruptedException {
+ context.write(key, NullWritable.get());
+ }
+ }
+
+ @Test
+ public void testAvroGenericOutput() throws Exception {
+ Job job = new Job();
+
+ FileInputFormat.setInputPaths(job, new Path(getClass()
+ .getResource("/org/apache/avro/mapreduce/mapreduce-test-input.txt")
+ .toURI().toString()));
+ job.setInputFormatClass(TextInputFormat.class);
+
+ job.setMapperClass(LineCountMapper.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(IntWritable.class);
+
+ job.setReducerClass(GenericStatsReducer.class);
+ AvroJob.setOutputKeySchema(job, STATS_SCHEMA);
+
AvroMultipleOutputs.addNamedOutput(job,"myavro",AvroKeyOutputFormat.class,STATS_SCHEMA,null);
+ AvroMultipleOutputs.addNamedOutput(job,"myavro1",
AvroKeyOutputFormat.class, STATS_SCHEMA_2);
+ job.setOutputFormatClass(AvroKeyOutputFormat.class);
+ String dir = System.getProperty("test.dir", ".") + "/mapred";
+ Path outputPath = new Path(dir + "/out");
+ outputPath.getFileSystem(job.getConfiguration()).delete(outputPath);
+ FileOutputFormat.setOutputPath(job, outputPath);
+
+ Assert.assertTrue(job.waitForCompletion(true));
+
+ // Check that the results from the MapReduce were as expected.
+ FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+ FileStatus[] outputFiles =
fileSystem.globStatus(outputPath.suffix("/myavro-r-00000.avro"));
+ Assert.assertEquals(1, outputFiles.length);
+ DataFileReader<GenericData.Record> reader = new
DataFileReader<GenericData.Record>(
+ new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+ new GenericDatumReader<GenericData.Record>(STATS_SCHEMA));
+ Map<String, Integer> counts = new HashMap<String, Integer>();
+ for (GenericData.Record record : reader) {
+ counts.put(((Utf8) record.get("name")).toString(), (Integer)
record.get("count"));
+ }
+ reader.close();
+
+ Assert.assertEquals(3, counts.get("apple").intValue());
+ Assert.assertEquals(2, counts.get("banana").intValue());
+ Assert.assertEquals(1, counts.get("carrot").intValue());
+
+ outputFiles =
fileSystem.globStatus(outputPath.suffix("/myavro1-r-00000.avro"));
+ Assert.assertEquals(1, outputFiles.length);
+ reader = new DataFileReader<GenericData.Record>(
+ new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+ new GenericDatumReader<GenericData.Record>(STATS_SCHEMA_2));
+ counts = new HashMap<String, Integer>();
+ for (GenericData.Record record : reader) {
+ counts.put(((Utf8) record.get("name1")).toString(), (Integer)
record.get("count1"));
+ }
+ reader.close();
+
+ Assert.assertEquals(3, counts.get("apple").intValue());
+ Assert.assertEquals(2, counts.get("banana").intValue());
+ Assert.assertEquals(1, counts.get("carrot").intValue());
+ }
+
+ @Test
+ public void testAvroSpecificOutput() throws Exception {
+ Job job = new Job();
+
+ FileInputFormat.setInputPaths(job, new Path(getClass()
+ .getResource("/org/apache/avro/mapreduce/mapreduce-test-input.txt")
+ .toURI().toString()));
+ job.setInputFormatClass(TextInputFormat.class);
+
+ job.setMapperClass(LineCountMapper.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(IntWritable.class);
+
AvroMultipleOutputs.addNamedOutput(job,"myavro3",AvroKeyOutputFormat.class,TextStats.SCHEMA$,null);
+
+ job.setReducerClass(SpecificStatsReducer.class);
+ AvroJob.setOutputKeySchema(job, TextStats.SCHEMA$);
+
+ job.setOutputFormatClass(AvroKeyOutputFormat.class);
+ String dir = System.getProperty("test.dir", ".") + "/mapred";
+ Path outputPath = new Path(dir + "/out-specific");
+ outputPath.getFileSystem(job.getConfiguration()).delete(outputPath);
+ FileOutputFormat.setOutputPath(job, outputPath);
+
+ Assert.assertTrue(job.waitForCompletion(true));
+ FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+ FileStatus[] outputFiles =
fileSystem.globStatus(outputPath.suffix("/myavro3-*"));
+ Assert.assertEquals(1, outputFiles.length);
+ DataFileReader<TextStats> reader = new DataFileReader<TextStats>(
+ new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+ new SpecificDatumReader<TextStats>());
+ Map<String, Integer> counts = new HashMap<String, Integer>();
+ for (TextStats record : reader) {
+ counts.put(record.name.toString(), record.count);
+ }
+ reader.close();
+
+ Assert.assertEquals(3, counts.get("apple").intValue());
+ Assert.assertEquals(2, counts.get("banana").intValue());
+ Assert.assertEquals(1, counts.get("carrot").intValue());
+ }
+
+ @Test
+ public void testAvroInput() throws Exception {
+ Job job = new Job();
+
+ FileInputFormat.setInputPaths(job, new Path(getClass()
+
.getResource("/org/apache/avro/mapreduce/mapreduce-test-input.avro")
+ .toURI().toString()));
+ job.setInputFormatClass(AvroKeyInputFormat.class);
+ AvroJob.setInputKeySchema(job, TextStats.SCHEMA$);
+
AvroMultipleOutputs.addNamedOutput(job,"myavro3",AvroKeyOutputFormat.class,TextStats.SCHEMA$,null);
+
+ job.setMapperClass(StatCountMapper.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(IntWritable.class);
+
+ job.setReducerClass(SpecificStatsReducer.class);
+ AvroJob.setOutputKeySchema(job, TextStats.SCHEMA$);
+
+ job.setOutputFormatClass(AvroKeyOutputFormat.class);
+ Path outputPath = new Path(tmpFolder.getRoot().getPath() +
"/out-specific-input");
+ FileOutputFormat.setOutputPath(job, outputPath);
+
+ Assert.assertTrue(job.waitForCompletion(true));
+
+ // Check that the results from the MapReduce were as expected.
+ FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+ FileStatus[] outputFiles =
fileSystem.globStatus(outputPath.suffix("/myavro3-*"));
+ Assert.assertEquals(1, outputFiles.length);
+ DataFileReader<TextStats> reader = new DataFileReader<TextStats>(
+ new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+ new SpecificDatumReader<TextStats>());
+ Map<String, Integer> counts = new HashMap<String, Integer>();
+ for (TextStats record : reader) {
+ counts.put(record.name.toString(), record.count);
+ }
+ reader.close();
+
+ Assert.assertEquals(3, counts.get("apple").intValue());
+ Assert.assertEquals(2, counts.get("banana").intValue());
+ Assert.assertEquals(1, counts.get("carrot").intValue());
+ }
+
+ @Test
+ public void testAvroMapOutput() throws Exception {
+ Job job = new Job();
+
+ FileInputFormat.setInputPaths(job, new Path(getClass()
+
.getResource("/org/apache/avro/mapreduce/mapreduce-test-input.avro")
+ .toURI().toString()));
+ job.setInputFormatClass(AvroKeyInputFormat.class);
+ AvroJob.setInputKeySchema(job, TextStats.SCHEMA$);
+
+ job.setMapperClass(SortMapper.class);
+ AvroJob.setMapOutputKeySchema(job, TextStats.SCHEMA$);
+ job.setMapOutputValueClass(NullWritable.class);
+
+ job.setReducerClass(SortReducer.class);
+ AvroJob.setOutputKeySchema(job, TextStats.SCHEMA$);
+
+ job.setOutputFormatClass(AvroKeyOutputFormat.class);
+ Path outputPath = new Path(tmpFolder.getRoot().getPath() +
"/out-specific-input");
+ FileOutputFormat.setOutputPath(job, outputPath);
+
+ Assert.assertTrue(job.waitForCompletion(true));
+
+ // Check that the results from the MapReduce were as expected.
+ FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+ FileStatus[] outputFiles =
fileSystem.globStatus(outputPath.suffix("/part-*"));
+ Assert.assertEquals(1, outputFiles.length);
+ DataFileReader<TextStats> reader = new DataFileReader<TextStats>(
+ new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+ new SpecificDatumReader<TextStats>());
+ Map<String, Integer> counts = new HashMap<String, Integer>();
+ for (TextStats record : reader) {
+ counts.put(record.name.toString(), record.count);
+ }
+ reader.close();
+
+ Assert.assertEquals(3, counts.get("apple").intValue());
+ Assert.assertEquals(2, counts.get("banana").intValue());
+ Assert.assertEquals(1, counts.get("carrot").intValue());
+ }
+}
Propchange:
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputs.java
------------------------------------------------------------------------------
svn:eol-style = native