Repository: crunch
Updated Branches:
  refs/heads/master c182309f0 -> 7f54a0e22
CRUNCH-436: Support AvroMode-based serialization for Crunch-on-Spark Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/7f54a0e2 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/7f54a0e2 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/7f54a0e2 Branch: refs/heads/master Commit: 7f54a0e22cebca659cb8164e52b5eb4713125041 Parents: c182309 Author: Josh Wills <[email protected]> Authored: Sat Jul 5 14:09:08 2014 -0700 Committer: Josh Wills <[email protected]> Committed: Sun Aug 24 08:49:48 2014 -0700 ---------------------------------------------------------------------- .../org/apache/crunch/types/avro/AvroMode.java | 16 ++++++++ .../java/org/apache/crunch/SparkPageRankIT.java | 5 ++- .../impl/spark/collect/PGroupedTableImpl.java | 14 ++++++- .../crunch/impl/spark/serde/AvroSerDe.java | 39 ++++++++++++-------- .../apache/crunch/impl/spark/serde/SerDe.java | 2 + 5 files changed, 56 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/7f54a0e2/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java index 9653f25..0c38105 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java @@ -18,6 +18,7 @@ package org.apache.crunch.types.avro; +import com.google.common.collect.Maps; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; @@ -34,6 +35,8 @@ import org.apache.crunch.io.FormatBundle; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ReflectionUtils; +import 
java.util.Map; + /** * AvroMode is an immutable object used for configuring the reading and writing of Avro types. * The mode will not be used or honored unless it has been appropriately configured using one of the supported @@ -310,6 +313,19 @@ public class AvroMode implements ReaderWriterFactory { } /** + * Returns the entries that a {@code Configuration} instance needs to enable + * this AvroMode as a serializable map of key-value pairs. + */ + public Map<String, String> getModeProperties() { + Map<String, String> props = Maps.newHashMap(); + props.put(AVRO_MODE_PROPERTY, this.modeType.toString()); + if (factory != null) { + props.put(propName, factory.getClass().getCanonicalName()); + } + return props; + } + + /** * Populates the {@code conf} with mode specific settings. * @param conf the configuration to populate. * @deprecated use {@link #configure(org.apache.hadoop.conf.Configuration)} http://git-wip-us.apache.org/repos/asf/crunch/blob/7f54a0e2/crunch-spark/src/it/java/org/apache/crunch/SparkPageRankIT.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkPageRankIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkPageRankIT.java index c76c62a..47b9300 100644 --- a/crunch-spark/src/it/java/org/apache/crunch/SparkPageRankIT.java +++ b/crunch-spark/src/it/java/org/apache/crunch/SparkPageRankIT.java @@ -26,6 +26,7 @@ import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; import org.apache.crunch.types.PTypes; import org.apache.crunch.types.avro.AvroTypeFamily; +import org.apache.crunch.types.avro.Avros; import org.apache.crunch.types.writable.WritableTypeFamily; import org.junit.Before; import org.junit.Rule; @@ -76,9 +77,9 @@ public class SparkPageRankIT { } @Test - public void testAvroJSON() throws Exception { + public void testAvroReflects() throws Exception { PTypeFamily tf = AvroTypeFamily.getInstance(); - PType<PageRankData> prType = 
PTypes.jsonString(PageRankData.class, tf); + PType<PageRankData> prType = Avros.reflects(PageRankData.class); String urlInput = tmpDir.copyResourceFileName("urls.txt"); run(pipeline, urlInput, prType, tf); pipeline.done(); http://git-wip-us.apache.org/repos/asf/crunch/blob/7f54a0e2/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/PGroupedTableImpl.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/PGroupedTableImpl.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/PGroupedTableImpl.java index 95811cf..4de50b8 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/PGroupedTableImpl.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/PGroupedTableImpl.java @@ -39,14 +39,18 @@ import org.apache.crunch.impl.spark.serde.AvroSerDe; import org.apache.crunch.impl.spark.serde.SerDe; import org.apache.crunch.impl.spark.serde.WritableSerDe; import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.avro.AvroMode; import org.apache.crunch.types.avro.AvroType; import org.apache.crunch.types.writable.WritableType; import org.apache.crunch.util.PartitionUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDDLike; import org.apache.spark.storage.StorageLevel; import java.util.List; +import java.util.Map; public class PGroupedTableImpl<K, V> extends BaseGroupedTable<K, V> implements SparkCollection { @@ -70,6 +74,12 @@ public class PGroupedTableImpl<K, V> extends BaseGroupedTable<K, V> implements S return rdd; } + private AvroSerDe getAvroSerde(PType ptype, Configuration conf) { + AvroType at = (AvroType) ptype; + Map<String, String> props = AvroMode.fromType(at).withFactoryFromConfiguration(conf).getModeProperties(); + return new AvroSerDe(at, props); + } + private 
JavaRDDLike<?, ?> getJavaRDDLikeInternal(SparkRuntime runtime, CombineFn<K, V> combineFn) { JavaPairRDD<K, V> parentRDD = (JavaPairRDD<K, V>) ((SparkCollection)getOnlyParent()).getJavaRDDLike(runtime); if (combineFn != null) { @@ -78,8 +88,8 @@ public class PGroupedTableImpl<K, V> extends BaseGroupedTable<K, V> implements S SerDe keySerde, valueSerde; PTableType<K, V> parentType = ptype.getTableType(); if (parentType instanceof AvroType) { - keySerde = new AvroSerDe((AvroType) parentType.getKeyType()); - valueSerde = new AvroSerDe((AvroType) parentType.getValueType()); + keySerde = getAvroSerde(parentType.getKeyType(), runtime.getConfiguration()); + valueSerde = getAvroSerde(parentType.getValueType(), runtime.getConfiguration()); } else { keySerde = new WritableSerDe(((WritableType) parentType.getKeyType()).getSerializationClass()); valueSerde = new WritableSerDe(((WritableType) parentType.getValueType()).getSerializationClass()); http://git-wip-us.apache.org/repos/asf/crunch/blob/7f54a0e2/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java index e6e08a0..f82ba8e 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java @@ -26,53 +26,60 @@ import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.Encoder; import org.apache.avro.io.EncoderFactory; -import org.apache.avro.mapred.AvroWrapper; import org.apache.avro.reflect.ReflectDatumReader; import org.apache.avro.reflect.ReflectDatumWriter; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.crunch.types.avro.AvroMode; import 
org.apache.crunch.types.avro.AvroType; import org.apache.crunch.types.avro.Avros; +import org.apache.hadoop.conf.Configuration; import javax.annotation.Nullable; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.Map; public class AvroSerDe<T> implements SerDe<T> { private AvroType<T> avroType; + private Map<String, String> modeProperties; + private transient AvroMode mode; private transient DatumWriter<T> writer; private transient DatumReader<T> reader; - public AvroSerDe(AvroType<T> avroType) { + public AvroSerDe(AvroType<T> avroType, Map<String, String> modeProperties) { this.avroType = avroType; + this.modeProperties = modeProperties; if (avroType.hasReflect() && avroType.hasSpecific()) { Avros.checkCombiningSpecificAndReflectionSchemas(); } } + private AvroMode getMode() { + if (mode == null) { + mode = AvroMode.fromType(avroType); + if (modeProperties != null && !modeProperties.isEmpty()) { + Configuration conf = new Configuration(); + for (Map.Entry<String, String> e : modeProperties.entrySet()) { + conf.set(e.getKey(), e.getValue()); + } + mode = mode.withFactoryFromConfiguration(conf); + } + } + return mode; + } + private DatumWriter<T> getWriter() { if (writer == null) { - if (avroType.hasReflect()) { - writer = new ReflectDatumWriter<T>(avroType.getSchema()); - } else if (avroType.hasSpecific()) { - writer = new SpecificDatumWriter<T>(avroType.getSchema()); - } else { - writer = new GenericDatumWriter<T>(avroType.getSchema()); - } + writer = getMode().getWriter(avroType.getSchema()); } return writer; } private DatumReader<T> getReader() { if (reader == null) { - if (avroType.hasReflect()) { - reader = new ReflectDatumReader<T>(avroType.getSchema()); - } else if (avroType.hasSpecific()) { - reader = new SpecificDatumReader<T>(avroType.getSchema()); - } else { - reader = new GenericDatumReader<T>(avroType.getSchema()); - } + reader = getMode().getReader(avroType.getSchema()); } return reader; } 
http://git-wip-us.apache.org/repos/asf/crunch/blob/7f54a0e2/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDe.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDe.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDe.java index d374a41..887f656 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDe.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDe.java @@ -18,10 +18,12 @@ package org.apache.crunch.impl.spark.serde; import com.google.common.base.Function; +import org.apache.hadoop.conf.Configuration; import java.io.Serializable; public interface SerDe<T> extends Serializable { + byte[] toBytes(T obj) throws Exception; T fromBytes(byte[] bytes);
