Author: cutting
Date: Tue Jan 7 22:10:34 2014
New Revision: 1556378
URL: http://svn.apache.org/r1556378
Log:
AVRO-1418. Java: Add sync support to AvroMultipleOutputs. Contributed by
Deepak Kumar V.
Added:
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/Syncable.java
(with props)
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputsSyncable.java
(with props)
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyRecordWriter.java
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java
Modified: avro/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1556378&r1=1556377&r2=1556378&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Jan 7 22:10:34 2014
@@ -30,6 +30,9 @@ Trunk (not yet released)
AVRO-1414. C++: Add support for deflate-compressed data files.
(Daniel Russel via cutting)
+ AVRO-1418. Java: Add sync support to AvroMultipleOutputs.
+ (Deepak Kumar V via cutting)
+
OPTIMIZATIONS
AVRO-1348. Java: Improve UTF-8 to String conversion performance in
Modified:
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java?rev=1556378&r1=1556377&r2=1556378&view=diff
==============================================================================
---
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java
(original)
+++
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java
Tue Jan 7 22:10:34 2014
@@ -36,7 +36,7 @@ import org.apache.hadoop.mapreduce.TaskA
*
* @param <T> The Java type of the Avro data to write.
*/
-public class AvroKeyRecordWriter<T> extends RecordWriter<AvroKey<T>,
NullWritable> {
+public class AvroKeyRecordWriter<T> extends RecordWriter<AvroKey<T>,
NullWritable> implements Syncable {
/** A writer for the Avro container file. */
private final DataFileWriter<T> mAvroFileWriter;
@@ -82,4 +82,10 @@ public class AvroKeyRecordWriter<T> exte
public void close(TaskAttemptContext context) throws IOException {
mAvroFileWriter.close();
}
+
+ /** {@inheritDoc} */
+ @Override
+ public long sync() throws IOException {
+ return mAvroFileWriter.sync();
+ }
}
Modified:
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java?rev=1556378&r1=1556377&r2=1556378&view=diff
==============================================================================
---
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java
(original)
+++
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java
Tue Jan 7 22:10:34 2014
@@ -22,13 +22,13 @@ import java.io.IOException;
import java.io.OutputStream;
import org.apache.avro.Schema;
-import org.apache.avro.hadoop.io.AvroKeyValue;
-import org.apache.avro.hadoop.io.AvroDatumConverter;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.hadoop.io.AvroDatumConverter;
+import org.apache.avro.hadoop.io.AvroKeyValue;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -44,7 +44,7 @@ import org.apache.hadoop.mapreduce.TaskA
* @param <K> The type of key to write.
* @param <V> The type of value to write.
*/
-public class AvroKeyValueRecordWriter<K, V> extends RecordWriter<K, V> {
+public class AvroKeyValueRecordWriter<K, V> extends RecordWriter<K, V>
implements Syncable {
/** A writer for the Avro container file. */
private final DataFileWriter<GenericRecord> mAvroFileWriter;
@@ -132,4 +132,10 @@ public class AvroKeyValueRecordWriter<K,
public void close(TaskAttemptContext context) throws IOException {
mAvroFileWriter.close();
}
+
+ /** {@inheritDoc} */
+ @Override
+ public long sync() throws IOException {
+ return mAvroFileWriter.sync();
+ }
}
Modified:
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java?rev=1556378&r1=1556377&r2=1556378&view=diff
==============================================================================
---
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java
(original)
+++
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java
Tue Jan 7 22:10:34 2014
@@ -19,26 +19,26 @@ package org.apache.avro.mapreduce;
import java.io.IOException;
import java.lang.reflect.Constructor;
-import java.util.Map;
-import java.util.StringTokenizer;
-import java.util.List;
-import java.util.Set;
-import java.util.HashMap;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
+import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.avro.Schema;
+import org.apache.hadoop.util.ReflectionUtils;
/**
* The AvroMultipleOutputs class simplifies writing Avro output data
@@ -439,7 +439,35 @@ public class AvroMultipleOutputs{
TaskAttemptContext taskContext =
createTaskAttemptContext(job.getConfiguration(), context.getTaskAttemptID());
getRecordWriter(taskContext, baseOutputPath).write(key, value);
}
-
+
+  /**
+   * Gets the record writer from the job's output format. The job's output
+   * format should be a FileOutputFormat. If the record writer implements
+   * Syncable, this forces the end of the current block, emitting a
+   * synchronization marker, and returns the current position as a value that
+   * may be passed to DataFileReader.seek(long); otherwise it returns -1.
+   *
+   * @param namedOutput the named output
+   * @param baseOutputPath base-output path to write the record to. Note: the
+   * framework will generate a unique filename for the baseOutputPath
+   */
+ @SuppressWarnings("unchecked")
+ public long sync(String namedOutput, String baseOutputPath) throws
IOException, InterruptedException {
+ checkNamedOutputName(context, namedOutput, false);
+ checkBaseOutputPath(baseOutputPath);
+ if (!namedOutputs.contains(namedOutput)) {
+ throw new IllegalArgumentException("Undefined named output '" +
namedOutput + "'");
+ }
+ TaskAttemptContext taskContext = getContext(namedOutput);
+ RecordWriter recordWriter = getRecordWriter(taskContext, baseOutputPath);
+ long position = -1;
+ if (recordWriter instanceof Syncable) {
+ Syncable syncableWriter = (Syncable) recordWriter;
+ position = syncableWriter.sync();
+ }
+ return position;
+ }
// by being synchronized MultipleOutputTask can be use with a
// MultithreadedMapper.
@SuppressWarnings("unchecked")
Added:
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/Syncable.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/Syncable.java?rev=1556378&view=auto
==============================================================================
---
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/Syncable.java
(added)
+++
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/Syncable.java
Tue Jan 7 22:10:34 2014
@@ -0,0 +1,32 @@
+package org.apache.avro.mapreduce;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.io.IOException;
+
+public interface Syncable {
+
+ /**
+ * Return the current position as a value that may be passed to
DataFileReader.seek(long).
+ * Forces the end of the current block, emitting a synchronization marker.
+ *
+ * @throws IOException - if an error occurred while attempting to sync.
+ */
+ long sync() throws IOException;
+}
Propchange:
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/Syncable.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyRecordWriter.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyRecordWriter.java?rev=1556378&r1=1556377&r2=1556378&view=diff
==============================================================================
---
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyRecordWriter.java
(original)
+++
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyRecordWriter.java
Tue Jan 7 22:10:34 2014
@@ -18,25 +18,34 @@
package org.apache.avro.mapreduce;
-import static org.easymock.EasyMock.*;
-import static org.junit.Assert.*;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.FsInput;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.easymock.EasyMock;
import org.junit.Test;
public class TestAvroKeyRecordWriter {
@@ -73,4 +82,43 @@ public class TestAvroKeyRecordWriter {
dataFileReader.close();
}
+
+ @Test
+ public void testSycnableWrite() throws IOException {
+ Schema writerSchema = Schema.create(Schema.Type.INT);
+ GenericData dataModel = new ReflectData();
+ CodecFactory compressionCodec = CodecFactory.nullCodec();
+ FileOutputStream outputStream = new FileOutputStream(new
File("target/temp.avro"));
+ TaskAttemptContext context = createMock(TaskAttemptContext.class);
+
+ replay(context);
+
+ // Write an avro container file with two records: 1 and 2.
+ AvroKeyRecordWriter<Integer> recordWriter = new
AvroKeyRecordWriter<Integer>(
+ writerSchema, dataModel, compressionCodec, outputStream);
+ long positionOne = recordWriter.sync();
+ recordWriter.write(new AvroKey<Integer>(1), NullWritable.get());
+ long positionTwo = recordWriter.sync();
+ recordWriter.write(new AvroKey<Integer>(2), NullWritable.get());
+ recordWriter.close(context);
+
+ verify(context);
+
+ // Verify that the file was written as expected.
+ Configuration conf = new Configuration();
+ conf.set("fs.default.name", "file:///");
+ Path avroFile = new Path("target/temp.avro");
+ DataFileReader<GenericData.Record> dataFileReader = new
DataFileReader<GenericData.Record>(new FsInput(avroFile,
+ conf), new SpecificDatumReader<GenericData.Record>());
+
+ dataFileReader.seek(positionTwo);
+ assertTrue(dataFileReader.hasNext()); // Record 2.
+ assertEquals(2, dataFileReader.next());
+
+ dataFileReader.seek(positionOne);
+ assertTrue(dataFileReader.hasNext()); // Record 1.
+ assertEquals(1, dataFileReader.next());
+
+ dataFileReader.close();
+ }
}
Modified:
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java?rev=1556378&r1=1556377&r2=1556378&view=diff
==============================================================================
---
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java
(original)
+++
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java
Tue Jan 7 22:10:34 2014
@@ -18,25 +18,37 @@
package org.apache.avro.mapreduce;
-import static org.easymock.EasyMock.*;
-import static org.junit.Assert.*;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.hadoop.io.AvroDatumConverter;
import org.apache.avro.hadoop.io.AvroDatumConverterFactory;
import org.apache.avro.hadoop.io.AvroKeyValue;
import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.mapred.FsInput;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -156,4 +168,68 @@ public class TestAvroKeyValueRecordWrite
assertEquals("reflectionData", firstRecord.getKey().toString());
assertEquals(record.attribute, firstRecord.getValue().attribute);
}
+
+ @Test
+ public void testSyncableWriteRecords() throws IOException {
+ Job job = new Job();
+ AvroJob.setOutputValueSchema(job, TextStats.SCHEMA$);
+ TaskAttemptContext context = createMock(TaskAttemptContext.class);
+
+ replay(context);
+
+ AvroDatumConverterFactory factory = new
AvroDatumConverterFactory(job.getConfiguration());
+ AvroDatumConverter<Text, ?> keyConverter = factory.create(Text.class);
+ AvroValue<TextStats> avroValue = new AvroValue<TextStats>(null);
+ @SuppressWarnings("unchecked")
+ AvroDatumConverter<AvroValue<TextStats>, ?> valueConverter
+ = factory.create((Class<AvroValue<TextStats>>) avroValue.getClass());
+ CodecFactory compressionCodec = CodecFactory.nullCodec();
+ FileOutputStream outputStream = new FileOutputStream(new
File("target/temp.avro"));
+
+ // Write a marker followed by each record: <'apple', TextStats('apple')>
and <'banana', TextStats('banana')>.
+ AvroKeyValueRecordWriter<Text, AvroValue<TextStats>> writer
+ = new AvroKeyValueRecordWriter<Text,
AvroValue<TextStats>>(keyConverter, valueConverter,
+ new ReflectData(), compressionCodec, outputStream);
+ TextStats appleStats = new TextStats();
+ appleStats.name = "apple";
+ long pointOne = writer.sync();
+ writer.write(new Text("apple"), new AvroValue<TextStats>(appleStats));
+ TextStats bananaStats = new TextStats();
+ bananaStats.name = "banana";
+ long pointTwo = writer.sync();
+ writer.write(new Text("banana"), new AvroValue<TextStats>(bananaStats));
+ writer.close(context);
+
+ verify(context);
+
+ Configuration conf = new Configuration();
+ conf.set("fs.default.name", "file:///");
+ Path avroFile = new Path("target/temp.avro");
+ DataFileReader<GenericData.Record> avroFileReader = new
DataFileReader<GenericData.Record>(new FsInput(avroFile,
+ conf), new SpecificDatumReader<GenericData.Record>());
+
+
+ avroFileReader.seek(pointTwo);
+    // Verify that the second record was written.
+ assertTrue(avroFileReader.hasNext());
+ AvroKeyValue<CharSequence, TextStats> secondRecord
+ = new AvroKeyValue<CharSequence, TextStats>(avroFileReader.next());
+ assertNotNull(secondRecord.get());
+ assertEquals("banana", secondRecord.getKey().toString());
+ assertEquals("banana", secondRecord.getValue().name.toString());
+
+
+ avroFileReader.seek(pointOne);
+ // Verify that the first record was written.
+ assertTrue(avroFileReader.hasNext());
+ AvroKeyValue<CharSequence, TextStats> firstRecord
+ = new AvroKeyValue<CharSequence, TextStats>(avroFileReader.next());
+ assertNotNull(firstRecord.get());
+ assertEquals("apple", firstRecord.getKey().toString());
+ assertEquals("apple", firstRecord.getValue().name.toString());
+
+
+ // That's all, folks.
+ avroFileReader.close();
+ }
}
Added:
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputsSyncable.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputsSyncable.java?rev=1556378&view=auto
==============================================================================
---
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputsSyncable.java
(added)
+++
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputsSyncable.java
Tue Jan 7 22:10:34 2014
@@ -0,0 +1,418 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.avro.mapreduce;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.FsInput;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestAvroMultipleOutputsSyncable {
+ @Rule
+ public TemporaryFolder tmpFolder = new TemporaryFolder();
+ public static final Schema STATS_SCHEMA =
+ Schema.parse("{\"name\":\"stats\",\"type\":\"record\","
+ + "\"fields\":[{\"name\":\"count\",\"type\":\"int\"},"
+ + "{\"name\":\"name\",\"type\":\"string\"}]}");
+ public static final Schema STATS_SCHEMA_2 =
+ Schema.parse("{\"name\":\"stats\",\"type\":\"record\","
+ + "\"fields\":[{\"name\":\"count1\",\"type\":\"int\"},"
+ + "{\"name\":\"name1\",\"type\":\"string\"}]}");
+
+ private static class LineCountMapper extends Mapper<LongWritable, Text,
Text, IntWritable> {
+ private IntWritable mOne;
+
+ @Override
+ protected void setup(Context context) {
+ mOne = new IntWritable(1);
+ }
+
+ @Override
+ protected void map(LongWritable fileByteOffset, Text line, Context context)
+ throws IOException, InterruptedException {
+ context.write(line, mOne);
+ }
+ }
+
+ private static class StatCountMapper
+ extends Mapper<AvroKey<TextStats>, NullWritable, Text, IntWritable> {
+ private IntWritable mCount;
+ private Text mText;
+
+ @Override
+ protected void setup(Context context) {
+ mCount = new IntWritable(0);
+ mText = new Text("");
+ }
+
+ @Override
+ protected void map(AvroKey<TextStats> record, NullWritable ignore, Context
context)
+ throws IOException, InterruptedException {
+ mCount.set(record.datum().count);
+ mText.set(record.datum().name.toString());
+ context.write(mText, mCount);
+ }
+ }
+
+ private static class GenericStatsReducer
+ extends Reducer<Text, IntWritable, AvroKey<GenericData.Record>,
NullWritable> {
+ private AvroKey<GenericData.Record> mStats;
+ private AvroMultipleOutputs amos;
+
+ @Override
+ protected void setup(Context context) {
+ mStats = new AvroKey<GenericData.Record>(null);
+ amos = new AvroMultipleOutputs(context);
+ }
+
+ @Override
+ protected void reduce(Text line, Iterable<IntWritable> counts, Context
context)
+ throws IOException, InterruptedException {
+ GenericData.Record record = new GenericData.Record(STATS_SCHEMA);
+ GenericData.Record record2 = new GenericData.Record(STATS_SCHEMA_2);
+ int sum = 0;
+ for (IntWritable count : counts) {
+ sum += count.get();
+ }
+ record.put("name", new Utf8(line.toString()));
+ record.put("count", new Integer(sum));
+ mStats.datum(record);
+ context.write(mStats, NullWritable.get());
+ amos.sync("myavro","myavro");
+ amos.write("myavro",mStats,NullWritable.get());
+ record2.put("name1", new Utf8(line.toString()));
+ record2.put("count1", new Integer(sum));
+ mStats.datum(record2);
+ amos.write(mStats, NullWritable.get(), STATS_SCHEMA_2, null,
"testnewwrite2");
+ amos.sync("myavro1","myavro1");
+ amos.write("myavro1",mStats);
+ amos.write(mStats, NullWritable.get(), STATS_SCHEMA, null,
"testnewwrite");
+ amos.write(mStats, NullWritable.get(), "testwritenonschema");
+ }
+
+ @Override
+ protected void cleanup(Context context) throws
IOException,InterruptedException
+ {
+ amos.close();
+ }
+ }
+
+ private static class SpecificStatsReducer
+ extends Reducer<Text, IntWritable, AvroKey<TextStats>, NullWritable> {
+ private AvroKey<TextStats> mStats;
+ private AvroMultipleOutputs amos;
+ @Override
+ protected void setup(Context context) {
+ mStats = new AvroKey<TextStats>(null);
+ amos = new AvroMultipleOutputs(context);
+ }
+
+ @Override
+ protected void reduce(Text line, Iterable<IntWritable> counts, Context
context)
+ throws IOException, InterruptedException {
+ TextStats record = new TextStats();
+ record.count = 0;
+ for (IntWritable count : counts) {
+ record.count += count.get();
+ }
+ record.name = line.toString();
+ mStats.datum(record);
+ context.write(mStats, NullWritable.get());
+ amos.sync("myavro3","myavro3");
+ amos.write("myavro3",mStats,NullWritable.get());
+ }
+ @Override
+ protected void cleanup(Context context) throws
IOException,InterruptedException
+ {
+ amos.close();
+ }
+ }
+
+ private static class SortMapper
+ extends Mapper<AvroKey<TextStats>, NullWritable, AvroKey<TextStats>,
NullWritable> {
+ @Override
+ protected void map(AvroKey<TextStats> key, NullWritable value, Context
context)
+ throws IOException, InterruptedException {
+ context.write(key, value);
+ }
+ }
+
+ private static class SortReducer
+ extends Reducer<AvroKey<TextStats>, NullWritable, AvroKey<TextStats>,
NullWritable> {
+ @Override
+ protected void reduce(AvroKey<TextStats> key, Iterable<NullWritable>
ignore, Context context)
+ throws IOException, InterruptedException {
+ context.write(key, NullWritable.get());
+ }
+ }
+
+ @Test
+ public void testAvroGenericOutput() throws Exception {
+ Job job = new Job();
+
+ FileInputFormat.setInputPaths(job, new Path(getClass()
+ .getResource("/org/apache/avro/mapreduce/mapreduce-test-input.txt")
+ .toURI().toString()));
+ job.setInputFormatClass(TextInputFormat.class);
+
+ job.setMapperClass(LineCountMapper.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(IntWritable.class);
+
+ job.setReducerClass(GenericStatsReducer.class);
+ AvroJob.setOutputKeySchema(job, STATS_SCHEMA);
+
AvroMultipleOutputs.addNamedOutput(job,"myavro",AvroKeyOutputFormat.class,STATS_SCHEMA,null);
+ AvroMultipleOutputs.addNamedOutput(job,"myavro1",
AvroKeyOutputFormat.class, STATS_SCHEMA_2);
+ job.setOutputFormatClass(AvroKeyOutputFormat.class);
+ String dir = System.getProperty("test.dir", ".") + "/mapred";
+ Path outputPath = new Path(dir + "/out");
+ outputPath.getFileSystem(job.getConfiguration()).delete(outputPath);
+ FileOutputFormat.setOutputPath(job, outputPath);
+
+ Assert.assertTrue(job.waitForCompletion(true));
+
+ // Check that the results from the MapReduce were as expected.
+ FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+ FileStatus[] outputFiles =
fileSystem.globStatus(outputPath.suffix("/myavro-r-00000.avro"));
+ Assert.assertEquals(1, outputFiles.length);
+ DataFileReader<GenericData.Record> reader = new
DataFileReader<GenericData.Record>(
+ new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+ new GenericDatumReader<GenericData.Record>(STATS_SCHEMA));
+ Map<String, Integer> counts = new HashMap<String, Integer>();
+ for (GenericData.Record record : reader) {
+ counts.put(((Utf8) record.get("name")).toString(), (Integer)
record.get("count"));
+ }
+ reader.close();
+
+ Assert.assertEquals(3, counts.get("apple").intValue());
+ Assert.assertEquals(2, counts.get("banana").intValue());
+ Assert.assertEquals(1, counts.get("carrot").intValue());
+
+ outputFiles =
fileSystem.globStatus(outputPath.suffix("/myavro1-r-00000.avro"));
+ Assert.assertEquals(1, outputFiles.length);
+ reader = new DataFileReader<GenericData.Record>(
+ new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+ new GenericDatumReader<GenericData.Record>(STATS_SCHEMA_2));
+ counts = new HashMap<String, Integer>();
+ for (GenericData.Record record : reader) {
+ counts.put(((Utf8) record.get("name1")).toString(), (Integer)
record.get("count1"));
+ }
+ reader.close();
+
+ Assert.assertEquals(3, counts.get("apple").intValue());
+ Assert.assertEquals(2, counts.get("banana").intValue());
+ Assert.assertEquals(1, counts.get("carrot").intValue());
+
+ outputFiles =
fileSystem.globStatus(outputPath.suffix("/testnewwrite-r-00000.avro"));
+ Assert.assertEquals(1, outputFiles.length);
+ reader = new DataFileReader<GenericData.Record>(
+ new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+ new GenericDatumReader<GenericData.Record>(STATS_SCHEMA));
+ counts = new HashMap<String, Integer>();
+ for (GenericData.Record record : reader) {
+ counts.put(((Utf8) record.get("name")).toString(), (Integer)
record.get("count"));
+ }
+ reader.close();
+
+ Assert.assertEquals(3, counts.get("apple").intValue());
+ Assert.assertEquals(2, counts.get("banana").intValue());
+ Assert.assertEquals(1, counts.get("carrot").intValue());
+
+ outputFiles =
fileSystem.globStatus(outputPath.suffix("/testnewwrite2-r-00000.avro"));
+ Assert.assertEquals(1, outputFiles.length);
+ reader = new DataFileReader<GenericData.Record>(
+ new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+ new GenericDatumReader<GenericData.Record>(STATS_SCHEMA_2));
+ counts = new HashMap<String, Integer>();
+ for (GenericData.Record record : reader) {
+ counts.put(((Utf8) record.get("name1")).toString(), (Integer)
record.get("count1"));
+ }
+ reader.close();
+ Assert.assertEquals(3, counts.get("apple").intValue());
+ Assert.assertEquals(2, counts.get("banana").intValue());
+ Assert.assertEquals(1, counts.get("carrot").intValue());
+
+ outputFiles =
fileSystem.globStatus(outputPath.suffix("/testwritenonschema-r-00000.avro"));
+ Assert.assertEquals(1, outputFiles.length);
+ reader = new DataFileReader<GenericData.Record>(
+ new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+ new GenericDatumReader<GenericData.Record>(STATS_SCHEMA));
+ counts = new HashMap<String, Integer>();
+ for (GenericData.Record record : reader) {
+ counts.put(((Utf8) record.get("name")).toString(), (Integer)
record.get("count"));
+ }
+ reader.close();
+
+ Assert.assertEquals(3, counts.get("apple").intValue());
+ Assert.assertEquals(2, counts.get("banana").intValue());
+ Assert.assertEquals(1, counts.get("carrot").intValue());
+
+
+ }
+
+ @Test
+ public void testAvroSpecificOutput() throws Exception {
+ Job job = new Job();
+
+ FileInputFormat.setInputPaths(job, new Path(getClass()
+ .getResource("/org/apache/avro/mapreduce/mapreduce-test-input.txt")
+ .toURI().toString()));
+ job.setInputFormatClass(TextInputFormat.class);
+
+ job.setMapperClass(LineCountMapper.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(IntWritable.class);
+
AvroMultipleOutputs.addNamedOutput(job,"myavro3",AvroKeyOutputFormat.class,TextStats.SCHEMA$,null);
+
+ job.setReducerClass(SpecificStatsReducer.class);
+ AvroJob.setOutputKeySchema(job, TextStats.SCHEMA$);
+
+ job.setOutputFormatClass(AvroKeyOutputFormat.class);
+ String dir = System.getProperty("test.dir", ".") + "/mapred";
+ Path outputPath = new Path(dir + "/out-specific");
+ outputPath.getFileSystem(job.getConfiguration()).delete(outputPath);
+ FileOutputFormat.setOutputPath(job, outputPath);
+
+ Assert.assertTrue(job.waitForCompletion(true));
+ FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+ FileStatus[] outputFiles =
fileSystem.globStatus(outputPath.suffix("/myavro3-*"));
+ Assert.assertEquals(1, outputFiles.length);
+ DataFileReader<TextStats> reader = new DataFileReader<TextStats>(
+ new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+ new SpecificDatumReader<TextStats>());
+ Map<String, Integer> counts = new HashMap<String, Integer>();
+ for (TextStats record : reader) {
+ counts.put(record.name.toString(), record.count);
+ }
+ reader.close();
+
+ Assert.assertEquals(3, counts.get("apple").intValue());
+ Assert.assertEquals(2, counts.get("banana").intValue());
+ Assert.assertEquals(1, counts.get("carrot").intValue());
+ }
+
+ @Test
+ public void testAvroInput() throws Exception {
+ Job job = new Job();
+
+ FileInputFormat.setInputPaths(job, new Path(getClass()
+
.getResource("/org/apache/avro/mapreduce/mapreduce-test-input.avro")
+ .toURI().toString()));
+ job.setInputFormatClass(AvroKeyInputFormat.class);
+ AvroJob.setInputKeySchema(job, TextStats.SCHEMA$);
+
AvroMultipleOutputs.addNamedOutput(job,"myavro3",AvroKeyOutputFormat.class,TextStats.SCHEMA$,null);
+
+ job.setMapperClass(StatCountMapper.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(IntWritable.class);
+
+ job.setReducerClass(SpecificStatsReducer.class);
+ AvroJob.setOutputKeySchema(job, TextStats.SCHEMA$);
+
+ job.setOutputFormatClass(AvroKeyOutputFormat.class);
+ Path outputPath = new Path(tmpFolder.getRoot().getPath() +
"/out-specific-input");
+ FileOutputFormat.setOutputPath(job, outputPath);
+
+ Assert.assertTrue(job.waitForCompletion(true));
+
+ // Check that the results from the MapReduce were as expected.
+ FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+ FileStatus[] outputFiles =
fileSystem.globStatus(outputPath.suffix("/myavro3-*"));
+ Assert.assertEquals(1, outputFiles.length);
+ DataFileReader<TextStats> reader = new DataFileReader<TextStats>(
+ new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+ new SpecificDatumReader<TextStats>());
+ Map<String, Integer> counts = new HashMap<String, Integer>();
+ for (TextStats record : reader) {
+ counts.put(record.name.toString(), record.count);
+ }
+ reader.close();
+
+ Assert.assertEquals(3, counts.get("apple").intValue());
+ Assert.assertEquals(2, counts.get("banana").intValue());
+ Assert.assertEquals(1, counts.get("carrot").intValue());
+ }
+
+ @Test
+ public void testAvroMapOutput() throws Exception {
+ Job job = new Job();
+
+ FileInputFormat.setInputPaths(job, new Path(getClass()
+
.getResource("/org/apache/avro/mapreduce/mapreduce-test-input.avro")
+ .toURI().toString()));
+ job.setInputFormatClass(AvroKeyInputFormat.class);
+ AvroJob.setInputKeySchema(job, TextStats.SCHEMA$);
+
+ job.setMapperClass(SortMapper.class);
+ AvroJob.setMapOutputKeySchema(job, TextStats.SCHEMA$);
+ job.setMapOutputValueClass(NullWritable.class);
+
+ job.setReducerClass(SortReducer.class);
+ AvroJob.setOutputKeySchema(job, TextStats.SCHEMA$);
+
+ job.setOutputFormatClass(AvroKeyOutputFormat.class);
+ Path outputPath = new Path(tmpFolder.getRoot().getPath() +
"/out-specific-input");
+ FileOutputFormat.setOutputPath(job, outputPath);
+
+ Assert.assertTrue(job.waitForCompletion(true));
+
+ // Check that the results from the MapReduce were as expected.
+ FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+ FileStatus[] outputFiles =
fileSystem.globStatus(outputPath.suffix("/part-*"));
+ Assert.assertEquals(1, outputFiles.length);
+ DataFileReader<TextStats> reader = new DataFileReader<TextStats>(
+ new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+ new SpecificDatumReader<TextStats>());
+ Map<String, Integer> counts = new HashMap<String, Integer>();
+ for (TextStats record : reader) {
+ counts.put(record.name.toString(), record.count);
+ }
+ reader.close();
+
+ Assert.assertEquals(3, counts.get("apple").intValue());
+ Assert.assertEquals(2, counts.get("banana").intValue());
+ Assert.assertEquals(1, counts.get("carrot").intValue());
+ }
+}
Propchange:
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputsSyncable.java
------------------------------------------------------------------------------
svn:eol-style = native