Author: cutting
Date: Tue Jun 15 21:50:28 2010
New Revision: 955064
URL: http://svn.apache.org/viewvc?rev=955064&view=rev
Log:
AVRO-577. Java: add MapReduce InputFormat for plain-text files. Contributed by
Tom White.
Added:
avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroUtf8InputFormat.java
Modified:
avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountGeneric.java
avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java
Added:
avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroUtf8InputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroUtf8InputFormat.java?rev=955064&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroUtf8InputFormat.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroUtf8InputFormat.java Tue Jun 15 21:50:28 2010
@@ -0,0 +1,98 @@
+package org.apache.avro.mapred;
+import java.io.IOException;
+
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.LineRecordReader;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * An {...@link org.apache.hadoop.mapred.InputFormat} for text files.
+ * Each line is a {...@link Utf8} key; values are null.
+ */
+public class AvroUtf8InputFormat
+ extends FileInputFormat<AvroWrapper<Utf8>, NullWritable>
+ implements JobConfigurable {
+
+ static class Utf8LineRecordReader implements
+ RecordReader<AvroWrapper<Utf8>, NullWritable> {
+
+ private LineRecordReader lineRecordReader;
+
+ private LongWritable currentKeyHolder = new LongWritable();
+ private Text currentValueHolder = new Text();
+
+ public Utf8LineRecordReader(Configuration job,
+ FileSplit split) throws IOException {
+ this.lineRecordReader = new LineRecordReader(job, split);
+ }
+
+ public void close() throws IOException {
+ lineRecordReader.close();
+ }
+
+ public long getPos() throws IOException {
+ return lineRecordReader.getPos();
+ }
+
+ public float getProgress() throws IOException {
+ return lineRecordReader.getProgress();
+ }
+
+ public boolean next(AvroWrapper<Utf8> key, NullWritable value)
+ throws IOException {
+ boolean success = lineRecordReader.next(currentKeyHolder,
+ currentValueHolder);
+ if (success) {
+ key.datum(new Utf8(currentValueHolder.getBytes())
+ .setLength(currentValueHolder.getLength()));
+ } else {
+ key.datum(null);
+ }
+ return success;
+ }
+
+ @Override
+ public AvroWrapper<Utf8> createKey() {
+ return new AvroWrapper<Utf8>(null);
+ }
+
+ @Override
+ public NullWritable createValue() {
+ return NullWritable.get();
+ }
+
+ }
+
+ private CompressionCodecFactory compressionCodecs = null;
+
+ public void configure(JobConf conf) {
+ compressionCodecs = new CompressionCodecFactory(conf);
+ }
+
+ protected boolean isSplitable(FileSystem fs, Path file) {
+ return compressionCodecs.getCodec(file) == null;
+ }
+
+ @Override
+ public RecordReader<AvroWrapper<Utf8>, NullWritable>
+ getRecordReader(InputSplit split, JobConf job, Reporter reporter)
+ throws IOException {
+
+ reporter.setStatus(split.toString());
+ return new Utf8LineRecordReader(job, (FileSplit) split);
+ }
+
+}
Modified:
avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountGeneric.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountGeneric.java?rev=955064&r1=955063&r2=955064&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountGeneric.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountGeneric.java Tue Jun 15 21:50:28 2010
@@ -103,5 +103,35 @@ public class TestWordCountGeneric {
outputPath.getFileSystem(job).delete(outputPath);
}
}
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testAvroUtf8InputFormat() throws Exception {
+ String dir = System.getProperty("test.dir", ".") + "/mapred";
+ Path outputPath = new Path(dir + "/out");
+ JobConf job = new JobConf();
+ try {
+ WordCountUtil.writeLinesTextFile();
+
+ job.setJobName("wordcount");
+
+ job.setInputFormat(AvroUtf8InputFormat.class);
+ AvroJob.setOutputGeneric(job, WordCount.SCHEMA$);
+
+ job.setMapperClass(MapImpl.class);
+ job.setCombinerClass(ReduceImpl.class);
+ job.setReducerClass(ReduceImpl.class);
+
+ FileInputFormat.setInputPaths(job, new Path(dir + "/in"));
+ FileOutputFormat.setOutputPath(job, outputPath);
+ FileOutputFormat.setCompressOutput(job, true);
+
+ JobClient.runJob(job);
+
+ WordCountUtil.validateCountsFile();
+ } finally {
+ outputPath.getFileSystem(job).delete(outputPath);
+ }
+ }
}
Modified:
avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java?rev=955064&r1=955063&r2=955064&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java Tue Jun 15 21:50:28 2010
@@ -25,6 +25,7 @@ import java.io.File;
import java.io.InputStream;
import java.io.FileInputStream;
import java.io.BufferedInputStream;
+import java.io.PrintStream;
import java.util.StringTokenizer;
import java.util.Map;
import java.util.TreeMap;
@@ -46,6 +47,8 @@ class WordCountUtil {
= new File(System.getProperty("test.dir", ".") + "/mapred");
@@ -79,6 +82,15 @@ class WordCountUtil {
out.append(new Utf8(line));
out.close();
}
+
+ public static void writeLinesTextFile() throws IOException {
+ FileUtil.fullyDelete(DIR);
+ LINES_FILE.getParentFile().mkdirs();
+ PrintStream out = new PrintStream(LINES_TEXT_FILE);
+ for (String line : LINES)
+ out.println(line);
+ out.close();
+ }
public static void validateCountsFile() throws IOException {
DatumReader<WordCount> reader = new SpecificDatumReader<WordCount>();