Updated Branches: refs/heads/master 316ccb6bf -> 58eb227d7
CRUNCH-314: Separate shuffle and bundle AvroMode configuration. This adds an integration test, AvroModeIT, that exposes the bug described in CRUNCH-314. The fix separates the two AvroMode configuration paths: configure(FormatBundle) remains in use for sources and targets, while the Configuration-based configure method is replaced by configureShuffle(Configuration), which is used by SafeAvroSerialization and AvroGroupedTableType. AvroDeepCopier and Avros previously called configure only to register the reflect factory; those calls now use the more specific configureFactory. This change also makes REFLECT the default AvroMode, because it is the most general mode. Signed-off-by: Josh Wills <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/58eb227d Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/58eb227d Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/58eb227d Branch: refs/heads/master Commit: 58eb227d78e6b1a32a334f83913df275ea1c7811 Parents: 316ccb6 Author: Ryan Blue <[email protected]> Authored: Fri Dec 20 18:12:48 2013 -0800 Committer: Josh Wills <[email protected]> Committed: Sat Dec 21 11:31:07 2013 -0800 ---------------------------------------------------------------------- .../org/apache/crunch/io/avro/AvroModeIT.java | 144 +++++++++++++++++++ crunch-core/src/it/resources/strings-100.avro | Bin 0 -> 451 bytes .../crunch/types/avro/AvroDeepCopier.java | 5 +- .../crunch/types/avro/AvroGroupedTableType.java | 2 +- .../org/apache/crunch/types/avro/AvroMode.java | 25 +++- .../org/apache/crunch/types/avro/Avros.java | 2 +- .../types/avro/SafeAvroSerialization.java | 4 +- pom.xml | 1 + 8 files changed, 171 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/58eb227d/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java ---------------------------------------------------------------------- diff --git 
a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java new file mode 100644 index 0000000..ff66fd7 --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.crunch.io.avro; + +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Random; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.crunch.Aggregator; +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.MapFn; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.Source; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.From; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.avro.AvroMode; +import org.apache.crunch.types.avro.AvroType; +import org.apache.crunch.types.avro.Avros; +import org.apache.hadoop.conf.Configuration; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AvroModeIT implements Serializable { + + public static final Schema GENERIC_SCHEMA = new Schema.Parser().parse("{\n" + + " \"name\": \"mystring\",\n" + + " \"type\": \"record\",\n" + + " \"fields\": [\n" + + " { \"name\": \"text\", \"type\": \"string\" }\n" + + " ]\n" + + "}"); + + static final class FloatArray { + private final float[] values; + FloatArray() { + this(null); + } + FloatArray(float[] values) { + this.values = values; + } + float[] getValues() { + return values; + } + } + + public static AvroType<float[]> FLOAT_ARRAY = Avros.derived(float[].class, + new MapFn<FloatArray, float[]>() { + @Override + public float[] map(FloatArray input) { + return input.getValues(); + } + }, + new MapFn<float[], FloatArray>() { + @Override + public FloatArray map(float[] input) { + return new FloatArray(input); + } + }, Avros.reflects(FloatArray.class)); + + @Rule + public transient TemporaryPath tmpDir = 
TemporaryPaths.create(); + + @Test + public void testGenericReflectConflict() throws IOException { + final Random rand = new Random(); + rand.setSeed(12345); + Configuration conf = new Configuration(); + Pipeline pipeline = new MRPipeline(AvroModeIT.class, conf); + Source<GenericData.Record> source = From.avroFile( + tmpDir.copyResourceFileName("strings-100.avro"), + Avros.generics(GENERIC_SCHEMA)); + PTable<Long, float[]> mapPhase = pipeline + .read(source) + .parallelDo(new DoFn<GenericData.Record, Pair<Long, float[]>>() { + @Override + public void process(GenericData.Record input, Emitter<Pair<Long, float[]>> emitter) { + emitter.emit(Pair.of( + Long.valueOf(input.get("text").toString().length()), + new float[] {rand.nextFloat(), rand.nextFloat()})); + } + }, Avros.tableOf(Avros.longs(), FLOAT_ARRAY)); + + PTable<Long, float[]> result = mapPhase + .groupByKey() + .combineValues(new Aggregator<float[]>() { + float[] accumulator = null; + + @Override + public Iterable<float[]> results() { + return ImmutableList.of(accumulator); + } + + @Override + public void initialize(Configuration conf) { + } + + @Override + public void reset() { + this.accumulator = null; + } + + @Override + public void update(float[] value) { + if (accumulator == null) { + accumulator = Arrays.copyOf(value, 2); + } else { + for (int i = 0; i < value.length; i += 1) { + accumulator[i] += value[i]; + } + } + } + }); + + pipeline.writeTextFile(result, tmpDir.getFileName("unused")); + Assert.assertTrue("Should succeed", pipeline.done().succeeded()); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/58eb227d/crunch-core/src/it/resources/strings-100.avro ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/resources/strings-100.avro b/crunch-core/src/it/resources/strings-100.avro new file mode 100755 index 0000000..c968b97 Binary files /dev/null and b/crunch-core/src/it/resources/strings-100.avro differ 
http://git-wip-us.apache.org/repos/asf/crunch/blob/58eb227d/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java index 9e4b0a1..21dae45 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java @@ -152,13 +152,14 @@ abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable { @Override protected DatumReader<T> createDatumReader(Configuration conf) { - AvroMode.REFLECT.configure(conf); + AvroMode.REFLECT.configureFactory(conf); return AvroMode.REFLECT.getReader(getSchema()); } @Override protected DatumWriter<T> createDatumWriter(Configuration conf) { - return AvroMode.fromConfiguration(conf).getWriter(getSchema()); + AvroMode.REFLECT.setFromConfiguration(conf); + return AvroMode.REFLECT.getWriter(getSchema()); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/58eb227d/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java index 62e6db4..a97f917 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java @@ -101,7 +101,7 @@ class AvroGroupedTableType<K, V> extends PGroupedTableType<K, V> { options.configure(job); } - AvroMode.fromType(att).configure(conf); + AvroMode.fromType(att).configureShuffle(conf); Collection<String> serializations = job.getConfiguration().getStringCollection( 
"io.serializations"); http://git-wip-us.apache.org/repos/asf/crunch/blob/58eb227d/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java index 77eece1..e2646cd 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java @@ -40,9 +40,16 @@ public enum AvroMode implements ReaderWriterFactory { GENERIC ("crunch.genericfactory"); public static final String AVRO_MODE_PROPERTY = "crunch.avro.mode"; + public static final String AVRO_SHUFFLE_MODE_PROPERTY = "crunch.avro.shuffle.mode"; public static AvroMode fromConfiguration(Configuration conf) { - AvroMode mode = conf.getEnum(AVRO_MODE_PROPERTY, GENERIC); + AvroMode mode = conf.getEnum(AVRO_MODE_PROPERTY, REFLECT); + mode.setFromConfiguration(conf); + return mode; + } + + public static AvroMode fromShuffleConfiguration(Configuration conf) { + AvroMode mode = conf.getEnum(AVRO_SHUFFLE_MODE_PROPERTY, REFLECT); mode.setFromConfiguration(conf); return mode; } @@ -137,11 +144,9 @@ public enum AvroMode implements ReaderWriterFactory { } } - public void configure(Configuration conf) { - conf.setEnum(AVRO_MODE_PROPERTY, this); - if (factory != null) { - conf.setClass(propName, factory.getClass(), ReaderWriterFactory.class); - } + public void configureShuffle(Configuration conf) { + conf.setEnum(AVRO_SHUFFLE_MODE_PROPERTY, this); + configureFactory(conf); } public void configure(FormatBundle bundle) { @@ -151,8 +156,16 @@ public enum AvroMode implements ReaderWriterFactory { } } + public void configureFactory(Configuration conf) { + if (factory != null) { + conf.setClass(propName, factory.getClass(), ReaderWriterFactory.class); + } + } + @SuppressWarnings("unchecked") void 
setFromConfiguration(Configuration conf) { + // although the shuffle and input/output use different properties for mode, + // this is shared - only one ReaderWriterFactory can be used. Class<?> factoryClass = conf.getClass(propName, this.getClass()); if (factoryClass != this.getClass()) { this.factory = (ReaderWriterFactory) http://git-wip-us.apache.org/repos/asf/crunch/blob/58eb227d/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java index 3d6b04f..2cf63e8 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java @@ -116,7 +116,7 @@ public class Avros { @Deprecated public static void configureReflectDataFactory(Configuration conf) { AvroMode.REFLECT.override(REFLECT_DATA_FACTORY); - AvroMode.REFLECT.configure(conf); + AvroMode.REFLECT.configureFactory(conf); } /** http://git-wip-us.apache.org/repos/asf/crunch/blob/58eb227d/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java index 7e323f1..1535a61 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java @@ -60,7 +60,7 @@ class SafeAvroSerialization<T> extends Configured implements Serialization<AvroW if (conf.getBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, false)) { datumReader = AvroMode.REFLECT.getReader(schema); } else { - datumReader = AvroMode.fromConfiguration(conf).getReader(schema); + datumReader 
= AvroMode.fromShuffleConfiguration(conf).getReader(schema); } return new AvroWrapperDeserializer(datumReader, isKey); } @@ -105,7 +105,7 @@ class SafeAvroSerialization<T> extends Configured implements Serialization<AvroW Schema schema = isFinalOutput ? AvroJob.getOutputSchema(conf) : (AvroKey.class.isAssignableFrom(c) ? Pair .getKeySchema(AvroJob.getMapOutputSchema(conf)) : Pair.getValueSchema(AvroJob.getMapOutputSchema(conf))); - ReaderWriterFactory factory = AvroMode.fromConfiguration(conf); + ReaderWriterFactory factory = AvroMode.fromShuffleConfiguration(conf); DatumWriter<T> writer = factory.getWriter(schema); return new AvroWrapperSerializer(writer); } http://git-wip-us.apache.org/repos/asf/crunch/blob/58eb227d/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index ad14ce4..cba17a7 100644 --- a/pom.xml +++ b/pom.xml @@ -603,6 +603,7 @@ under the License. <exclude>.idea/**</exclude> <exclude>**/resources/*.txt</exclude> <exclude>**/resources/**/*.txt</exclude> + <exclude>**/resources/*.avro</exclude> <exclude>**/goal.txt</exclude> <exclude>**/target/generated-test-sources/**</exclude> <exclude>**/scripts/scrunch</exclude>
