Updated Branches: refs/heads/master 12dea675b -> fb172fd84
CRUNCH-301: Clever deep copies in Scrunch code Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/fb172fd8 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/fb172fd8 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/fb172fd8 Branch: refs/heads/master Commit: fb172fd84e35b4a81c4d344961259acd05f936e8 Parents: 12dea67 Author: Josh Wills <[email protected]> Authored: Wed Nov 20 18:26:38 2013 -0800 Committer: Josh Wills <[email protected]> Committed: Thu Nov 21 12:57:24 2013 -0800 ---------------------------------------------------------------------- .../src/main/java/org/apache/crunch/DoFn.java | 19 ++++- .../org/apache/crunch/types/avro/AvroType.java | 7 ++ .../crunch/types/writable/WritableType.java | 2 + .../apache/crunch/scrunch/DeepCopyTest.scala | 89 ++++++++++++++++++++ .../org/apache/crunch/scrunch/PTypeFamily.scala | 13 ++- 5 files changed, 126 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/fb172fd8/crunch-core/src/main/java/org/apache/crunch/DoFn.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/DoFn.java b/crunch-core/src/main/java/org/apache/crunch/DoFn.java index a052d09..c2ed35d 100644 --- a/crunch-core/src/main/java/org/apache/crunch/DoFn.java +++ b/crunch-core/src/main/java/org/apache/crunch/DoFn.java @@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext; */ public abstract class DoFn<S, T> implements Serializable { private transient TaskInputOutputContext<?, ?, ?, ?> context; + private transient Configuration conf; /** * Configure this DoFn. 
Subclasses may override this method to modify the @@ -106,6 +107,16 @@ public abstract class DoFn<S, T> implements Serializable { } /** + * Called during the setup of an initialized {@link org.apache.crunch.types.PType} that + * relies on this instance. + * + * @param conf The configuration for the {@code PType} being initialized + */ + public void setConfiguration(Configuration conf) { + this.conf = conf; + } + + /** * Returns an estimate of how applying this function to a {@link PCollection} * will cause it to change in side. The optimizer uses these estimates to * decide where to break up dependent MR jobs into separate Map and Reduce @@ -137,7 +148,13 @@ public abstract class DoFn<S, T> implements Serializable { } protected Configuration getConfiguration() { - return context.getConfiguration(); + if (conf != null) { + return conf; + } else if (context != null) { + return context.getConfiguration(); + } else { + return null; + } } /** http://git-wip-us.apache.org/repos/asf/crunch/blob/fb172fd8/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java index a92b0d0..aea4951 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java @@ -167,7 +167,14 @@ public class AvroType<T> implements PType<T> { @Override public void initialize(Configuration conf) { + baseInputMapFn.setConfiguration(conf); + baseInputMapFn.initialize(); + baseOutputMapFn.setConfiguration(conf); + baseOutputMapFn.initialize(); deepCopier.initialize(conf); + for (PType ptype : subTypes) { + ptype.initialize(conf); + } initialized = true; } 
http://git-wip-us.apache.org/repos/asf/crunch/blob/fb172fd8/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableType.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableType.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableType.java index 734946c..a7a9968 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableType.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableType.java @@ -106,7 +106,9 @@ public class WritableType<T, W extends Writable> implements PType<T> { @Override public void initialize(Configuration conf) { + this.inputFn.setConfiguration(conf); this.inputFn.initialize(); + this.outputFn.setConfiguration(conf); this.outputFn.initialize(); for (PType subType : subTypes) { subType.initialize(conf); http://git-wip-us.apache.org/repos/asf/crunch/blob/fb172fd8/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala new file mode 100644 index 0000000..441a9c6 --- /dev/null +++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.scrunch + +import org.apache.crunch.impl.mr.MRPipeline +import org.apache.crunch.io.{From => from, To => to} +import org.apache.crunch.types.avro.{Avros => A} +import org.apache.avro.file.DataFileWriter +import org.apache.hadoop.fs.{Path, FSDataOutputStream} +import org.apache.hadoop.conf.Configuration + +import _root_.org.junit.Assert._ +import _root_.org.junit.Test + +case class Rec1(var k: Int, var v: String) { def this() = this(0, "") } +case class Rec2(var k: Int, var k2: String, var v: Double) { def this() = this(0, "", 0.0) } +case class Rec3(var k2: String, var v:Int) { def this() = this("", 0)} + +class DeepCopyTest extends CrunchSuite { + lazy val pipe = Pipeline.mapReduce[DeepCopyTest](tempDir.getDefaultConfiguration) + + @Test def runDeepCopy { + val prefix = tempDir.getFileName("isolation") + + val ones = Seq(Rec1(1, "hello"), Rec1(1, "tjena"), Rec1(2, "goodbye")) + val twos = Seq(Rec2(1, "a", 0.4), Rec2(1, "a", 0.5), Rec2(1, "b", 0.6), Rec2(1, "b", 0.7), Rec2(2, "c", 9.9)) + val threes = Seq(Rec3("a", 4), Rec3("b", 5), Rec3("c", 6)) + + writeCollection(new Path(prefix + "/ones"), ones) + writeCollection(new Path(prefix + "/twos"), twos) + writeCollection(new Path(prefix + "/threes"), threes) + + val oneF = pipe.read(from.avroFile(prefix + "/ones", A.reflects(classOf[Rec1]))) + val twoF = pipe.read(from.avroFile(prefix + "/twos", A.reflects(classOf[Rec2]))) + val threeF = pipe.read(from.avroFile(prefix + "/threes", A.reflects(classOf[Rec3]))) + val res = (oneF.by(_.k) + cogroup + (twoF.by(_.k2) + innerJoin 
threeF.by(_.k2)) + .values() + .by(_._1.k)) + .values() + .materialize + .toList + + // Expected results vs. actual + val e12 = Seq((Rec2(1, "a", 0.4), Rec3("a", 4)), (Rec2(1, "a", 0.5), Rec3("a", 4)), (Rec2(1, "b", 0.6), Rec3("b", 5)), + (Rec2(1, "b", 0.7), Rec3("b", 5))) + val e22 = Seq((Rec2(2, "c", 9.9),Rec3("c", 6))) + assertEquals(2, res.size) + assertEquals(res.map(_._2.toList), Seq(e12, e22)) + pipe.done() + } + + private def writeCollection(path: Path, records: Iterable[_ <: AnyRef]) { + writeAvroFile(path.getFileSystem(new Configuration()).create(path, true), records) + } + + @SuppressWarnings(Array("rawtypes", "unchecked")) + private def writeAvroFile[T <: AnyRef](outputStream: FSDataOutputStream, records: Iterable[T]) { + val r: AnyRef = records.iterator.next() + val schema = new ScalaReflectDataFactory().getReflectData.getSchema(r.getClass) + + val writer = new ScalaReflectDataFactory().getWriter[T](schema) + val dataFileWriter = new DataFileWriter(writer) + dataFileWriter.create(schema, outputStream) + + for (record <- records) { + dataFileWriter.append(record) + } + dataFileWriter.close() + outputStream.close() + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/fb172fd8/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala index 40f2070..ea69d4f 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala @@ -25,8 +25,15 @@ import java.lang.{Long => JLong, Double => JDouble, Integer => JInt, Float => JF import java.util.{Collection => JCollection} import scala.collection.JavaConversions._ -class TMapFn[S, T](f: S => T) extends MapFn[S, T] { - override def map(input: S) = f(input) +class 
TMapFn[S, T](val f: S => T, val pt: Option[PType[S]] = None, var init: Boolean = false) extends MapFn[S, T] { + override def initialize() { + if (!pt.isEmpty && getConfiguration() != null) { + pt.get.initialize(getConfiguration()) + init = true + } + } + + override def map(input: S) = if (init) f(pt.get.getDetachedValue(input)) else f(input) } trait PTypeFamily { @@ -40,7 +47,7 @@ trait PTypeFamily { def records[T: ClassManifest] = ptf.records(classManifest[T].erasure) def derived[S, T](cls: java.lang.Class[T], in: S => T, out: T => S, pt: PType[S]) = { - ptf.derived(cls, new TMapFn[S, T](in), new TMapFn[T, S](out), pt) + ptf.derived(cls, new TMapFn[S, T](in, Some(pt)), new TMapFn[T, S](out), pt) } val longs = {
