Author: cutting
Date: Tue Apr 26 16:33:43 2011
New Revision: 1096798
URL: http://svn.apache.org/viewvc?rev=1096798&view=rev
Log:
AVRO-808. Java: Add AvroAsTextInputFormat for use with streaming. Contributed
by Tom White.
Added:
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextInputFormat.java
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextRecordReader.java
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroAsTextInputFormat.java
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java
Modified: avro/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1096798&r1=1096797&r2=1096798&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Apr 26 16:33:43 2011
@@ -20,6 +20,9 @@ Avro 1.5.1 (unreleased)
AVRO-788. Java: Add Snappy compression for data files, including
MapReduce API support. (cutting)
+ AVRO-808. Java: Add AvroAsTextInputFormat for use with streaming.
+ (Tom White via cutting)
+
IMPROVEMENTS
AVRO-785. Java: Squash a Velocity warning by upgrading to Velocity 1.7.
Added:
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextInputFormat.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextInputFormat.java?rev=1096798&view=auto
==============================================================================
---
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextInputFormat.java
(added)
+++
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextInputFormat.java
Tue Apr 26 16:33:43 2011
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * An {@link org.apache.hadoop.mapred.InputFormat} over Avro data files that
+ * renders every datum as a string and delivers it as the input key; the input
+ * value is always empty. The string form used is
+ * <a href="http://www.json.org/">JSON</a>.
+ * <p>
+ * This makes Avro data consumable by text-oriented tools such as MapReduce
+ * Streaming.
+ */
+public class AvroAsTextInputFormat extends FileInputFormat<Text, Text> {
+
+  /**
+   * Lists the job's input files, keeping only those whose name ends with
+   * {@link AvroOutputFormat#EXT}.
+   */
+  @Override
+  protected FileStatus[] listStatus(JobConf job) throws IOException {
+    FileStatus[] candidates = super.listStatus(job);
+    List<FileStatus> avroFiles = new ArrayList<FileStatus>(candidates.length);
+    for (FileStatus candidate : candidates) {
+      String fileName = candidate.getPath().getName();
+      if (fileName.endsWith(AvroOutputFormat.EXT)) {
+        avroFiles.add(candidate);
+      }
+    }
+    return avroFiles.toArray(new FileStatus[avroFiles.size()]);
+  }
+
+  /**
+   * Publishes the split description as the task status, then opens an
+   * {@link AvroAsTextRecordReader} over the split.
+   */
+  @Override
+  public RecordReader<Text, Text> getRecordReader(InputSplit split,
+                                                  JobConf job,
+                                                  Reporter reporter)
+    throws IOException {
+    String description = split.toString();
+    reporter.setStatus(description);
+    // The cast stays after setStatus so the status is reported first.
+    FileSplit fileSplit = (FileSplit) split;
+    return new AvroAsTextRecordReader(job, fileSplit);
+  }
+}
Added:
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextRecordReader.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextRecordReader.java?rev=1096798&view=auto
==============================================================================
---
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextRecordReader.java
(added)
+++
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextRecordReader.java
Tue Apr 26 16:33:43 2011
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * A {@link RecordReader} over one split of an Avro data file that exposes each
+ * datum as a {@link Text} key. {@link ByteBuffer} data are copied into the key
+ * byte-for-byte; any other datum is written using its string form. The value
+ * is never modified.
+ */
+class AvroAsTextRecordReader<T> implements RecordReader<Text, Text> {
+
+  private final FileReader<T> reader;
+  private T datum;                      // reused between calls to next()
+  private final long start;             // reader position after the initial sync
+  private final long end;               // first byte offset past this split
+
+  /**
+   * Opens the file backing {@code split} and positions the reader on it.
+   *
+   * @param job   configuration used to open the split's path
+   * @param split the portion of the file to read
+   * @throws IOException if the file cannot be opened or positioned
+   */
+  public AvroAsTextRecordReader(JobConf job, FileSplit split)
+    throws IOException {
+    this(DataFileReader.openReader
+         (new FsInput(split.getPath(), job), new GenericDatumReader<T>()),
+         split);
+  }
+
+  /**
+   * Wraps an already opened reader and syncs it to the start of the split.
+   *
+   * @param reader source of data; closed by {@link #close()}
+   * @param split  the portion of the file to read
+   * @throws IOException if syncing or querying the position fails
+   */
+  protected AvroAsTextRecordReader(FileReader<T> reader, FileSplit split)
+    throws IOException {
+    this.reader = reader;
+    reader.sync(split.getStart());                    // sync to start
+    this.start = reader.tell();
+    this.end = split.getStart() + split.getLength();
+  }
+
+  public Text createKey() {
+    return new Text();
+  }
+
+  public Text createValue() {
+    return new Text();
+  }
+
+  /**
+   * Reads the next datum of the split into {@code key}.
+   *
+   * @param key    receives the datum's bytes or string form
+   * @param ignore the value; left untouched
+   * @return {@code false} once the file is exhausted or the reader has moved
+   *         past the sync point following the end of the split
+   * @throws IOException if reading fails
+   */
+  public boolean next(Text key, Text ignore) throws IOException {
+    if (!reader.hasNext() || reader.pastSync(end))
+      return false;
+    datum = reader.next(datum);
+    if (datum instanceof ByteBuffer) {
+      ByteBuffer b = (ByteBuffer) datum;
+      if (b.hasArray()) {
+        // Text.set(byte[], int, int) takes (start, length), not (start, end).
+        // Passing an end index copies too much or fails whenever the buffer's
+        // window does not begin at index 0 of the backing array.
+        int first = b.arrayOffset() + b.position();
+        key.set(b.array(), first, b.remaining());
+      } else {
+        byte[] bytes = new byte[b.remaining()];
+        b.duplicate().get(bytes);         // duplicate() leaves b's position alone
+        key.set(bytes);
+      }
+    } else {
+      // String.valueOf yields "null" for a null datum instead of throwing.
+      key.set(String.valueOf(datum));
+    }
+    return true;
+  }
+
+  /**
+   * @return the fraction of the split consumed so far, capped at 1.0, or 0.0
+   *         when the split is empty
+   */
+  public float getProgress() throws IOException {
+    if (end == start) {
+      return 0.0f;
+    } else {
+      return Math.min(1.0f, (getPos() - start) / (float)(end - start));
+    }
+  }
+
+  /** @return the reader's current position as reported by {@code tell()} */
+  public long getPos() throws IOException {
+    return reader.tell();
+  }
+
+  /** Closes the underlying reader. */
+  public void close() throws IOException { reader.close(); }
+
+}
Added:
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroAsTextInputFormat.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroAsTextInputFormat.java?rev=1096798&view=auto
==============================================================================
---
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroAsTextInputFormat.java
(added)
+++
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroAsTextInputFormat.java
Tue Apr 26 16:33:43 2011
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestAvroAsTextInputFormat {
+
+  /**
+   * Run the identity job on a "bytes" Avro file using AvroAsTextInputFormat
+   * and check the output is a sorted text file.
+   */
+  @Test
+  public void testSort() throws Exception {
+    JobConf job = new JobConf();
+    String dir = System.getProperty("test.dir", ".") + "/mapred";
+    Path outputPath = new Path(dir + "/out");
+
+    // Clear output left by an earlier run. The output is a directory, so use
+    // the explicit recursive overload rather than the deprecated delete(Path).
+    outputPath.getFileSystem(job).delete(outputPath, true);
+    WordCountUtil.writeLinesBytesFile();
+
+    job.setInputFormat(AvroAsTextInputFormat.class);
+    job.setOutputKeyClass(Text.class);
+
+    FileInputFormat.setInputPaths(job, new Path(dir + "/in"));
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    JobClient.runJob(job);
+
+    WordCountUtil.validateSortedFile();
+  }
+
+}
Modified:
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java?rev=1096798&r1=1096797&r2=1096798&view=diff
==============================================================================
---
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java
(original)
+++
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java
Tue Apr 26 16:33:43 2011
@@ -20,13 +20,19 @@ package org.apache.avro.mapred;
import static org.junit.Assert.*;
+import java.io.BufferedReader;
+import java.io.FileReader;
import java.io.IOException;
import java.io.File;
import java.io.InputStream;
import java.io.FileInputStream;
import java.io.BufferedInputStream;
import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import java.util.StringTokenizer;
import java.util.Map;
import java.util.TreeMap;
@@ -53,6 +59,8 @@ class WordCountUtil {
= new File(new File(DIR, "in"), "lines.txt");
private static final File COUNTS_FILE
= new File(new File(DIR, "out"), "part-00000.avro");
+ private static final File SORTED_FILE
+ = new File(new File(DIR, "out"), "part-00000");
public static final String[] LINES = new String[] {
"the quick brown fox jumps over the lazy dog",
@@ -84,6 +92,17 @@ class WordCountUtil {
out.append(new Utf8(line));
out.close();
}
+
+ /**
+  * Recreates the test directory and writes every entry of LINES, encoded as
+  * UTF-8, to LINES_FILE as an Avro data file with the "bytes" schema.
+  *
+  * @throws IOException if the file cannot be created or written
+  */
+ public static void writeLinesBytesFile() throws IOException {
+   FileUtil.fullyDelete(DIR);
+   DatumWriter<ByteBuffer> writer = new GenericDatumWriter<ByteBuffer>();
+   DataFileWriter<ByteBuffer> out = new DataFileWriter<ByteBuffer>(writer);
+   LINES_FILE.getParentFile().mkdirs();
+   out.create(Schema.create(Schema.Type.BYTES), LINES_FILE);
+   try {
+     for (String line : LINES)
+       out.append(ByteBuffer.wrap(line.getBytes("UTF-8")));
+   } finally {
+     // Release the file even when an append fails part-way through.
+     out.close();
+   }
+ }
public static void writeLinesTextFile() throws IOException {
FileUtil.fullyDelete(DIR);
@@ -110,6 +129,19 @@ class WordCountUtil {
in.close();
assertEquals(COUNTS.size(), numWords);
}
+
+ /**
+  * Asserts that SORTED_FILE holds exactly the entries of LINES in their
+  * natural sorted order, one per line, with nothing after them.
+  *
+  * @throws Exception if the file cannot be read
+  */
+ public static void validateSortedFile() throws Exception {
+   List<String> sortedLines = new ArrayList<String>(Arrays.asList(LINES));
+   Collections.sort(sortedLines);
+   BufferedReader reader = new BufferedReader(new FileReader(SORTED_FILE));
+   try {
+     for (String expectedLine : sortedLines) {
+       String actualLine = reader.readLine();
+       // A short file should fail the assertion, not throw a NullPointerException.
+       assertNotNull("missing output line for: " + expectedLine, actualLine);
+       // trim() drops surrounding whitespace (presumably the separator written
+       // before the empty value - confirm against the output format).
+       assertEquals(expectedLine, actualLine.trim());
+     }
+     assertNull(reader.readLine());
+   } finally {
+     reader.close();
+   }
+ }
// metadata tests
private static final String STRING_KEY = "string-key";