Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 7da6de5c6 -> 5a51b1396
CRUNCH-417: Avro/Scrunch integration improvements and tests

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/5a51b139
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/5a51b139
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/5a51b139

Branch: refs/heads/apache-crunch-0.8
Commit: 5a51b1396168f145455e94468ba0ee31daa71350
Parents: 7da6de5
Author: Josh Wills <[email protected]>
Authored: Fri Jun 13 20:40:29 2014 -0700
Committer: Josh Wills <[email protected]>
Committed: Mon Jun 16 15:08:26 2014 -0700

----------------------------------------------------------------------
 .../types/avro/SafeAvroSerialization.java       |  2 +-
 .../crunch/scrunch/AvroReflectionTest.scala     | 55 ++++++++++++++++++++
 .../apache/crunch/scrunch/DeepCopyTest.scala    | 43 +++++++++++++--
 .../crunch/scrunch/ScalaSafeReflectData.java    |  6 +++
 .../scrunch/ScalaSafeReflectDatumReader.java    | 24 ++++-----
 .../org/apache/crunch/scrunch/PTypeFamily.scala | 12 ++++-
 6 files changed, 125 insertions(+), 17 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/crunch/blob/5a51b139/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
index 9205056..59af994 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
@@ -58,7 +58,7 @@ class SafeAvroSerialization<T> extends Configured implements Serialization<AvroW
 
     DatumReader<T> datumReader = null;
     if (conf.getBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, false)) {
-      datumReader = AvroMode.REFLECT.getReader(schema);
+      datumReader = AvroMode.REFLECT.withFactoryFromConfiguration(conf).getReader(schema);
     } else {
       datumReader = AvroMode.fromShuffleConfiguration(conf).getReader(schema);
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/5a51b139/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/AvroReflectionTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/AvroReflectionTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/AvroReflectionTest.scala
new file mode 100644
index 0000000..047df5d
--- /dev/null
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/AvroReflectionTest.scala
@@ -0,0 +1,55 @@
+/*
+ * *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * + */ + +package org.apache.crunch.scrunch + +import java.nio.ByteBuffer +import org.junit.Test +import org.apache.crunch.types.PType + +class AvroRecord1(var ba: Array[Byte], var bbl: Array[ByteBuffer]) { + def this() { this(null, null) } +} + +class AvroReflectionTest extends CrunchSuite { + + def assertEquals[T](t: T, pt: PType[T]) = { + t.equals(pt.getInputMapFn().map(pt.getOutputMapFn().map(t))) + } + + @Test def testAvroRecord1 { + val pt = Avros.reflects[AvroRecord1] + val r = new AvroRecord1(Array[Byte](127), + Array[ByteBuffer](ByteBuffer.wrap(Array[Byte](4, 13, 12)), ByteBuffer.wrap(Array[Byte](13, 14, 10)))) + assertEquals(r, pt) + } + + @Test def runAvroRecord1 { + val shakes = tempDir.copyResourceFileName("shakes.txt") + val p = Pipeline.mapReduce(classOf[AvroReflectionTest], tempDir.getDefaultConfiguration) + p.read(From.textFile(shakes, Avros.strings)) + .map(x => new AvroRecord1(Array[Byte](1), Array[ByteBuffer](ByteBuffer.wrap(Array[Byte](2))))) + .by(x => 1) + .groupByKey(1) + .ungroup() + .write(To.avroFile(tempDir.getFileName("out"))) + p.done() + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/5a51b139/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala index 816993b..25febc2 100644 --- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala +++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala @@ -25,14 +25,51 @@ import org.apache.hadoop.conf.Configuration import _root_.org.junit.Assert._ import _root_.org.junit.Test +import java.nio.ByteBuffer case class Rec1(var k: Int, var v: String) { def this() = this(0, "") } case class Rec2(var k: Int, var k2: String, var v: Double) { def this() = this(0, "", 0.0) } case class Rec3(var k2: String, var 
v:Int) { def this() = this("", 0)} +case class BBRec(var k: ByteBuffer, var ll: Array[ByteBuffer]) { def this() = this(null, null) } + +object DeepCopyTest { + def getIterator(bbr: BBRec) = new Iterator[(ByteBuffer, ByteBuffer)] { + val nested = bbr.ll.iterator + + def hasNext() = nested.hasNext + + def next() = (bbr.k, nested.next) + } +} + class DeepCopyTest extends CrunchSuite { + import DeepCopyTest._ + lazy val pipe = Pipeline.mapReduce[DeepCopyTest](tempDir.getDefaultConfiguration) + @Test def runDeepCopyBB { + val prefix = tempDir.getFileName("bytebuffers") + val bb1 = ByteBuffer.wrap(Array[Byte](1, 2)) + val bb2 = ByteBuffer.wrap(Array[Byte](3, 4)) + val bb3 = ByteBuffer.wrap(Array[Byte](5, 6)) + val bb4 = ByteBuffer.wrap(Array[Byte](7, 8)) + + val ones = Seq(BBRec(bb1, Array(bb4, bb2)), BBRec(bb2, Array(bb1, bb3))) + val twos = Seq(BBRec(bb3, Array(bb1, bb2)), BBRec(bb4, Array(bb3, bb4))) + writeCollection(new Path(prefix + "/ones"), ones) + writeCollection(new Path(prefix + "/twos"), twos) + + val oneF = pipe.read(from.avroFile(prefix + "/ones", Avros.reflects[BBRec])) + val twoF = pipe.read(from.avroFile(prefix + "/twos", Avros.reflects[BBRec])) + + val m = oneF.flatMap(getIterator(_)).leftJoin(twoF.flatMap(getIterator(_))) + .keys + .materialize + assert(m.size == 4) + pipe.done() + } + @Test def runDeepCopy { val prefix = tempDir.getFileName("isolation") @@ -73,9 +110,9 @@ class DeepCopyTest extends CrunchSuite { @SuppressWarnings(Array("rawtypes", "unchecked")) private def writeAvroFile[T <: AnyRef](outputStream: FSDataOutputStream, records: Iterable[T]) { val r: AnyRef = records.iterator.next() - val schema = new ScalaReflectDataFactory().getData.getSchema(r.getClass) - - val writer = new ScalaReflectDataFactory().getWriter[T](schema) + val factory = new ScalaReflectDataFactory() + val schema = factory.getData().getSchema(r.getClass) + val writer = factory.getWriter[T](schema) val dataFileWriter = new DataFileWriter(writer) 
dataFileWriter.create(schema, outputStream) http://git-wip-us.apache.org/repos/asf/crunch/blob/5a51b139/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java index 6885f3e..fabc8ff 100644 --- a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java +++ b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java @@ -22,6 +22,7 @@ import java.lang.reflect.GenericArrayType; import java.lang.reflect.Modifier; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashMap; @@ -133,11 +134,16 @@ public class ScalaSafeReflectData extends ReflectData.AllowNull { return result; } Schema result = Schema.createArray(createSchema(component, names)); + result.addProp(CLASS_PROP, c.getName()); + result.addProp(ELEMENT_PROP, component.getName()); setElement(result, component); return result; } if (CharSequence.class.isAssignableFrom(c)) // String return Schema.create(Schema.Type.STRING); + if (ByteBuffer.class.isAssignableFrom(c)) { + return Schema.create(Schema.Type.BYTES); + } String fullName = c.getName(); Schema schema = names.get(fullName); if (schema == null) { http://git-wip-us.apache.org/repos/asf/crunch/blob/5a51b139/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java index 0db545e..c03d17f 100644 --- 
a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java +++ b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java @@ -83,32 +83,32 @@ public class ScalaSafeReflectDatumReader<T> extends ReflectDatumReader<T> { private static scala.collection.Iterable toIter(Object array) { return JavaConversions.collectionAsScalaIterable((Collection) array); } - + @Override @SuppressWarnings("unchecked") protected Object newArray(Object old, int size, Schema schema) { - Class collectionClass = - ScalaSafeReflectData.getClassProp(schema, ScalaSafeReflectData.CLASS_PROP); - Class elementClass = - ScalaSafeReflectData.getClassProp(schema, ScalaSafeReflectData.ELEMENT_PROP); - + Class collectionClass = ScalaSafeReflectData.getClassProp(schema, + ScalaSafeReflectData.CLASS_PROP); + Class elementClass = ScalaSafeReflectData.getClassProp(schema, + ScalaSafeReflectData.ELEMENT_PROP); if (collectionClass == null && elementClass == null) - return super.newArray(old, size, schema); // use specific/generic - + return super.newArray(old, size, schema); // use specific/generic ScalaSafeReflectData data = ScalaSafeReflectData.getInstance(); + if (collectionClass != null && !collectionClass.isArray()) { if (old instanceof Collection) { ((Collection)old).clear(); return old; } if (scala.collection.Iterable.class.isAssignableFrom(collectionClass) || - collectionClass.isAssignableFrom(ArrayList.class)) - return new ArrayList(); + collectionClass.isAssignableFrom(ArrayList.class)) { + return Lists.newArrayList(); + } return data.newInstance(collectionClass, schema); } - - if (elementClass == null) + if (elementClass == null) { elementClass = data.getClass(schema.getElementType()); + } return Array.newInstance(elementClass, size); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/5a51b139/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala 
---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala index aadb026..bd03f6f 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala @@ -36,7 +36,15 @@ class TMapFn[S, T](val f: S => T, val pt: Option[PType[S]] = None, var init: Boo } } - override def map(input: S) = if (init) f(pt.get.getDetachedValue(input)) else f(input) + override def map(input: S): T = { + if (input == null) { + return null.asInstanceOf[T] + } else if (init) { + return f(pt.get.getDetachedValue(input)) + } else { + return f(input) + } + } } trait PTypeFamily { @@ -174,6 +182,8 @@ object Avros extends PTypeFamily { override def writables[T <: Writable : ClassTag] = CAvros.writables( implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]) + override def records[T: ClassTag] = reflects()(implicitly[ClassTag[T]]) + def specifics[T <: SpecificRecord : ClassTag]() = { CAvros.specifics(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]) }
