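CRUNCH-314: Separate shuffle and bundle AvroMode configuration. This adds an integration test, AvroModeIT, that reproduces the behavior described in CRUNCH-314: a pipeline that reads a generic Avro source and then shuffles a reflect-based type. The fix splits the AvroMode#configure methods into configure(FormatBundle), used for sources and targets, and configureShuffle(Configuration), used for SafeAvroSerialization and AvroGroupedTableType; the shuffle mode is now stored under its own property, crunch.avro.shuffle.mode, instead of sharing crunch.avro.mode with sources and targets.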
AvroDeepCopier and Avros also used a configure method to set the reflect factory, which has been updated to the more specific configureFactory. This also changes the default AvroMode to REFLECT because it is the most general. Signed-off-by: Josh Wills <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/9d7b9e46 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/9d7b9e46 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/9d7b9e46 Branch: refs/heads/apache-crunch-0.8 Commit: 9d7b9e4615d8aaf60babebb898a9165c8119d284 Parents: f65176f Author: Ryan Blue <[email protected]> Authored: Fri Dec 20 18:12:48 2013 -0800 Committer: Josh Wills <[email protected]> Committed: Fri Jan 3 17:38:42 2014 -0800 ---------------------------------------------------------------------- .../org/apache/crunch/io/avro/AvroModeIT.java | 144 +++++++++++++++++++ crunch-core/src/it/resources/strings-100.avro | Bin 0 -> 451 bytes .../crunch/types/avro/AvroDeepCopier.java | 5 +- .../crunch/types/avro/AvroGroupedTableType.java | 2 +- .../org/apache/crunch/types/avro/AvroMode.java | 25 +++- .../org/apache/crunch/types/avro/Avros.java | 2 +- .../types/avro/SafeAvroSerialization.java | 4 +- pom.xml | 1 + 8 files changed, 171 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/9d7b9e46/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java new file mode 100644 index 0000000..ff66fd7 --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.crunch.io.avro; + +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Random; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.crunch.Aggregator; +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.MapFn; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.Source; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.From; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.avro.AvroMode; +import org.apache.crunch.types.avro.AvroType; +import org.apache.crunch.types.avro.Avros; +import org.apache.hadoop.conf.Configuration; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AvroModeIT implements Serializable { + + public static final Schema GENERIC_SCHEMA = new Schema.Parser().parse("{\n" + + " \"name\": \"mystring\",\n" + + " \"type\": \"record\",\n" + + " \"fields\": [\n" + + " { \"name\": 
\"text\", \"type\": \"string\" }\n" + + " ]\n" + + "}"); + + static final class FloatArray { + private final float[] values; + FloatArray() { + this(null); + } + FloatArray(float[] values) { + this.values = values; + } + float[] getValues() { + return values; + } + } + + public static AvroType<float[]> FLOAT_ARRAY = Avros.derived(float[].class, + new MapFn<FloatArray, float[]>() { + @Override + public float[] map(FloatArray input) { + return input.getValues(); + } + }, + new MapFn<float[], FloatArray>() { + @Override + public FloatArray map(float[] input) { + return new FloatArray(input); + } + }, Avros.reflects(FloatArray.class)); + + @Rule + public transient TemporaryPath tmpDir = TemporaryPaths.create(); + + @Test + public void testGenericReflectConflict() throws IOException { + final Random rand = new Random(); + rand.setSeed(12345); + Configuration conf = new Configuration(); + Pipeline pipeline = new MRPipeline(AvroModeIT.class, conf); + Source<GenericData.Record> source = From.avroFile( + tmpDir.copyResourceFileName("strings-100.avro"), + Avros.generics(GENERIC_SCHEMA)); + PTable<Long, float[]> mapPhase = pipeline + .read(source) + .parallelDo(new DoFn<GenericData.Record, Pair<Long, float[]>>() { + @Override + public void process(GenericData.Record input, Emitter<Pair<Long, float[]>> emitter) { + emitter.emit(Pair.of( + Long.valueOf(input.get("text").toString().length()), + new float[] {rand.nextFloat(), rand.nextFloat()})); + } + }, Avros.tableOf(Avros.longs(), FLOAT_ARRAY)); + + PTable<Long, float[]> result = mapPhase + .groupByKey() + .combineValues(new Aggregator<float[]>() { + float[] accumulator = null; + + @Override + public Iterable<float[]> results() { + return ImmutableList.of(accumulator); + } + + @Override + public void initialize(Configuration conf) { + } + + @Override + public void reset() { + this.accumulator = null; + } + + @Override + public void update(float[] value) { + if (accumulator == null) { + accumulator = Arrays.copyOf(value, 2); + 
} else { + for (int i = 0; i < value.length; i += 1) { + accumulator[i] += value[i]; + } + } + } + }); + + pipeline.writeTextFile(result, tmpDir.getFileName("unused")); + Assert.assertTrue("Should succeed", pipeline.done().succeeded()); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/9d7b9e46/crunch-core/src/it/resources/strings-100.avro ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/resources/strings-100.avro b/crunch-core/src/it/resources/strings-100.avro new file mode 100755 index 0000000..c968b97 Binary files /dev/null and b/crunch-core/src/it/resources/strings-100.avro differ http://git-wip-us.apache.org/repos/asf/crunch/blob/9d7b9e46/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java index 9e4b0a1..21dae45 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java @@ -152,13 +152,14 @@ abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable { @Override protected DatumReader<T> createDatumReader(Configuration conf) { - AvroMode.REFLECT.configure(conf); + AvroMode.REFLECT.configureFactory(conf); return AvroMode.REFLECT.getReader(getSchema()); } @Override protected DatumWriter<T> createDatumWriter(Configuration conf) { - return AvroMode.fromConfiguration(conf).getWriter(getSchema()); + AvroMode.REFLECT.setFromConfiguration(conf); + return AvroMode.REFLECT.getWriter(getSchema()); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/9d7b9e46/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java ---------------------------------------------------------------------- diff --git 
a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java index 62e6db4..a97f917 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java @@ -101,7 +101,7 @@ class AvroGroupedTableType<K, V> extends PGroupedTableType<K, V> { options.configure(job); } - AvroMode.fromType(att).configure(conf); + AvroMode.fromType(att).configureShuffle(conf); Collection<String> serializations = job.getConfiguration().getStringCollection( "io.serializations"); http://git-wip-us.apache.org/repos/asf/crunch/blob/9d7b9e46/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java index 77eece1..e2646cd 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java @@ -40,9 +40,16 @@ public enum AvroMode implements ReaderWriterFactory { GENERIC ("crunch.genericfactory"); public static final String AVRO_MODE_PROPERTY = "crunch.avro.mode"; + public static final String AVRO_SHUFFLE_MODE_PROPERTY = "crunch.avro.shuffle.mode"; public static AvroMode fromConfiguration(Configuration conf) { - AvroMode mode = conf.getEnum(AVRO_MODE_PROPERTY, GENERIC); + AvroMode mode = conf.getEnum(AVRO_MODE_PROPERTY, REFLECT); + mode.setFromConfiguration(conf); + return mode; + } + + public static AvroMode fromShuffleConfiguration(Configuration conf) { + AvroMode mode = conf.getEnum(AVRO_SHUFFLE_MODE_PROPERTY, REFLECT); mode.setFromConfiguration(conf); return mode; } @@ -137,11 +144,9 @@ public enum AvroMode implements ReaderWriterFactory { } } - public void 
configure(Configuration conf) { - conf.setEnum(AVRO_MODE_PROPERTY, this); - if (factory != null) { - conf.setClass(propName, factory.getClass(), ReaderWriterFactory.class); - } + public void configureShuffle(Configuration conf) { + conf.setEnum(AVRO_SHUFFLE_MODE_PROPERTY, this); + configureFactory(conf); } public void configure(FormatBundle bundle) { @@ -151,8 +156,16 @@ public enum AvroMode implements ReaderWriterFactory { } } + public void configureFactory(Configuration conf) { + if (factory != null) { + conf.setClass(propName, factory.getClass(), ReaderWriterFactory.class); + } + } + @SuppressWarnings("unchecked") void setFromConfiguration(Configuration conf) { + // although the shuffle and input/output use different properties for mode, + // this is shared - only one ReaderWriterFactory can be used. Class<?> factoryClass = conf.getClass(propName, this.getClass()); if (factoryClass != this.getClass()) { this.factory = (ReaderWriterFactory) http://git-wip-us.apache.org/repos/asf/crunch/blob/9d7b9e46/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java index 3d6b04f..2cf63e8 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java @@ -116,7 +116,7 @@ public class Avros { @Deprecated public static void configureReflectDataFactory(Configuration conf) { AvroMode.REFLECT.override(REFLECT_DATA_FACTORY); - AvroMode.REFLECT.configure(conf); + AvroMode.REFLECT.configureFactory(conf); } /** http://git-wip-us.apache.org/repos/asf/crunch/blob/9d7b9e46/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java ---------------------------------------------------------------------- diff --git 
a/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java index f56991e..9205056 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java @@ -60,7 +60,7 @@ class SafeAvroSerialization<T> extends Configured implements Serialization<AvroW if (conf.getBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, false)) { datumReader = AvroMode.REFLECT.getReader(schema); } else { - datumReader = AvroMode.fromConfiguration(conf).getReader(schema); + datumReader = AvroMode.fromShuffleConfiguration(conf).getReader(schema); } return new AvroWrapperDeserializer(datumReader, isKey); } @@ -105,7 +105,7 @@ class SafeAvroSerialization<T> extends Configured implements Serialization<AvroW Schema schema = isFinalOutput ? AvroJob.getOutputSchema(conf) : (AvroKey.class.isAssignableFrom(c) ? Pair .getKeySchema(AvroJob.getMapOutputSchema(conf)) : Pair.getValueSchema(AvroJob.getMapOutputSchema(conf))); - ReaderWriterFactory factory = AvroMode.fromConfiguration(conf); + ReaderWriterFactory factory = AvroMode.fromShuffleConfiguration(conf); DatumWriter<T> writer = factory.getWriter(schema); return new AvroWrapperSerializer(writer); } http://git-wip-us.apache.org/repos/asf/crunch/blob/9d7b9e46/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 2694c50..98d7378 100644 --- a/pom.xml +++ b/pom.xml @@ -629,6 +629,7 @@ under the License. <exclude>.idea/**</exclude> <exclude>**/resources/*.txt</exclude> <exclude>**/resources/**/*.txt</exclude> + <exclude>**/resources/*.avro</exclude> <exclude>**/goal.txt</exclude> <exclude>**/target/generated-test-sources/**</exclude> <exclude>**/scripts/scrunch</exclude>
