CRUNCH-433 Add support for AvroKeyValue file. Add support for reading AvroKeyValue files created using org.apache.avro.mapreduce.AvroJob. Also add explicit methods in the From class for reading an Avro key/value file as a PTable.
Signed-off-by: Josh Wills <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/92025c7e Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/92025c7e Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/92025c7e Branch: refs/heads/master Commit: 92025c7e33d69414a427fc62daa88137b2f902d5 Parents: 8434dca Author: Gabriel Reid <[email protected]> Authored: Sun Jul 6 21:07:37 2014 +0200 Committer: Josh Wills <[email protected]> Committed: Sun Jul 6 19:07:23 2014 -0700 ---------------------------------------------------------------------- .../apache/crunch/io/avro/AvroKeyValueIT.java | 230 +++++++++++++++++++ .../main/java/org/apache/crunch/io/From.java | 25 ++ .../crunch/io/avro/AvroTableFileSource.java | 49 ++++ .../crunch/types/avro/AvroGroupedTableType.java | 3 +- .../types/avro/AvroKeyValueTableType.java | 157 +++++++++++++ .../apache/crunch/types/avro/AvroTableType.java | 2 +- .../org/apache/crunch/types/avro/Avros.java | 30 +++ .../crunch/types/avro/BaseAvroTableType.java | 37 +++ .../crunch/types/avro/AvroDeepCopierTest.java | 22 +- .../apache/crunch/types/avro/AvroTypeTest.java | 21 +- .../crunch/types/avro/ReflectedPerson.java | 87 +++++++ 11 files changed, 624 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/92025c7e/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroKeyValueIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroKeyValueIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroKeyValueIT.java new file mode 100644 index 0000000..d78841c --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroKeyValueIT.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.crunch.io.avro; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.io.Serializable; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import org.apache.avro.Schema; +import org.apache.avro.mapred.AvroKey; +import org.apache.avro.mapred.AvroValue; +import org.apache.avro.mapred.AvroWrapper; +import org.apache.avro.mapred.Pair; +import org.apache.avro.mapreduce.AvroJob; +import org.apache.avro.mapreduce.AvroKeyValueOutputFormat; +import org.apache.crunch.PTable; +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.From; +import org.apache.crunch.test.CrunchTestSupport; +import org.apache.crunch.test.Person; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.avro.Avros; +import org.apache.crunch.types.avro.ReflectedPerson; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import 
org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.junit.Test; + +/** + * Tests for verifying behavior with Avro produced using the org.apache.avro.mapred.* + * and org.apache.avro.mapreduce.* APIs. + */ +public class AvroKeyValueIT extends CrunchTestSupport implements Serializable { + + @Test + public void testInputFromMapReduceKeyValueFile_Generic() throws InterruptedException, IOException, ClassNotFoundException { + + Path keyValuePath = produceMapReduceOutputFile(); + + Pipeline pipeline = new MRPipeline(AvroKeyValueIT.class, tempDir.getDefaultConfiguration()); + PTable<Person, Integer> personTable = pipeline.read( + From.avroTableFile(keyValuePath, Avros.tableOf(Avros.specifics(Person.class), Avros.ints()))); + + org.apache.crunch.Pair<Person, Integer> firstEntry = Iterables.getFirst(personTable.materialize(), null); + + assertEquals("a", firstEntry.first().getName().toString()); + assertEquals(Integer.valueOf(1), firstEntry.second()); + + pipeline.done(); + + } + + @Test + public void testInputFromMapRedKeyValueFile_Specific() throws IOException { + Path keyValuePath = produceMapRedOutputFile(); + + Pipeline pipeline = new MRPipeline(AvroKeyValueIT.class, tempDir.getDefaultConfiguration()); + PTable<Person, Integer> personTable = pipeline.read( + From.avroTableFile(keyValuePath, Avros.keyValueTableOf(Avros.specifics(Person.class), Avros.ints()))); + + org.apache.crunch.Pair<Person, Integer> firstEntry = Iterables.getFirst(personTable.materialize(), null); + + assertEquals("a", firstEntry.first().getName().toString()); + assertEquals(Integer.valueOf(1), firstEntry.second()); + + // Verify that deep copying on this PType works as well + PTableType<Person, Integer> tableType = 
Avros.keyValueTableOf(Avros.specifics(Person.class), Avros.ints()); + tableType.initialize(tempDir.getDefaultConfiguration()); + org.apache.crunch.Pair<Person, Integer> detachedPair = tableType.getDetachedValue(firstEntry); + assertEquals(firstEntry, detachedPair); + + pipeline.done(); + } + + @Test + public void testInputFromMapRedKeyValueFile_Reflect() throws IOException { + Path keyValuePath = produceMapRedOutputFile(); + + Pipeline pipeline = new MRPipeline(AvroKeyValueIT.class, tempDir.getDefaultConfiguration()); + PTable<ReflectedPerson, Integer> personTable = pipeline.read( + From.avroTableFile(keyValuePath, Avros.keyValueTableOf(Avros.reflects(ReflectedPerson.class), Avros.ints()))); + + org.apache.crunch.Pair<ReflectedPerson, Integer> firstEntry = Iterables.getFirst(personTable.materialize(), null); + + assertEquals("a", firstEntry.first().getName().toString()); + assertEquals(Integer.valueOf(1), firstEntry.second()); + + // Verify that deep copying on this PType works as well + PTableType<ReflectedPerson, Integer> tableType = + Avros.keyValueTableOf(Avros.reflects(ReflectedPerson.class), Avros.ints()); + tableType.initialize(tempDir.getDefaultConfiguration()); + org.apache.crunch.Pair<ReflectedPerson, Integer> detachedPair = tableType.getDetachedValue(firstEntry); + assertEquals(firstEntry, detachedPair); + + pipeline.done(); + } + + /** + * Produces an Avro file using the org.apache.avro.mapred.* API. 
+ */ + private Path produceMapRedOutputFile() throws IOException { + + JobConf conf = new JobConf(tempDir.getDefaultConfiguration(), AvroKeyValueIT.class); + + org.apache.avro.mapred.AvroJob.setOutputSchema( + conf, + Pair.getPairSchema(Person.SCHEMA$, Schema.create(Schema.Type.INT))); + + + conf.setMapperClass(MapRedPersonMapper.class); + conf.setNumReduceTasks(0); + + conf.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class); + + + + Path outputPath = new Path(tempDir.getFileName("mapreduce_output")); + org.apache.hadoop.mapred.FileInputFormat.setInputPaths(conf, tempDir.copyResourcePath("letters.txt")); + org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(conf, outputPath); + + RunningJob runningJob = JobClient.runJob(conf); + runningJob.waitForCompletion(); + + return outputPath; + + } + + /** + * Produces an Avro file using the org.apache.avro.mapreduce.* API. + */ + private Path produceMapReduceOutputFile() throws IOException, ClassNotFoundException, InterruptedException { + + + Job job = new Job(tempDir.getDefaultConfiguration()); + job.setJarByClass(AvroKeyValueIT.class); + job.setJobName("Color Count"); + + Path outputPath = new Path(tempDir.getFileName("mapreduce_output")); + + FileInputFormat.setInputPaths(job, tempDir.copyResourcePath("letters.txt")); + FileOutputFormat.setOutputPath(job, outputPath); + + job.setInputFormatClass(TextInputFormat.class); + job.setMapperClass(MapReducePersonMapper.class); + job.setNumReduceTasks(0); + AvroJob.setOutputKeySchema(job, Person.SCHEMA$); + AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.INT)); + + job.setOutputFormatClass(AvroKeyValueOutputFormat.class); + + boolean success = job.waitForCompletion(true); + + if (!success) { + throw new RuntimeException("Job failed"); + } + + return outputPath; + } + + public static class MapReducePersonMapper extends + Mapper<LongWritable, Text, AvroKey<Person>, AvroValue<Integer>> { + + @Override + protected void map(LongWritable key, Text value, 
Context context) throws IOException, InterruptedException { + Person person = Person.newBuilder() + .setName(value.toString()) + .setAge(value.toString().length()) + .setSiblingnames(ImmutableList.<CharSequence>of()) + .build(); + context.write( + new AvroKey<Person>(person), + new AvroValue<Integer>(1)); + + } + } + + public static class MapRedPersonMapper implements org.apache.hadoop.mapred.Mapper<LongWritable, Text, AvroWrapper<Pair<Person,Integer>>, NullWritable> { + @Override + public void map(LongWritable key, Text value, OutputCollector<AvroWrapper<Pair<Person,Integer>>, NullWritable> outputCollector, Reporter reporter) throws IOException { + Person person = Person.newBuilder() + .setName(value.toString()) + .setAge(value.toString().length()) + .setSiblingnames(ImmutableList.<CharSequence>of()) + .build(); + outputCollector.collect( + new AvroWrapper<Pair<Person, Integer>>(new Pair<Person, Integer>(person, 1)), + NullWritable.get()); + } + + @Override + public void close() throws IOException { + } + + @Override + public void configure(JobConf entries) { + } + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/92025c7e/crunch-core/src/main/java/org/apache/crunch/io/From.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/From.java b/crunch-core/src/main/java/org/apache/crunch/io/From.java index 3c892a6..8c0ee3f 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/From.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/From.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.List; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import org.apache.avro.Schema; import org.apache.avro.file.DataFileReader; import org.apache.avro.generic.GenericData; @@ -28,9 +29,11 @@ import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.mapred.FsInput; 
import org.apache.avro.specific.SpecificRecord; +import org.apache.crunch.Pair; import org.apache.crunch.Source; import org.apache.crunch.TableSource; import org.apache.crunch.io.avro.AvroFileSource; +import org.apache.crunch.io.avro.AvroTableFileSource; import org.apache.crunch.io.impl.FileTableSourceImpl; import org.apache.crunch.io.seq.SeqFileSource; import org.apache.crunch.io.seq.SeqFileTableSource; @@ -316,6 +319,28 @@ public class From { return avroFile(paths, Avros.generics(getSchemaFromPath(paths.get(0), conf))); } + /** + * Creates a {@code TableSource<K,V>} for reading an Avro key/value file at the given path. + * + * @param path The path to the data on the filesystem + * @param tableType Avro table type for deserializing the table data + * @return a new {@code TableSource<K,V>} instance for reading Avro key/value data + */ + public static <K, V> TableSource<K, V> avroTableFile(Path path, PTableType<K, V> tableType) { + return avroTableFile(ImmutableList.of(path), tableType); + } + + /** + * Creates a {@code TableSource<K,V>} for reading an Avro key/value file at the given paths. 
+ * + * @param paths list of paths to be read by the returned source + * @param tableType Avro table type for deserializing the table data + * @return a new {@code TableSource<K,V>} instance for reading Avro key/value data + */ + public static <K, V> TableSource<K, V> avroTableFile(List<Path> paths, PTableType<K, V> tableType) { + return new AvroTableFileSource<K, V>(paths, (AvroType<Pair<K,V>>)tableType); + } + static Schema getSchemaFromPath(Path path, Configuration conf) { DataFileReader reader = null; try { http://git-wip-us.apache.org/repos/asf/crunch/blob/92025c7e/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroTableFileSource.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroTableFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroTableFileSource.java new file mode 100644 index 0000000..beee79c --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroTableFileSource.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.crunch.io.avro; + +import java.util.List; + +import org.apache.crunch.Pair; +import org.apache.crunch.TableSource; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.avro.AvroType; +import org.apache.hadoop.fs.Path; + +/** + * A file source for reading a table of Avro keys and values. + * + * This file source can be used for reading tables compatible with + * the {@code org.apache.avro.mapred.AvroJob} and {@code org.apache.avro.mapreduce.AvroJob} classes (in addition to + * tables created by Crunch). + * + * @see org.apache.crunch.types.avro.Avros#tableOf(org.apache.crunch.types.PType, org.apache.crunch.types.PType) + * @see org.apache.crunch.types.avro.Avros#keyValueTableOf(org.apache.crunch.types.PType, org.apache.crunch.types.PType) + */ +public class AvroTableFileSource<K, V> extends AvroFileSource<Pair<K, V>> implements TableSource<K,V> { + + public AvroTableFileSource(List<Path> paths, AvroType<Pair<K, V>> tableType) { + super(paths, tableType); + } + + @Override + public PTableType<K, V> getTableType() { + return (PTableType<K,V>)super.getType(); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/92025c7e/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java index 7178274..3df313f 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java @@ -30,6 +30,7 @@ import org.apache.crunch.fn.PairMapFn; import org.apache.crunch.lib.PTables; import org.apache.crunch.types.Converter; import org.apache.crunch.types.PGroupedTableType; +import org.apache.crunch.types.PTableType; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Job; @@ -43,7 +44,7 @@ class AvroGroupedTableType<K, V> extends PGroupedTableType<K, V> { private final MapFn inputFn; private final MapFn outputFn; - public AvroGroupedTableType(AvroTableType<K, V> tableType) { + public AvroGroupedTableType(BaseAvroTableType<K, V> tableType) { super(tableType); AvroType keyType = (AvroType) tableType.getKeyType(); AvroType valueType = (AvroType) tableType.getValueType(); http://git-wip-us.apache.org/repos/asf/crunch/blob/92025c7e/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroKeyValueTableType.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroKeyValueTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroKeyValueTableType.java new file mode 100644 index 0000000..5891322 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroKeyValueTableType.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.crunch.types.avro; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.hadoop.io.AvroKeyValue; +import org.apache.crunch.MapFn; +import org.apache.crunch.Pair; +import org.apache.crunch.lib.PTables; +import org.apache.crunch.types.PGroupedTableType; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.TupleDeepCopier; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; + +/** + * A {@code PTableType} that is compatible with Avro key/value files that are created or read using the + * {@code org.apache.avro.mapreduce.AvroJob} class. + */ +class AvroKeyValueTableType<K, V> extends BaseAvroTableType<K, V> implements PTableType<K, V> { + + private static class PairToAvroKeyValueRecord extends MapFn<Pair, GenericRecord> { + private final MapFn keyMapFn; + private final MapFn valueMapFn; + private final String keySchemaJson; + private final String valueSchemaJson; + + private String keyValueSchemaJson; + private transient Schema keyValueSchema; + + public PairToAvroKeyValueRecord(AvroType keyType, AvroType valueType) { + this.keyMapFn = keyType.getOutputMapFn(); + this.keySchemaJson = keyType.getSchema().toString(); + this.valueMapFn = valueType.getOutputMapFn(); + this.valueSchemaJson = valueType.getSchema().toString(); + } + + @Override + public void configure(Configuration conf) { + keyMapFn.configure(conf); + valueMapFn.configure(conf); + } + + @Override + public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { + keyMapFn.setContext(context); + valueMapFn.setContext(context); + } + + @Override + public void initialize() { + keyMapFn.initialize(); + valueMapFn.initialize(); + Schema.Parser parser = new Schema.Parser(); + keyValueSchemaJson = AvroKeyValue.getSchema(parser.parse(keySchemaJson), 
parser.parse(valueSchemaJson)).toString(); + } + + @Override + public GenericRecord map(Pair input) { + if (keyValueSchema == null) { + keyValueSchema = new Schema.Parser().parse(keyValueSchemaJson); + } + GenericRecord keyValueRecord = new GenericData.Record(keyValueSchema); + keyValueRecord.put(AvroKeyValue.KEY_FIELD, keyMapFn.map(input.first())); + keyValueRecord.put(AvroKeyValue.VALUE_FIELD, valueMapFn.map(input.second())); + return keyValueRecord; + } + } + + private static class AvroKeyValueRecordToPair extends MapFn<GenericRecord, Pair> { + + private final MapFn firstMapFn; + private final MapFn secondMapFn; + + public AvroKeyValueRecordToPair(MapFn firstMapFn, MapFn secondMapFn) { + this.firstMapFn = firstMapFn; + this.secondMapFn = secondMapFn; + } + + @Override + public void configure(Configuration conf) { + firstMapFn.configure(conf); + secondMapFn.configure(conf); + } + + @Override + public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { + firstMapFn.setContext(context); + secondMapFn.setContext(context); + } + + @Override + public void initialize() { + firstMapFn.initialize(); + secondMapFn.initialize(); + } + + @Override + public Pair map(GenericRecord input) { + return Pair.of( + firstMapFn.map(input.get(AvroKeyValue.KEY_FIELD)), + secondMapFn.map(input.get(AvroKeyValue.VALUE_FIELD))); + } + } + + private final AvroType<K> keyType; + private final AvroType<V> valueType; + + public AvroKeyValueTableType(AvroType<K> keyType, AvroType<V> valueType, Class<Pair<K, V>> pairClass) { + super(pairClass, AvroKeyValue.getSchema(keyType.getSchema(), valueType.getSchema()), + new AvroKeyValueRecordToPair(keyType.getInputMapFn(), valueType.getInputMapFn()), + new PairToAvroKeyValueRecord(keyType, valueType), + new TupleDeepCopier(Pair.class, keyType, valueType), + null, keyType, valueType); + this.keyType = keyType; + this.valueType = valueType; + } + + @Override + public PType<K> getKeyType() { + return keyType; + } + + @Override + public PType<V> 
getValueType() { + return valueType; + } + + @Override + public PGroupedTableType<K, V> getGroupedTableType() { + return new AvroGroupedTableType<K, V>(this); + } + + @Override + public Pair<K, V> getDetachedValue(Pair<K, V> value) { + return PTables.getDetachedValue(this, value); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/92025c7e/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java index 8e9e069..00047cc 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java @@ -33,7 +33,7 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext; * The implementation of the PTableType interface for Avro-based serialization. * */ -class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements PTableType<K, V> { +class AvroTableType<K, V> extends BaseAvroTableType<K, V> implements PTableType<K, V> { private static class PairToAvroPair extends MapFn<Pair, org.apache.avro.mapred.Pair> { private final MapFn keyMapFn; http://git-wip-us.apache.org/repos/asf/crunch/blob/92025c7e/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java index 4b2c67b..6e11f1b 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java @@ -838,6 +838,16 @@ public class Avros { return PTypes.jsonString(clazz, AvroTypeFamily.getInstance()); } + /** + * A table type with an Avro type as key and as value. 
+ * <p/> + * The {@code PTableType} returned by this method is also compatible with files containing Avro {@code Pair}s that + * are created using the {@code org.apache.avro.mapred.AvroJob} class. + * + * @param key the PType of the key in the table + * @param value the PType of the value in the table + * @return PTableType for reading and writing avro tables + */ public static final <K, V> AvroTableType<K, V> tableOf(PType<K> key, PType<V> value) { if (key instanceof PTableType) { PTableType ptt = (PTableType) key; @@ -852,6 +862,26 @@ public class Avros { return new AvroTableType(avroKey, avroValue, Pair.class); } + /** + * A table type with an Avro type as key and value. The {@code PTableType} returned by this method is specifically + * for reading and writing files that are compatible with those created via the + * {@code org.apache.avro.mapreduce.AvroJob} class. For all other Avro table purposes, the + * {@link #tableOf(org.apache.crunch.types.PType, org.apache.crunch.types.PType)} method should be used. 
+ * + * @param key the PType of the key in the table + * @param value the PType of the value in the table + * @return PTableType for reading and writing files compatible with those created via + * the {@code org.apache.avro.mapreduce.AvroJob} class + */ + public static final <K, V> AvroKeyValueTableType<K, V> keyValueTableOf(PType<K> key, PType<V> value) { + AvroType<K> avroKey = (AvroType<K>) key; + AvroType<V> avroValue = (AvroType<V>) value; + + return new AvroKeyValueTableType<K, V>(avroKey, avroValue, + // Casting this to class is an unfortunate little way to get the generics out of the way here + (Class)Pair.class); + } + private static final Schema NULL_SCHEMA = Schema.create(Type.NULL); private static Schema allowNulls(Schema base) { http://git-wip-us.apache.org/repos/asf/crunch/blob/92025c7e/crunch-core/src/main/java/org/apache/crunch/types/avro/BaseAvroTableType.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/BaseAvroTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/BaseAvroTableType.java new file mode 100644 index 0000000..848b33a --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/BaseAvroTableType.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.crunch.types.avro; + +import org.apache.avro.Schema; +import org.apache.crunch.MapFn; +import org.apache.crunch.Pair; +import org.apache.crunch.types.DeepCopier; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; + +/** + * Base type for dealing with PTables with Avro keys and values. + */ +abstract class BaseAvroTableType<K, V> extends AvroType<Pair<K, V>> implements PTableType<K, V> { + + protected BaseAvroTableType(Class<Pair<K, V>> typeClass, Schema schema, MapFn inputMapFn, MapFn outputMapFn, + DeepCopier<Pair<K, V>> deepCopier, AvroRecordType recordType, PType... ptypes) { + super(typeClass, schema, inputMapFn, outputMapFn, deepCopier, recordType, ptypes); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/92025c7e/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java index 795e2b4..da8dd28 100644 --- a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java @@ -23,7 +23,6 @@ import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; import java.nio.ByteBuffer; -import java.util.List; import com.google.common.collect.Lists; import org.apache.avro.generic.GenericData.Record; @@ -70,27 +69,12 @@ public class AvroDeepCopierTest { assertNull(new AvroDeepCopier.AvroGenericDeepCopier(Person.SCHEMA$).deepCopy(null)); } - static class ReflectedPerson { - String name; - int age; - List<String> siblingnames; - - @Override - public boolean equals(Object other) { - if (other == null || !(other 
instanceof ReflectedPerson)) { - return false; - } - ReflectedPerson that = (ReflectedPerson) other; - return name.equals(that.name) && age == that.age && siblingnames.equals(that.siblingnames); - } - } - @Test public void testDeepCopyReflect() { ReflectedPerson person = new ReflectedPerson(); - person.name = "John Doe"; - person.age = 42; - person.siblingnames = Lists.newArrayList(); + person.setName("John Doe"); + person.setAge(42); + person.setSiblingnames(Lists.<String>newArrayList()); AvroDeepCopier<ReflectedPerson> avroDeepCopier = new AvroDeepCopier.AvroReflectDeepCopier<ReflectedPerson>( ReflectedPerson.class, Avros.reflects(ReflectedPerson.class).getSchema()); http://git-wip-us.apache.org/repos/asf/crunch/blob/92025c7e/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java index 481444f..8764275 100644 --- a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java @@ -200,29 +200,14 @@ public class AvroTypeTest { specificType.getDetachedValue(person); } - static class ReflectedPerson { - String name; - int age; - List<String> siblingnames; - - @Override - public boolean equals(Object other) { - if (other == null || !(other instanceof ReflectedPerson)) { - return false; - } - ReflectedPerson that = (ReflectedPerson) other; - return name.equals(that.name) && age == that.age && siblingnames.equals(that.siblingnames); - } - } - @Test public void testGetDetachedValue_ReflectAvroType() { AvroType<ReflectedPerson> reflectType = Avros.reflects(ReflectedPerson.class); reflectType.initialize(new Configuration()); ReflectedPerson rp = new ReflectedPerson(); - rp.name = "josh"; - rp.age = 32; - rp.siblingnames = 
Lists.newArrayList(); + rp.setName("josh"); + rp.setAge(32); + rp.setSiblingnames(Lists.<String>newArrayList()); ReflectedPerson detached = reflectType.getDetachedValue(rp); assertEquals(rp, detached); assertNotSame(rp, detached); http://git-wip-us.apache.org/repos/asf/crunch/blob/92025c7e/crunch-core/src/test/java/org/apache/crunch/types/avro/ReflectedPerson.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/types/avro/ReflectedPerson.java b/crunch-core/src/test/java/org/apache/crunch/types/avro/ReflectedPerson.java new file mode 100644 index 0000000..c7c5f88 --- /dev/null +++ b/crunch-core/src/test/java/org/apache/crunch/types/avro/ReflectedPerson.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.crunch.types.avro; + +import java.util.List; + +/** + * A test helper class that conforms to the Person Avro specific data class, to use the Person schema for testing + * with reflection-based reading and writing. 
+ */ +public class ReflectedPerson { + + private String name; + private int age; + private List<String> siblingnames; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public int getAge() { + return age; + } + + public void setAge(int age) { + this.age = age; + } + + public List<String> getSiblingnames() { + return siblingnames; + } + + public void setSiblingnames(List<String> siblingnames) { + this.siblingnames = siblingnames; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ReflectedPerson that = (ReflectedPerson) o; + + if (age != that.age) return false; + if (name != null ? !name.equals(that.name) : that.name != null) return false; + if (siblingnames != null ? !siblingnames.equals(that.siblingnames) : that.siblingnames != null) return false; + + return true; + } + + @Override + public int hashCode() { + int result = name != null ? name.hashCode() : 0; + result = 31 * result + age; + result = 31 * result + (siblingnames != null ? siblingnames.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return "ReflectedPerson{" + + "name='" + name + '\'' + + ", age=" + age + + ", siblingnames=" + siblingnames + + '}'; + } +}
