Repository: crunch
Updated Branches:
  refs/heads/master fcb861edc -> 583ea6dab
CRUNCH-401: Enable overrides of AvroMode.REFLECT for use with Scrunch. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/583ea6da Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/583ea6da Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/583ea6da Branch: refs/heads/master Commit: 583ea6dabc457ec69f316b52548b1dc92f99dda1 Parents: fcb861e Author: Josh Wills <[email protected]> Authored: Fri May 23 07:05:38 2014 -0700 Committer: Josh Wills <[email protected]> Committed: Sun Jun 1 18:52:42 2014 -0700 ---------------------------------------------------------------------- .../crunch/io/avro/AvroFileReaderFactory.java | 26 ++++++----- .../lib/join/BloomFilterJoinStrategy.java | 8 ++-- .../org/apache/crunch/lib/join/JoinUtils.java | 5 ++- .../crunch/lib/sort/ReverseAvroComparator.java | 5 ++- .../crunch/types/avro/AvroDeepCopier.java | 3 +- .../crunch/types/avro/AvroGroupedTableType.java | 2 +- .../org/apache/crunch/types/avro/AvroMode.java | 12 ++--- .../crunch/types/avro/AvroOutputFormat.java | 1 + .../org/apache/crunch/types/avro/Avros.java | 46 +++++++++++++------- .../crunch/types/avro/ReflectDataFactory.java | 5 --- .../apache/crunch/scrunch/DeepCopyTest.scala | 2 +- .../crunch/scrunch/PageRankClassTest.scala | 8 ++-- .../crunch/scrunch/ScalaReflectDataFactory.java | 12 ++--- .../crunch/scrunch/ScalaSafeReflectData.java | 21 +++++++-- .../scrunch/ScalaSafeReflectDatumReader.java | 30 +++++++------ .../scrunch/ScalaSafeReflectDatumWriter.java | 7 +-- .../org/apache/crunch/scrunch/PCollection.scala | 12 +++-- .../apache/crunch/scrunch/PCollectionLike.scala | 6 +++ .../org/apache/crunch/scrunch/PTable.scala | 5 ++- .../org/apache/crunch/scrunch/PTypeFamily.scala | 6 ++- .../org/apache/crunch/scrunch/Pipeline.scala | 1 + .../apache/crunch/scrunch/PipelineLike.scala | 20 +++++++-- 22 files changed, 158 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java index 5f53a36..5128fd6 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java @@ -30,6 +30,7 @@ import org.apache.crunch.MapFn; import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.io.FileReaderFactory; import org.apache.crunch.io.impl.AutoClosingIterator; +import org.apache.crunch.types.avro.AvroMode; import org.apache.crunch.types.avro.AvroType; import org.apache.crunch.types.avro.Avros; import org.apache.hadoop.fs.FileSystem; @@ -42,29 +43,32 @@ public class AvroFileReaderFactory<T> implements FileReaderFactory<T> { private static final Log LOG = LogFactory.getLog(AvroFileReaderFactory.class); - private final DatumReader<T> recordReader; + private DatumReader<T> reader; + private final AvroType<?> atype; private final MapFn<T, T> mapFn; - public AvroFileReaderFactory(AvroType<T> atype) { - this(createDatumReader(atype), atype); + public AvroFileReaderFactory(Schema schema) { + this(null, Avros.generics(schema)); } - public AvroFileReaderFactory(DatumReader<T> reader, AvroType<T> atype) { - 
this.recordReader = reader != null ? reader : createDatumReader(atype); - this.mapFn = (MapFn<T, T>) atype.getInputMapFn(); + public AvroFileReaderFactory(AvroType<?> atype) { + this(null, atype); } - public AvroFileReaderFactory(Schema schema) { - this.recordReader = Avros.newReader(schema); - this.mapFn = IdentityFn.<T>getInstance(); + public AvroFileReaderFactory(DatumReader<T> reader, AvroType<?> atype) { + this.reader = reader; + this.atype = atype; + this.mapFn = (MapFn<T, T>) atype.getInputMapFn(); } - static <T> DatumReader<T> createDatumReader(AvroType<T> avroType) { - return Avros.newReader(avroType); + static <T> DatumReader<T> createDatumReader(AvroType<T> atype) { + return Avros.newReader(atype); } @Override public Iterator<T> read(FileSystem fs, final Path path) { + AvroMode mode = AvroMode.fromType(atype).withFactoryFromConfiguration(fs.getConf()); + final DatumReader recordReader = reader == null ? mode.getReader(atype.getSchema()) : reader; this.mapFn.initialize(); try { FsInput fsi = new FsInput(path, fs.getConf()); http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java index 872f3e3..69fe27e 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java @@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumWriter; import org.apache.avro.io.EncoderFactory; import org.apache.avro.reflect.ReflectDatumWriter; import org.apache.commons.logging.Log; @@ -35,9 +36,9 @@ import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.ParallelDoOptions; import org.apache.crunch.ReadableData; -import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; +import org.apache.crunch.types.avro.AvroMode; import org.apache.crunch.types.avro.AvroType; import org.apache.crunch.types.avro.AvroTypeFamily; import org.apache.crunch.types.avro.Avros; @@ -304,11 +305,12 @@ public class BloomFilterJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> { private AvroType<T> ptype; private BinaryEncoder encoder; - private ReflectDatumWriter datumWriter; + private DatumWriter datumWriter; AvroToBytesFn(AvroType<T> ptype, Configuration conf) { this.ptype = ptype; - datumWriter = Avros.getReflectDataFactory(conf).getWriter(ptype.getSchema()); + datumWriter = AvroMode.fromType(ptype).withFactoryFromConfiguration(conf) + .getWriter(ptype.getSchema()); } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java index b4829be..02963a7 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java @@ -26,6 +26,7 @@ import org.apache.avro.mapred.AvroValue; import 
org.apache.avro.mapred.AvroWrapper; import org.apache.avro.reflect.ReflectData; import org.apache.crunch.types.PTypeFamily; +import org.apache.crunch.types.avro.AvroMode; import org.apache.crunch.types.writable.TupleWritable; import org.apache.crunch.types.writable.WritableTypeFamily; import org.apache.hadoop.conf.Configuration; @@ -108,6 +109,7 @@ public class JoinUtils { public static class AvroPairGroupingComparator<T> extends Configured implements RawComparator<AvroWrapper<T>> { private Schema schema; + private AvroMode mode; @Override public void setConf(Configuration conf) { @@ -116,12 +118,13 @@ public class JoinUtils { Schema mapOutputSchema = AvroJob.getMapOutputSchema(conf); Schema keySchema = org.apache.avro.mapred.Pair.getKeySchema(mapOutputSchema); schema = keySchema.getFields().get(0).schema(); + mode = AvroMode.fromShuffleConfiguration(conf); } } @Override public int compare(AvroWrapper<T> x, AvroWrapper<T> y) { - return ReflectData.get().compare(x.datum(), y.datum(), schema); + return mode.getData().compare(x.datum(), y.datum(), schema); } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/lib/sort/ReverseAvroComparator.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/sort/ReverseAvroComparator.java b/crunch-core/src/main/java/org/apache/crunch/lib/sort/ReverseAvroComparator.java index c404492..b94ccc2 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/sort/ReverseAvroComparator.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/sort/ReverseAvroComparator.java @@ -21,6 +21,7 @@ import org.apache.avro.Schema; import org.apache.avro.io.BinaryData; import org.apache.avro.mapred.AvroKey; import org.apache.avro.reflect.ReflectData; +import org.apache.crunch.types.avro.AvroMode; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.io.RawComparator; @@ -28,18 +29,20 @@ import org.apache.hadoop.io.RawComparator; public class ReverseAvroComparator<T> extends Configured implements RawComparator<AvroKey<T>> { private Schema schema; + private AvroMode mode; @Override public void setConf(Configuration conf) { super.setConf(conf); if (conf != null) { schema = (new Schema.Parser()).parse(conf.get("crunch.schema")); + mode = AvroMode.fromShuffleConfiguration(conf); } } @Override public int compare(AvroKey<T> o1, AvroKey<T> o2) { - return ReflectData.get().compare(o2.datum(), o1.datum(), schema); + return mode.getData().compare(o2.datum(), o1.datum(), schema); } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java index 56ec459..4a98228 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java @@ -153,8 +153,7 @@ abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable { @Override protected DatumReader<T> createDatumReader(Configuration conf) { - AvroMode.REFLECT.configureFactory(conf); - return AvroMode.REFLECT.getReader(getSchema()); + return AvroMode.REFLECT.withFactoryFromConfiguration(conf).getReader(getSchema()); } 
@Override http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java index a97f917..7178274 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java @@ -101,7 +101,7 @@ class AvroGroupedTableType<K, V> extends PGroupedTableType<K, V> { options.configure(job); } - AvroMode.fromType(att).configureShuffle(conf); + AvroMode.fromType(att).withFactoryFromConfiguration(conf).configureShuffle(conf); Collection<String> serializations = job.getConfiguration().getStringCollection( "io.serializations"); http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java index 90ff791..3d7fbfa 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java @@ -51,12 +51,12 @@ public class AvroMode implements ReaderWriterFactory { /** * Default mode to use for reading and writing {@link ReflectData Reflect} types. */ - public static final AvroMode REFLECT = new AvroMode(ModeType.REFLECT, new ReflectDataFactory(), Avros.REFLECT_DATA_FACTORY_CLASS); + public static final AvroMode REFLECT = new AvroMode(ModeType.REFLECT, Avros.REFLECT_DATA_FACTORY_CLASS); /** * Default mode to use for reading and writing {@link SpecificData Specific} types. */ - public static final AvroMode SPECIFIC =new AvroMode(ModeType.SPECIFIC, "crunch.specificfactory"); + public static final AvroMode SPECIFIC = new AvroMode(ModeType.SPECIFIC, "crunch.specificfactory"); /** * Default mode to use for reading and writing {@link GenericData Generic} types. */ @@ -95,11 +95,11 @@ public class AvroMode implements ReaderWriterFactory { if (type.hasSpecific()) { Avros.checkCombiningSpecificAndReflectionSchemas(); } - return AvroMode.REFLECT; + return REFLECT; } else if (type.hasSpecific()) { - return AvroMode.SPECIFIC; + return SPECIFIC; } else { - return AvroMode.GENERIC; + return GENERIC; } } @@ -352,7 +352,7 @@ public class AvroMode implements ReaderWriterFactory { } @SuppressWarnings("unchecked") - AvroMode withFactoryFromConfiguration(Configuration conf) { + public AvroMode withFactoryFromConfiguration(Configuration conf) { // although the shuffle and input/output use different properties for mode, // this is shared - only one ReaderWriterFactory can be used. 
Class<?> factoryClass = conf.getClass(propName, this.getClass()); http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java index 5a4499d..6dbb6de 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileWriter; +import org.apache.avro.io.DatumWriter; import org.apache.avro.mapred.AvroJob; import org.apache.avro.mapred.AvroWrapper; import org.apache.hadoop.conf.Configuration; http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java index 1fcb30e..62945b1 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java @@ -100,14 +100,11 @@ public class Avros { * * @deprecated as of 0.9.0; use AvroMode.REFLECT.override(ReaderWriterFactory) */ - public static ReflectDataFactory REFLECT_DATA_FACTORY = - (ReflectDataFactory) AvroMode.REFLECT.getFactory(); + public static ReflectDataFactory REFLECT_DATA_FACTORY = new ReflectDataFactory(); /** * The name of the configuration parameter that tracks which reflection * factory to use. - * - * @deprecated as of 0.9.0; use AvroMode.REFLECT.override(ReaderWriterFactory) */ public static final String REFLECT_DATA_FACTORY_CLASS = "crunch.reflectdatafactory"; @@ -273,7 +270,11 @@ public class Avros { } public static final <T> AvroType<T> reflects(Class<T> clazz) { - Schema schema = REFLECT_DATA_FACTORY.getData().getSchema(clazz); + Schema schema = ((ReflectData) AvroMode.REFLECT.getData()).getSchema(clazz); + return reflects(clazz, schema); + } + + public static final <T> AvroType<T> reflects(Class<T> clazz, Schema schema) { return new AvroType<T>(clazz, schema, new AvroDeepCopier.AvroReflectDeepCopier<T>(clazz, schema)); } @@ -541,6 +542,7 @@ public class Avros { private final String jsonSchema; private final boolean isReflect; private transient Schema schema; + private transient AvroMode mode; public TupleToGenericRecord(Schema schema, PType<?>... ptypes) { this.fns = Lists.newArrayList(); @@ -585,11 +587,16 @@ public class Avros { for (MapFn fn : fns) { fn.initialize(); } + if (getConfiguration() != null) { + mode = AvroMode.REFLECT.withFactoryFromConfiguration(getConfiguration()); + } else { + mode = AvroMode.REFLECT; + } } private GenericRecord createRecord() { if (isReflect) { - return new ReflectGenericRecord(schema); + return new ReflectGenericRecord(schema, mode); } else { return new GenericData.Record(schema); } @@ -693,6 +700,7 @@ public class Avros { private final String jsonSchema; private final boolean isReflect; private transient Schema schema; + private transient AvroMode mode; public TupleToUnionRecord(Schema schema, PType<?>... 
ptypes) { this.fns = Lists.newArrayList(); @@ -737,11 +745,16 @@ public class Avros { for (MapFn fn : fns) { fn.initialize(); } + if (getConfiguration() != null) { + mode = AvroMode.REFLECT.withFactoryFromConfiguration(getConfiguration()); + } else { + mode = AvroMode.REFLECT; + } } private GenericRecord createRecord() { if (isReflect) { - return new ReflectGenericRecord(schema); + return new ReflectGenericRecord(schema, mode); } else { return new GenericData.Record(schema); } @@ -850,20 +863,23 @@ public class Avros { private static class ReflectGenericRecord extends GenericData.Record { - public ReflectGenericRecord(Schema schema) { + private AvroMode mode; + + public ReflectGenericRecord(Schema schema, AvroMode mode) { super(schema); + this.mode = mode; } @Override public int hashCode() { - return reflectAwareHashCode(this, getSchema()); + return reflectAwareHashCode(this, getSchema(), mode); } } /* * TODO: Remove this once we no longer have to support 1.5.4. */ - private static int reflectAwareHashCode(Object o, Schema s) { + private static int reflectAwareHashCode(Object o, Schema s, AvroMode mode) { if (o == null) return 0; // incomplete datum int hashCode = 1; @@ -872,17 +888,17 @@ public class Avros { for (Schema.Field f : s.getFields()) { if (f.order() == Schema.Field.Order.IGNORE) continue; - hashCode = hashCodeAdd(hashCode, ReflectData.get().getField(o, f.name(), f.pos()), f.schema()); + hashCode = hashCodeAdd(hashCode, mode.getData().getField(o, f.name(), f.pos()), f.schema(), mode); } return hashCode; case ARRAY: Collection<?> a = (Collection<?>) o; Schema elementType = s.getElementType(); for (Object e : a) - hashCode = hashCodeAdd(hashCode, e, elementType); + hashCode = hashCodeAdd(hashCode, e, elementType, mode); return hashCode; case UNION: - return reflectAwareHashCode(o, s.getTypes().get(ReflectData.get().resolveUnion(s, o))); + return reflectAwareHashCode(o, s.getTypes().get(mode.getData().resolveUnion(s, o)), mode); case ENUM: return s.getEnumOrdinal(o.toString()); case NULL: @@ -895,8 +911,8 @@ public class Avros { } /** Add the hash code for an object into an accumulated hash code. 
*/ - private static int hashCodeAdd(int hashCode, Object o, Schema s) { - return 31 * hashCode + reflectAwareHashCode(o, s); + private static int hashCodeAdd(int hashCode, Object o, Schema s, AvroMode mode) { + return 31 * hashCode + reflectAwareHashCode(o, s, mode); } private Avros() { http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java index 3a5d6f4..65c1f49 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java @@ -32,11 +32,6 @@ public class ReflectDataFactory implements ReaderWriterFactory { return ReflectData.AllowNull.get(); } - // for backwards-compatibility - public ReflectData getReflectData() { - return getData(); - } - @Override public <T> ReflectDatumReader<T> getReader(Schema schema) { return new ReflectDatumReader<T>(schema); http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala index c931754..816993b 100644 --- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala +++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala @@ -73,7 +73,7 @@ class DeepCopyTest extends CrunchSuite { @SuppressWarnings(Array("rawtypes", "unchecked")) private def writeAvroFile[T <: AnyRef](outputStream: FSDataOutputStream, records: Iterable[T]) { val r: AnyRef = records.iterator.next() - val schema = new ScalaReflectDataFactory().getReflectData.getSchema(r.getClass) + val schema = new ScalaReflectDataFactory().getData.getSchema(r.getClass) val writer = new ScalaReflectDataFactory().getWriter[T](schema) val dataFileWriter = new DataFileWriter(writer) http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala index 3d3cb9f..f7ccf1a 100644 --- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala +++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala @@ -27,12 +27,12 @@ import scala.collection.mutable.HashMap import _root_.org.junit.Assert._ import _root_.org.junit.Test -case class PageRankData(page_rank: Float, oldpr: Float, urls: Array[String]) { - def this() = this(0f, 0f, null) +case class PageRankData(page_rank: Float, oldpr: Float, urls: Array[String], bytes: Array[Byte]) { + def this() = this(0f, 0f, null, Array[Byte](0)) def scaledPageRank = page_rank / urls.length - def next(newPageRank: Float) = new PageRankData(newPageRank, page_rank, urls) + def next(newPageRank: Float) = new PageRankData(newPageRank, page_rank, urls, bytes) def delta = math.abs(page_rank - oldpr) } @@ -67,7 +67,7 @@ class PageRankClassTest extends CrunchSuite { 
pipeline.read(from.textFile(fileName, Avros.strings)) .map(line => { val urls = line.split("\\t"); (urls(0), urls(1)) }) .groupByKey - .map((url, links) => (url, PageRankData(1f, 0f, links.filter(x => x != null).toArray))) + .map((url, links) => (url, PageRankData(1f, 0f, links.filter(x => x != null).toArray, Array[Byte](0)))) } def update(prev: PTable[String, PageRankData], d: Float) = { http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaReflectDataFactory.java ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaReflectDataFactory.java b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaReflectDataFactory.java index e3d4eb2..7fd962b 100644 --- a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaReflectDataFactory.java +++ b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaReflectDataFactory.java @@ -22,22 +22,24 @@ import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.ReflectDatumReader; import org.apache.avro.reflect.ReflectDatumWriter; +import org.apache.crunch.types.avro.ReaderWriterFactory; import org.apache.crunch.types.avro.ReflectDataFactory; /** * An implementation of the {@code ReflectDataFactory} class to work with Scala classes. */ -public class ScalaReflectDataFactory extends ReflectDataFactory { +public class ScalaReflectDataFactory implements ReaderWriterFactory { @Override - public ReflectData getReflectData() { return ScalaSafeReflectData.get(); } + public ReflectData getData() { return ScalaSafeReflectData.getInstance(); } @Override public <T> ReflectDatumReader<T> getReader(Schema schema) { return new ScalaSafeReflectDatumReader<T>(schema); } - - public <T> ReflectDatumWriter<T> getWriter() { - return new ScalaSafeReflectDatumWriter<T>(); + + @Override + public <T> ReflectDatumWriter<T> getWriter(Schema schema) { + return new ScalaSafeReflectDatumWriter<T>(schema); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java index 6118834..6885f3e 100644 --- a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java +++ b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java @@ -48,7 +48,7 @@ public class ScalaSafeReflectData extends ReflectData.AllowNull { private static final ScalaSafeReflectData INSTANCE = new ScalaSafeReflectData(); - public static ScalaSafeReflectData get() { return INSTANCE; } + public static ScalaSafeReflectData getInstance() { return INSTANCE; } static final String CLASS_PROP = "java-class"; static final String ELEMENT_PROP = "java-element-class"; @@ -88,7 +88,7 @@ public class ScalaSafeReflectData extends ReflectData.AllowNull { protected Schema createSchema(Type type, Map<String,Schema> names) { if (type instanceof GenericArrayType) { // generic array Type component = ((GenericArrayType)type).getGenericComponentType(); - if (component == Byte.TYPE) // byte array + if (component == Byte.TYPE) // byte array return Schema.create(Schema.Type.BYTES); Schema result = Schema.createArray(createSchema(component, names)); setElement(result, component); @@ -127,8 +127,11 
@@ public class ScalaSafeReflectData extends ReflectData.AllowNull { return super.createSchema(type, names); if (c.isArray()) { // array Class component = c.getComponentType(); - if (component == Byte.TYPE) // byte array - return Schema.create(Schema.Type.BYTES); + if (component == Byte.TYPE) { // byte array + Schema result = Schema.create(Schema.Type.BYTES); + result.addProp(CLASS_PROP, c.getName()); // For scala-specific byte arrays + return result; + } Schema result = Schema.createArray(createSchema(component, names)); setElement(result, component); return result; @@ -289,4 +292,14 @@ public class ScalaSafeReflectData extends ReflectData.AllowNull { protected boolean isMap(Object datum) { return (datum instanceof java.util.Map) || (datum instanceof scala.collection.Map); } + + @Override + protected String getSchemaName(Object datum) { + if (datum != null) { + if(byte[].class.isAssignableFrom(datum.getClass())) { + return Schema.Type.BYTES.getName(); + } + } + return super.getSchemaName(datum); + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java index 123b45e..bbe7305 100644 --- a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java +++ b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java @@ -26,6 +26,7 @@ import java.util.Map; import com.google.common.collect.Lists; import org.apache.avro.Schema; import org.apache.avro.io.ResolvingDecoder; +import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.ReflectDatumReader; import org.apache.hadoop.util.ReflectionUtils; @@ -34,7 +35,7 @@ import scala.collection.JavaConversions; public class ScalaSafeReflectDatumReader<T> extends ReflectDatumReader<T> { public ScalaSafeReflectDatumReader(Schema schema) { - super(schema, schema, ScalaSafeReflectData.get()); + super(schema, schema, ScalaSafeReflectData.getInstance()); } @Override @@ -100,25 +101,28 @@ public class ScalaSafeReflectDatumReader<T> extends ReflectDatumReader<T> { @Override @SuppressWarnings("unchecked") protected Object newArray(Object old, int size, Schema schema) { - ScalaSafeReflectData data = ScalaSafeReflectData.get(); - Class collectionClass = ScalaSafeReflectData.getClassProp(schema, - ScalaSafeReflectData.CLASS_PROP); - if (collectionClass != null) { + Class collectionClass = + ScalaSafeReflectData.getClassProp(schema, ScalaSafeReflectData.CLASS_PROP); + Class elementClass = + ScalaSafeReflectData.getClassProp(schema, ScalaSafeReflectData.ELEMENT_PROP); + + if (collectionClass == null && elementClass == null) + return super.newArray(old, size, schema); // use specific/generic + + ScalaSafeReflectData data = ScalaSafeReflectData.getInstance(); + if (collectionClass != null && !collectionClass.isArray()) { if (old instanceof Collection) { ((Collection)old).clear(); return old; } if (scala.collection.Iterable.class.isAssignableFrom(collectionClass) || - collectionClass.isAssignableFrom(ArrayList.class)) { - return Lists.newArrayList(); - } - return ReflectionUtils.newInstance(collectionClass, null); + collectionClass.isAssignableFrom(ArrayList.class)) + return new ArrayList(); + return data.newInstance(collectionClass, schema); } - 
Class elementClass = ScalaSafeReflectData.getClassProp(schema, - ScalaSafeReflectData.ELEMENT_PROP); - if (elementClass == null) { + + if (elementClass == null) elementClass = data.getClass(schema.getElementType()); - } return Array.newInstance(elementClass, size); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumWriter.java ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumWriter.java b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumWriter.java index a19d6fb..1ac768c 100644 --- a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumWriter.java +++ b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumWriter.java @@ -20,6 +20,7 @@ package org.apache.crunch.scrunch; import java.util.Iterator; import java.util.Map; +import org.apache.avro.Schema; import org.apache.avro.reflect.ReflectDatumWriter; import scala.collection.JavaConversions; @@ -28,10 +29,10 @@ import scala.collection.JavaConversions; * */ public class ScalaSafeReflectDatumWriter<T> extends ReflectDatumWriter<T> { - public ScalaSafeReflectDatumWriter() { - super(ScalaSafeReflectData.get()); + public ScalaSafeReflectDatumWriter(Schema schema) { + super(schema, ScalaSafeReflectData.getInstance()); } - + @Override protected long getArraySize(Object array) { if (array instanceof scala.collection.Iterable) { http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala index dc0ab0b..31c2f8a 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala @@ -69,7 +69,7 @@ class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCol } def materialize() = { - InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration()) + setupRun() JavaConversions.iterableAsScalaIterable[S](native.materialize) } @@ -86,9 +86,15 @@ class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCol count.mapValues(_.longValue()) } - def max()(implicit converter: Converter[S, S]) = PObject(Aggregate.max(native))(converter) + def max()(implicit converter: Converter[S, S]) = { + setupRun() + PObject(Aggregate.max(native))(converter) + } - def min()(implicit converter: Converter[S, S]) = PObject(Aggregate.min(native))(converter) + def min()(implicit converter: Converter[S, S]) = { + setupRun() + PObject(Aggregate.min(native))(converter) + } def sample(acceptanceProbability: Double) = { wrap(Sample.sample(native, acceptanceProbability)) http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala index 1e2e890..35b90be 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala +++ 
b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala @@ -41,6 +41,10 @@ trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] { protected def wrapPairFlatMapFn[K, V](fmt: FunctionType[TraversableOnce[(K, V)]]): DoFn[S, JPair[K, V]] protected def wrapPairMapFn[K, V](fmt: FunctionType[(K, V)]): MapFn[S, JPair[K, V]] + protected def setupRun() { + PipelineLike.setupConf(native.getPipeline().getConfiguration()) + } + /** * Returns the underlying PCollection wrapped by this instance. */ @@ -254,6 +258,7 @@ trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] { * @return The number of elements in this PCollection. */ def length(): PObject[Long] = { + setupRun() PObject(native.length()) } @@ -262,6 +267,7 @@ trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] { * @return */ def asSeq(): PObject[Seq[S]] = { + setupRun() PObject(native.asCollection()) } http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala index 1d5a70e..aefad67 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala @@ -155,16 +155,17 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V] def incrementIfValue(f: V => Boolean) = new IncrementIfPTable[K, V](this, incValueFn(f)) def materialize(): Iterable[(K, V)] = { - InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration()) + setupRun() native.materialize.view.map(x => (x.first, x.second)) } def materializeToMap(): Map[K, V] = { - InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration()) + setupRun() native.materializeToMap().view.toMap } def asMap(): PObject[Map[K, V]] = { + setupRun() PObject(native.asMap()) } http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala index 394e2ac..aadb026 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala @@ -178,7 +178,9 @@ object Avros extends PTypeFamily { CAvros.specifics(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]) } - def reflects[T: ClassTag]() = { - CAvros.reflects(implicitly[ClassTag[T]].runtimeClass).asInstanceOf[AvroType[T]] + def reflects[T: ClassTag](): AvroType[T] = { + val clazz = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]] + val schema = ScalaSafeReflectData.getInstance().getSchema(clazz) + CAvros.reflects(clazz, schema) } } http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala index 67c9b14..a72dc7a 100644 --- 
a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala @@ -53,6 +53,7 @@ import org.apache.crunch.types.{PTableType, PType} * }}} */ class Pipeline(val jpipeline: JPipeline) extends PipelineLike { + /** * A convenience method for reading a text file. * http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala index 27c43a7..b800612 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala @@ -26,6 +26,9 @@ import org.apache.crunch.types.{PTableType, PType} trait PipelineLike { def jpipeline: JPipeline + // Call this to ensure we set this up before any subsequent calls to the system + PipelineLike.setupConf(getConfiguration()) + /** * Gets the configuration object associated with this pipeline. */ @@ -110,12 +113,13 @@ trait PipelineLike { */ def emptyPTable[K, V](pt: PTableType[K, V]) = new PTable[K, V](jpipeline.emptyPTable(pt)) + /** * Returns a handler for controlling the execution of the underlying MapReduce * pipeline. */ def runAsync(): PipelineExecution = { - InterpreterRunner.addReplJarsToJob(getConfiguration()) + PipelineLike.setupConf(getConfiguration()) jpipeline.runAsync() } @@ -124,7 +128,7 @@ trait PipelineLike { * to write data to the output targets. */ def run(): PipelineResult = { - InterpreterRunner.addReplJarsToJob(getConfiguration()) + PipelineLike.setupConf(getConfiguration()) jpipeline.run() } @@ -134,7 +138,7 @@ trait PipelineLike { * this run or previous calls to `run`. */ def done(): PipelineResult = { - InterpreterRunner.addReplJarsToJob(getConfiguration()) + PipelineLike.setupConf(getConfiguration()) jpipeline.done() } @@ -152,3 +156,13 @@ trait PipelineLike { */ def debug(): Unit = jpipeline.enableDebug() } + +object PipelineLike { + def setupConf(conf: Configuration) { + InterpreterRunner.addReplJarsToJob(conf) + if (conf.get("crunch.reflectdatafactory", "").isEmpty) { + // Enables the Scala-specific ReflectDataFactory + conf.set("crunch.reflectdatafactory", classOf[ScalaReflectDataFactory].getCanonicalName) + } + } +} \ No newline at end of file
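----------------------------------------------------------------------

A minimal Java sketch (not part of the commit) of how the now-public
AvroMode.withFactoryFromConfiguration hook composes with the
"crunch.reflectdatafactory" property that Scrunch's PipelineLike.setupConf
sets in this change; the string schema below is only a stand-in for a real
reflect schema.

import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.crunch.scrunch.ScalaReflectDataFactory;
import org.apache.crunch.types.avro.AvroMode;
import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.conf.Configuration;

public class ReflectFactoryOverrideExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Register an alternative ReaderWriterFactory under the key read by
    // AvroMode.withFactoryFromConfiguration(conf); this mirrors what
    // Scrunch's PipelineLike.setupConf does in this commit.
    conf.set(Avros.REFLECT_DATA_FACTORY_CLASS,
        ScalaReflectDataFactory.class.getCanonicalName());

    // Reader creation now resolves the factory from the Configuration
    // instead of being hard-wired to the stock ReflectDataFactory.
    Schema schema = Schema.create(Schema.Type.STRING); // stand-in schema
    DatumReader<Object> reader =
        AvroMode.REFLECT.withFactoryFromConfiguration(conf).getReader(schema);
    System.out.println(reader.getClass().getName()); // expect the Scala-safe reader
  }
}

In the same spirit, a hypothetical WordCount POJO illustrates the new
Avros.reflects(Class, Schema) overload, which is the entry point Scrunch's
Avros.reflects[T]() now calls with a ScalaSafeReflectData-derived schema;
here the schema is derived with plain Avro AllowNull reflection instead.

import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;
import org.apache.crunch.types.avro.AvroType;
import org.apache.crunch.types.avro.Avros;

public class ReflectsWithSchemaExample {
  // Hypothetical POJO, used only for illustration.
  public static class WordCount {
    public String word;
    public long count;
  }

  public static void main(String[] args) {
    // Derive the schema with whichever ReflectData variant should be
    // authoritative, then hand both the class and the schema to Crunch.
    Schema schema = ReflectData.AllowNull.get().getSchema(WordCount.class);
    AvroType<WordCount> type = Avros.reflects(WordCount.class, schema);
    System.out.println(type.getSchema().toString(true));
  }
}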
