Author: cutting
Date: Thu Apr 1 18:30:36 2010
New Revision: 930060
URL: http://svn.apache.org/viewvc?rev=930060&view=rev
Log:
AVRO-493. Add support for Hadoop MapReduce with Avro data files.
Added:
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroInputFormat.java
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeySerialization.java
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroMapper.java
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroReducer.java
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroWrapper.java
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/FsInput.java
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html
hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/
hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountGeneric.java
hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountSpecific.java
hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java
hadoop/avro/trunk/share/test/schemas/WordCount.avsc
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/lang/java/build.xml
hadoop/avro/trunk/lang/java/ivy.xml
Modified: hadoop/avro/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=930060&r1=930059&r2=930060&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Thu Apr 1 18:30:36 2010
@@ -6,6 +6,8 @@ Avro 1.4.0 (unreleased)
NEW FEATURES
+ AVRO-493. Add support for Hadoop Mapreduce with Avro data files. (cutting)
+
IMPROVEMENTS
BUG FIXES
Modified: hadoop/avro/trunk/lang/java/build.xml
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/build.xml?rev=930060&r1=930059&r2=930060&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/build.xml (original)
+++ hadoop/avro/trunk/lang/java/build.xml Thu Apr 1 18:30:36 2010
@@ -66,6 +66,8 @@
value="http://jackson.codehaus.org/0.9.3/javadoc/"/>
<property name="javadoc.link.servlet"
value="http://java.sun.com/products/servlet/2.3/javadoc/"/>
+ <property name="javadoc.link.hadoop"
+ value="http://hadoop.apache.org/common/docs/current/api/"/>
<property name="javadoc.packages" value="org.${org}.${name}.*"/>
<property name="javac.encoding" value="ISO-8859-1"/>
@@ -456,6 +458,7 @@
<link href="${javadoc.link.java}"/>
<link href="${javadoc.link.jackson}"/>
<link href="${javadoc.link.servlet}"/>
+ <link href="${javadoc.link.hadoop}"/>
<classpath >
<path refid="java.classpath" />
Modified: hadoop/avro/trunk/lang/java/ivy.xml
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/ivy.xml?rev=930060&r1=930059&r2=930060&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/ivy.xml (original)
+++ hadoop/avro/trunk/lang/java/ivy.xml Thu Apr 1 18:30:36 2010
@@ -61,6 +61,10 @@
conf="build->default"/>
<dependency org="net.sf.jopt-simple" name="jopt-simple" rev="3.2"
conf="build->default;test->default;tools->default"/>
+ <dependency org="org.apache.hadoop" name="hadoop-core" rev="0.20.2"
+ conf="build->default" transitive="false"/>
+ <dependency org="commons-httpclient" name="commons-httpclient" rev="3.0.1"
+ conf="test->default"/>
</dependencies>
</ivy-module>
Added:
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroInputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroInputFormat.java?rev=930060&view=auto
==============================================================================
---
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroInputFormat.java
(added)
+++
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroInputFormat.java
Thu Apr 1 18:30:36 2010
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RecordReader;
+
+/** An {...@link org.apache.hadoop.mapred.InputFormat} for Avro data files */
+public class AvroInputFormat<T>
+ extends FileInputFormat<AvroWrapper<T>, NullWritable> {
+
+ @Override
+ protected FileStatus[] listStatus(JobConf job) throws IOException {
+ List<FileStatus> result = new ArrayList<FileStatus>();
+ for (FileStatus file : super.listStatus(job))
+ if (file.getPath().getName().endsWith(AvroOutputFormat.EXT))
+ result.add(file);
+ return result.toArray(new FileStatus[0]);
+ }
+
+ @Override
+ public RecordReader<AvroWrapper<T>, NullWritable>
+ getRecordReader(InputSplit split, JobConf job, Reporter reporter)
+ throws IOException {
+ reporter.setStatus(split.toString());
+ return new AvroRecordReader<T>(job, (FileSplit)split);
+ }
+
+}
+
Added: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java?rev=930060&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java
(added)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java
Thu Apr 1 18:30:36 2010
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.util.Collection;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.avro.Schema;
+
+/** Setters to configure jobs for Avro data. */
+public class AvroJob {
+ private AvroJob() {} // no public ctor
+
+ static final String API_GENERIC = "generic";
+ static final String API_SPECIFIC = "specific";
+
+ static final String INPUT_API = "avro.input.api";
+ static final String INPUT_SCHEMA = "avro.input.schema";
+
+ static final String MAP_OUTPUT_SCHEMA = "avro.map.output.schema";
+ static final String MAP_OUTPUT_API = "avro.map.output.api";
+
+ static final String OUTPUT_SCHEMA = "avro.output.schema";
+ static final String OUTPUT_API = "avro.output.api";
+
+ /** Configure a job's map input to use Avro's generic API. */
+ public static void setInputGeneric(JobConf job, Schema s) {
+ job.set(INPUT_API, API_GENERIC);
+ configureAvroInput(job, s);
+ }
+
+ /** Configure a job's map input to use Avro's specific API. */
+ public static void setInputSpecific(JobConf job, Schema s) {
+ job.set(INPUT_API, API_SPECIFIC);
+ configureAvroInput(job, s);
+ }
+
+ private static void configureAvroInput(JobConf job, Schema s) {
+ job.set(INPUT_SCHEMA, s.toString());
+ job.setInputFormat(AvroInputFormat.class);
+ }
+
+ /** Configure a job's map output key schema using Avro's generic API. */
+ public static void setMapOutputGeneric(JobConf job, Schema s) {
+ job.set(MAP_OUTPUT_SCHEMA, s.toString());
+ job.set(MAP_OUTPUT_API, API_GENERIC);
+ configureAvroOutput(job);
+ }
+
+ /** Configure a job's map output key schema using Avro's specific API. */
+ public static void setMapOutputSpecific(JobConf job, Schema s) {
+ job.set(MAP_OUTPUT_SCHEMA, s.toString());
+ job.set(MAP_OUTPUT_API, API_SPECIFIC);
+ configureAvroOutput(job);
+ }
+
+ /** Configure a job's output key schema using Avro's generic API. */
+ public static void setOutputGeneric(JobConf job, Schema s) {
+ job.set(OUTPUT_SCHEMA, s.toString());
+ job.set(OUTPUT_API, API_GENERIC);
+ configureAvroOutput(job);
+ }
+
+ /** Configure a job's output key schema using Avro's specific API. */
+ public static void setOutputSpecific(JobConf job, Schema s) {
+ job.set(OUTPUT_SCHEMA, s.toString());
+ job.set(OUTPUT_API, API_SPECIFIC);
+ configureAvroOutput(job);
+ }
+
+ private static void configureAvroOutput(JobConf job) {
+ job.setOutputKeyClass(AvroWrapper.class);
+ job.setOutputKeyComparatorClass(AvroKeyComparator.class);
+ job.setMapOutputValueClass(NullWritable.class);
+ job.setOutputFormat(AvroOutputFormat.class);
+
+ // add AvroKeySerialization to io.serializations
+ Collection<String> serializations =
+ job.getStringCollection("io.serializations");
+ if (!serializations.contains(AvroKeySerialization.class.getName())) {
+ serializations.add(AvroKeySerialization.class.getName());
+ job.setStrings("io.serializations",
+ serializations.toArray(new String[0]));
+ }
+ }
+
+}
Added:
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java?rev=930060&view=auto
==============================================================================
---
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java
(added)
+++
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java
Thu Apr 1 18:30:36 2010
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryData;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.specific.SpecificData;
+
+/** The {...@link RawComparator} used by jobs configured with {...@link
AvroJob}. */
+public class AvroKeyComparator<T>
+ extends Configured implements RawComparator<AvroWrapper<T>> {
+
+ private Schema schema;
+ private GenericData model;
+
+ @Override
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ if (conf != null) {
+ schema = Schema.parse(conf.get(AvroJob.MAP_OUTPUT_SCHEMA,
+ conf.get(AvroJob.OUTPUT_SCHEMA)));
+ String api = getConf().get(AvroJob.MAP_OUTPUT_API,
+ getConf().get(AvroJob.OUTPUT_API));
+ model = AvroJob.API_SPECIFIC.equals(api)
+ ? SpecificData.get()
+ : GenericData.get();
+ }
+ }
+
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ int diff = BinaryData.compare(b1, s1, b2, s2, schema);
+ return diff == 0 ? -1 : diff;
+ }
+
+ public int compare(AvroWrapper<T> x, AvroWrapper<T> y) {
+ int diff = model.compare(x.datum(), y.datum(), schema);
+ return diff == 0 ? -1 : diff;
+ }
+
+}
Added:
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeySerialization.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeySerialization.java?rev=930060&view=auto
==============================================================================
---
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeySerialization.java
(added)
+++
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeySerialization.java
Thu Apr 1 18:30:36 2010
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.conf.Configured;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+
+/** The {...@link Serialization} used by jobs configured with {...@link
AvroJob}. */
+public class AvroKeySerialization<T> extends Configured
+ implements Serialization<AvroWrapper<T>> {
+
+ public boolean accept(Class<?> c) {
+ return AvroWrapper.class.isAssignableFrom(c);
+ }
+
+ /** Returns the specified map output deserializer. Defaults to the final
+ * output deserializer if no map output schema was specified. */
+ public Deserializer<AvroWrapper<T>> getDeserializer(Class<AvroWrapper<T>> c)
{
+ // We need not rely on mapred.task.is.map here to determine whether map
+ // output or final output is desired, since the mapreduce framework never
+ // creates a deserializer for final output, only for map output.
+ String json = getConf().get(AvroJob.MAP_OUTPUT_SCHEMA,
+ getConf().get(AvroJob.OUTPUT_SCHEMA));
+ Schema schema = Schema.parse(json);
+
+ String api = getConf().get(AvroJob.MAP_OUTPUT_API,
+ getConf().get(AvroJob.OUTPUT_API));
+ DatumReader<T> reader = AvroJob.API_SPECIFIC.equals(api)
+ ? new SpecificDatumReader<T>(schema)
+ : new GenericDatumReader<T>(schema);
+
+ return new AvroWrapperDeserializer(reader);
+ }
+
+ private static final DecoderFactory FACTORY = new DecoderFactory();
+ static { FACTORY.configureDirectDecoder(true); }
+
+ private class AvroWrapperDeserializer
+ implements Deserializer<AvroWrapper<T>> {
+
+ private DatumReader<T> reader;
+ private BinaryDecoder decoder;
+
+ public AvroWrapperDeserializer(DatumReader<T> reader) {
+ this.reader = reader;
+ }
+
+ public void open(InputStream in) {
+ this.decoder = FACTORY.createBinaryDecoder(in, decoder);
+ }
+
+ public AvroWrapper<T> deserialize(AvroWrapper<T> wrapper)
+ throws IOException {
+ T datum = reader.read(wrapper == null ? null : wrapper.datum(), decoder);
+ if (wrapper == null) {
+ wrapper = new AvroWrapper<T>(datum);
+ } else {
+ wrapper.datum(datum);
+ }
+ return wrapper;
+ }
+
+ public void close() throws IOException {
+ decoder.inputStream().close();
+ }
+
+ }
+
+ /** Returns the specified output serializer. */
+ public Serializer<AvroWrapper<T>> getSerializer(Class<AvroWrapper<T>> c) {
+ // Here we must rely on mapred.task.is.map to tell whether the map output
+ // or final output is needed.
+ boolean isMap = getConf().getBoolean("mapred.task.is.map", false);
+
+ String json = getConf().get(AvroJob.OUTPUT_SCHEMA);
+ if (isMap)
+ json = getConf().get(AvroJob.MAP_OUTPUT_SCHEMA, json);
+ Schema schema = Schema.parse(json);
+
+ String api = getConf().get(AvroJob.OUTPUT_API);
+ if (isMap)
+ api = getConf().get(AvroJob.MAP_OUTPUT_API, json);
+
+ DatumWriter<T> writer = AvroJob.API_SPECIFIC.equals(api)
+ ? new SpecificDatumWriter<T>(schema)
+ : new GenericDatumWriter<T>(schema);
+ return new AvroWrapperSerializer(writer);
+ }
+
+ private class AvroWrapperSerializer implements Serializer<AvroWrapper<T>> {
+
+ private DatumWriter<T> writer;
+ private OutputStream out;
+ private BinaryEncoder encoder;
+
+ public AvroWrapperSerializer(DatumWriter<T> writer) {
+ this.writer = writer;
+ }
+
+ public void open(OutputStream out) {
+ this.out = out;
+ this.encoder = new BinaryEncoder(out);
+ }
+
+ public void serialize(AvroWrapper<T> wrapper) throws IOException {
+ writer.write(wrapper.datum(), encoder);
+ }
+
+ public void close() throws IOException {
+ out.close();
+ }
+
+ }
+
+}
Added:
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroMapper.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroMapper.java?rev=930060&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroMapper.java
(added)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroMapper.java
Thu Apr 1 18:30:36 2010
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Mapper;
+
+/** A {...@link Mapper} for Avro data.
+ *
+ * <p>Applications should subclass this class and pass their subclass to
{...@link
+ * org.apache.hadoop.mapred.JobConf#setMapperClass(Class)}. Subclasses must
+ * override {...@link #map} and may call {...@link #collect} to generate
output.
+ */
+public abstract class AvroMapper<IN,OUT> extends MapReduceBase
+ implements Mapper<AvroWrapper<IN>, NullWritable,
+ AvroWrapper<OUT>, NullWritable> {
+
+ private OutputCollector<AvroWrapper<OUT>, NullWritable> out;
+ private Reporter reporter;
+
+ public void map(AvroWrapper<IN> wrapper, NullWritable value,
+ OutputCollector<AvroWrapper<OUT>, NullWritable> output,
+ Reporter reporter) throws IOException {
+ if (this.out == null) {
+ this.out = output;
+ this.reporter = reporter;
+ }
+ map(wrapper.datum());
+ }
+
+ /** Return the {...@link Reporter} to permit status updates. */
+ public Reporter getReporter() { return reporter; }
+
+ /** Called with each map input datum. */
+ public abstract void map(IN datum) throws IOException;
+
+ private final AvroWrapper<OUT> outputWrapper = new AvroWrapper<OUT>(null);
+
+ /** Call with each map output datum. */
+ public void collect(OUT datum) throws IOException {
+ outputWrapper.datum(datum);
+ out.collect(outputWrapper, NullWritable.get());
+ }
+
+}
Added:
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java?rev=930060&view=auto
==============================================================================
---
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java
(added)
+++
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java
Thu Apr 1 18:30:36 2010
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.util.Progressable;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.CodecFactory;
+
+/** An {...@link org.apache.hadoop.mapred.OutputFormat} for Avro data files. */
+public class AvroOutputFormat <T>
+ extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
+
+ final static String EXT = ".avro";
+
+ private static final String DEFLATE_LEVEL_KEY = "avro.mapred.deflate.level";
+ private static final int DEFAULT_DEFLATE_LEVEL = 1;
+
+ /** Enable output compression using the deflate codec and specify its
level.*/
+ public static void setDeflateLevel(JobConf job, int level) {
+ FileOutputFormat.setCompressOutput(job, true);
+ job.setInt(DEFLATE_LEVEL_KEY, level);
+ }
+
+ public RecordWriter<AvroWrapper<T>, NullWritable>
+ getRecordWriter(FileSystem ignore, JobConf job,
+ String name, Progressable prog)
+ throws IOException {
+
+ Schema schema = Schema.parse(job.get(AvroJob.OUTPUT_SCHEMA));
+
+ DatumWriter<T> datumWriter =
+ AvroJob.API_SPECIFIC.equals(job.get(AvroJob.OUTPUT_API))
+ ? new SpecificDatumWriter<T>()
+ : new GenericDatumWriter<T>();
+
+ final DataFileWriter<T> writer = new DataFileWriter<T>(datumWriter);
+
+ if (FileOutputFormat.getCompressOutput(job)) {
+ int level = job.getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);
+ writer.setCodec(CodecFactory.deflateCodec(level));
+ }
+
+ Path path = FileOutputFormat.getTaskOutputPath(job, name+EXT);
+ writer.create(schema, path.getFileSystem(job).create(path));
+
+ return new RecordWriter<AvroWrapper<T>, NullWritable>() {
+ public void write(AvroWrapper<T> wrapper, NullWritable ignore)
+ throws IOException {
+ writer.append(wrapper.datum());
+ }
+ public void close(Reporter reporter) throws IOException {
+ writer.close();
+ }
+ };
+ }
+
+}
Added:
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java?rev=930060&view=auto
==============================================================================
---
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java
(added)
+++
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java
Thu Apr 1 18:30:36 2010
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+
+/** An {...@link RecordReader} for Avro data files. */
+public class AvroRecordReader<T>
+ implements RecordReader<AvroWrapper<T>, NullWritable> {
+
+ private FsInput in;
+ private DataFileReader<T> reader;
+ private long start;
+ private long end;
+
+ public AvroRecordReader(JobConf job, FileSplit split)
+ throws IOException {
+ this.in = new FsInput(split.getPath(), job);
+ DatumReader<T> datumReader =
+ AvroJob.API_SPECIFIC.equals(job.get(AvroJob.INPUT_API))
+ ? new SpecificDatumReader<T>()
+ : new GenericDatumReader<T>();
+
+ this.reader = new DataFileReader<T>(in, datumReader);
+
+ reader.sync(split.getStart()); // sync to start
+ this.start = in.tell();
+ this.end = split.getStart() + split.getLength();
+ }
+
+ public AvroWrapper<T> createKey() {
+ return new AvroWrapper<T>(null);
+ }
+
+ public NullWritable createValue() { return NullWritable.get(); }
+
+ public boolean next(AvroWrapper<T> wrapper, NullWritable ignore)
+ throws IOException {
+ if (!reader.hasNext() || reader.pastSync(end))
+ return false;
+ wrapper.datum(reader.next(wrapper.datum()));
+ return true;
+ }
+
+ public float getProgress() throws IOException {
+ if (end == start) {
+ return 0.0f;
+ } else {
+ return Math.min(1.0f, (in.tell() - start) / (float)(end - start));
+ }
+ }
+
+ public long getPos() throws IOException {
+ return in.tell();
+ }
+
+ public void close() throws IOException { reader.close(); }
+
+}
+
Added:
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroReducer.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroReducer.java?rev=930060&view=auto
==============================================================================
---
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroReducer.java
(added)
+++
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroReducer.java
Thu Apr 1 18:30:36 2010
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Reducer;
+
+/** A {...@link Reducer} for Avro data.
+ *
+ * <p>Applications should subclass this class and pass their subclass to
{...@link
+ * org.apache.hadoop.mapred.JobConf#setReducerClass(Class)} and perhaps
{...@link
+ * org.apache.hadoop.mapred.JobConf#setCombinerClass(Class)} Subclasses must
+ * override {...@link #reduce} and may call {...@link #collect} to generate
output.
+ *
+ * <p>Note that reducers here are not passed an iterator of all matching
+ * values. Rather, the reducer is called with every value. If values are to
+ * be combined then the reducer must maintain state accordingly. The final
+ * value may be flushed by overriding {...@link #close} to call {...@link
#collect}.
+ */
+public abstract class AvroReducer<IN,OUT> extends MapReduceBase
+ implements Reducer<AvroWrapper<IN>, NullWritable,
+ AvroWrapper<OUT>, NullWritable> {
+
+ private OutputCollector<AvroWrapper<OUT>, NullWritable> out;
+ private Reporter reporter;
+
+ private final AvroWrapper<OUT> outputWrapper = new AvroWrapper<OUT>(null);
+
+ public void reduce(AvroWrapper<IN> wrapper, Iterator<NullWritable> ignore,
+ OutputCollector<AvroWrapper<OUT>,NullWritable> output,
+ Reporter reporter) throws IOException {
+ if (this.out == null) {
+ this.out = output;
+ this.reporter = reporter;
+ }
+ reduce(wrapper.datum());
+ }
+
+ /** Return the {...@link Reporter} to permit status updates. */
+ public Reporter getReporter() { return reporter; }
+
+ /** Called with each reduce input datum for this partition, in order. */
+ public abstract void reduce(IN datum) throws IOException;
+
+ /** Call with each final output datum. */
+ public void collect(OUT datum) throws IOException {
+ outputWrapper.datum(datum);
+ out.collect(outputWrapper, NullWritable.get());
+ }
+}
Added:
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroWrapper.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroWrapper.java?rev=930060&view=auto
==============================================================================
---
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroWrapper.java
(added)
+++
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroWrapper.java
Thu Apr 1 18:30:36 2010
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+/** The wrapper of values for jobs configured with {...@link AvroJob} . */
+public class AvroWrapper<T> {
+ private T datum;
+
+ /** Wrap a value datum. */
+ public AvroWrapper(T datum) { this.datum = datum; }
+
+ /** Return the wrapped datum. */
+ public T datum() { return datum; }
+
+ /** Set the wrapped datum. */
+ public void datum(T datum) { this.datum = datum; }
+
+ public int hashCode() {
+ return (datum == null) ? 0 : datum.hashCode();
+ }
+
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ AvroWrapper that = (AvroWrapper)obj;
+ if (this.datum == null) {
+ if (that.datum != null)
+ return false;
+ } else if (!datum.equals(that.datum))
+ return false;
+ return true;
+ }
+
+}
Added: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/FsInput.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/FsInput.java?rev=930060&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/FsInput.java
(added)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/FsInput.java
Thu Apr 1 18:30:36 2010
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import org.apache.avro.file.SeekableInput;
+
+/** Adapt an {...@link FSDataInputStream} to {...@link SeekableInput}. */
+public class FsInput implements Closeable, SeekableInput {
+ private final FSDataInputStream stream;
+ private final long len;
+
+ /** Construct given a path and a configuration. */
+ public FsInput(Path path, Configuration conf) throws IOException {
+ this.stream = path.getFileSystem(conf).open(path);
+ this.len = path.getFileSystem(conf).getFileStatus(path).getLen();
+ }
+
+ public long length() {
+ return len;
+ }
+
+ public int read(byte[] b, int off, int len) throws IOException {
+ return stream.read(b, off, len);
+ }
+
+ public void seek(long p) throws IOException {
+ stream.seek(p);
+ }
+
+ public long tell() throws IOException {
+ return stream.getPos();
+ }
+
+ public void close() throws IOException {
+ stream.close();
+ }
+}
Added: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html?rev=930060&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html
(added)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html
Thu Apr 1 18:30:36 2010
@@ -0,0 +1,51 @@
+<html>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<body>
+Tools to permit using Avro data
+with <a href="http://hadoop.apache.org/">Hadoop</a> MapReduce jobs.
+
+<p>Avro data files do not contain key/value pairs as expected by
+ Hadoop's MapReduce API, but rather just a sequence of values. Thus
+ we provide here a layer on top of Hadoop's MapReduce API which
+ eliminates the key/value distinction.</p>
+
+<p>To use this for jobs whose input and output are Avro data files:
+ <ul>
+ <li>Subclass {...@link org.apache.avro.mapred.AvroMapper} and specify
+ this as your job's mapper.</li>
+ <li>Subclass {...@link org.apache.avro.mapred.AvroReducer} and specify
+ this as your job's reducer and perhaps combiner.</li>
+ <li>Depending on whether your mapper uses Avro's specific or
+ generic API for inputs, call one of {...@link
+ org.apache.avro.mapred.AvroJob#setInputSpecific} or {...@link
+ org.apache.avro.mapred.AvroJob#setInputGeneric} with your input schema.</li>
+ <li>Depending on whether your job uses Avro's specific or generic
+ API for outputs, call one of {...@link
+ org.apache.avro.mapred.AvroJob#setOutputSpecific} or {...@link
+ org.apache.avro.mapred.AvroJob#setOutputGeneric} with your output
+ schema.</li>
+ <li>Specify input files with {...@link
org.apache.hadoop.mapred.FileInputFormat#setInputPaths}</li>
+ <li>Specify an output directory with {...@link
+ org.apache.hadoop.mapred.FileOutputFormat#setOutputPath}</li>
+ <li>Run your job with {...@link
org.apache.hadoop.mapred.JobClient#runJob}</li>
+ </ul>
+</p>
+</body>
+</html>
Added:
hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountGeneric.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountGeneric.java?rev=930060&view=auto
==============================================================================
---
hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountGeneric.java
(added)
+++
hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountGeneric.java
Thu Apr 1 18:30:36 2010
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import junit.framework.TestCase;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+
+import org.apache.avro.Schema;
+import org.apache.avro.util.Utf8;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+
+public class TestWordCountGeneric extends TestCase {
+
+ private static GenericRecord newWordCount(String word, int count) {
+ GenericRecord value = new GenericData.Record(WordCount.SCHEMA$);
+ value.put("word", new Utf8(word));
+ value.put("count", count);
+ return value;
+ }
+
+ public static class MapImpl extends AvroMapper<Utf8, GenericRecord> {
+ public void map(Utf8 text) throws IOException {
+ StringTokenizer tokens = new StringTokenizer(text.toString());
+ while (tokens.hasMoreTokens())
+ collect(newWordCount(tokens.nextToken(), 1));
+ }
+ }
+
+ public static class ReduceImpl
+ extends AvroReducer<GenericRecord, GenericRecord> {
+
+ private GenericRecord previous;
+
+ public void reduce(GenericRecord current) throws IOException {
+ if (current.equals(previous)) {
+ previous.put("count", ((Integer)previous.get("count"))
+ + (Integer)current.get("count"));
+ } else {
+ if (previous != null)
+ collect(previous);
+ previous = newWordCount(current.get("word").toString(),
+ (Integer)current.get("count"));
+ }
+ }
+
+ public void close() throws IOException {
+ if (previous != null)
+ collect(previous);
+ }
+
+ }
+
+ public void testJob() throws Exception {
+ WordCountUtil.writeLinesFile();
+
+ JobConf job = new JobConf();
+ job.setJobName("wordcount");
+
+ AvroJob.setInputGeneric(job, Schema.create(Schema.Type.STRING));
+ AvroJob.setOutputGeneric(job, WordCount.SCHEMA$);
+
+ job.setMapperClass(MapImpl.class);
+ job.setCombinerClass(ReduceImpl.class);
+ job.setReducerClass(ReduceImpl.class);
+
+ String dir = System.getProperty("test.dir",".")+"/mapred";
+ FileInputFormat.setInputPaths(job, new Path(dir+"/in"));
+ FileOutputFormat.setOutputPath(job, new Path(dir+"/out"));
+ FileOutputFormat.setCompressOutput(job, true);
+
+ JobClient.runJob(job);
+
+ WordCountUtil.validateCountsFile();
+ }
+
+}
Added:
hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountSpecific.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountSpecific.java?rev=930060&view=auto
==============================================================================
---
hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountSpecific.java
(added)
+++
hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountSpecific.java
Thu Apr 1 18:30:36 2010
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import junit.framework.TestCase;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+
+import org.apache.avro.Schema;
+import org.apache.avro.util.Utf8;
+
+public class TestWordCountSpecific extends TestCase {
+
+ private static WordCount newWordCount(String word, int count) {
+ WordCount value = new WordCount();
+ value.word = new Utf8(word);
+ value.count = count;
+ return value;
+ }
+
+ public static class MapImpl extends AvroMapper<Utf8, WordCount> {
+ public void map(Utf8 text) throws IOException {
+ StringTokenizer tokens = new StringTokenizer(text.toString());
+ while (tokens.hasMoreTokens())
+ collect(newWordCount(tokens.nextToken(), 1));
+ }
+ }
+
+ public static class ReduceImpl extends AvroReducer<WordCount, WordCount> {
+
+ private WordCount previous;
+
+ public void reduce(WordCount current) throws IOException {
+ if (current.equals(previous)) {
+ previous.count++;
+ } else {
+ if (previous != null)
+ collect(previous);
+ previous = newWordCount(current.word.toString(), current.count);
+ }
+ }
+
+ public void close() throws IOException {
+ if (previous != null)
+ collect(previous);
+ }
+
+ }
+
+ public void testJob() throws Exception {
+ WordCountUtil.writeLinesFile();
+
+ JobConf job = new JobConf();
+ job.setJobName("wordcount");
+
+ AvroJob.setInputSpecific(job, Schema.create(Schema.Type.STRING));
+ AvroJob.setOutputSpecific(job, WordCount.SCHEMA$);
+
+ job.setMapperClass(MapImpl.class);
+ job.setCombinerClass(ReduceImpl.class);
+ job.setReducerClass(ReduceImpl.class);
+
+ String dir = System.getProperty("test.dir",".")+"/mapred";
+ FileInputFormat.setInputPaths(job, new Path(dir+"/in"));
+ FileOutputFormat.setOutputPath(job, new Path(dir+"/out"));
+ FileOutputFormat.setCompressOutput(job, true);
+
+ JobClient.runJob(job);
+
+ WordCountUtil.validateCountsFile();
+
+ }
+
+}
Added:
hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java?rev=930060&view=auto
==============================================================================
---
hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java
(added)
+++
hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java
Thu Apr 1 18:30:36 2010
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.io.File;
+import java.io.InputStream;
+import java.io.FileInputStream;
+import java.io.BufferedInputStream;
+import java.util.StringTokenizer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.fs.FileUtil;
+
+import org.apache.avro.Schema;
+import org.apache.avro.util.Utf8;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.DataFileStream;
+
+class WordCountUtil {
+
+ private static final File DIR
+ = new File(System.getProperty("test.dir", ".") + "/mapred");
+ private static final File LINES_FILE
+ = new File(new File(DIR, "in"), "lines.avro");
+ private static final File COUNTS_FILE
+ = new File(new File(DIR, "out"), "part-00000.avro");
+
+ private static final String[] LINES = new String[] {
+ "the quick brown fox jumps over the lazy dog",
+ "the cow jumps over the moon",
+ "the rain in spain falls mainly on the plains"
+ };
+
+ private static final Map<String,Integer> COUNTS =
+ new TreeMap<String,Integer>();
+ static {
+ for (String line : LINES) {
+ StringTokenizer tokens = new StringTokenizer(line);
+ while (tokens.hasMoreTokens()) {
+ String word = tokens.nextToken();
+ int count = COUNTS.containsKey(word) ? COUNTS.get(word) : 0;
+ count++;
+ COUNTS.put(word, count);
+ }
+ }
+ }
+
+ public static void writeLinesFile() throws IOException {
+ FileUtil.fullyDelete(DIR);
+ DatumWriter<Utf8> writer = new GenericDatumWriter<Utf8>();
+ DataFileWriter<Utf8> out = new DataFileWriter<Utf8>(writer);
+ LINES_FILE.getParentFile().mkdirs();
+ out.create(Schema.create(Schema.Type.STRING), LINES_FILE);
+ for (String line : LINES)
+ out.append(new Utf8(line));
+ out.close();
+ }
+
+ public static void validateCountsFile() throws IOException {
+ DatumReader<WordCount> reader = new SpecificDatumReader<WordCount>();
+ InputStream in = new BufferedInputStream(new FileInputStream(COUNTS_FILE));
+ DataFileStream<WordCount> counts = new
DataFileStream<WordCount>(in,reader);
+ int numWords = 0;
+ for (WordCount wc : counts) {
+ assertEquals(wc.word.toString(),
+ (int)COUNTS.get(wc.word.toString()), wc.count);
+ numWords++;
+ }
+ in.close();
+ assertEquals(COUNTS.size(), numWords);
+ }
+
+}
Added: hadoop/avro/trunk/share/test/schemas/WordCount.avsc
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/share/test/schemas/WordCount.avsc?rev=930060&view=auto
==============================================================================
--- hadoop/avro/trunk/share/test/schemas/WordCount.avsc (added)
+++ hadoop/avro/trunk/share/test/schemas/WordCount.avsc Thu Apr 1 18:30:36 2010
@@ -0,0 +1,6 @@
+{"type":"record", "name":"org.apache.avro.mapred.WordCount",
+ "fields":[
+ {"name":"word", "type":"string"},
+ {"name":"count", "type":"int", "order":"ignore"}
+ ]
+}