Repository: crunch
Updated Branches:
  refs/heads/master dc03c9133 -> 9ddd029f4
CRUNCH-404: Add Option, Either, Java primitives, and protobuf/avro/thrift record support to PTypeH Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/9ddd029f Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/9ddd029f Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/9ddd029f Branch: refs/heads/master Commit: 9ddd029f48c74061980c7f15187c474d288095ee Parents: dc03c91 Author: Josh Wills <[email protected]> Authored: Sun May 25 09:51:57 2014 -0700 Committer: Josh Wills <[email protected]> Committed: Wed May 28 20:21:55 2014 -0700 ---------------------------------------------------------------------- crunch-scrunch/pom.xml | 25 +++++++- .../apache/crunch/scrunch/ScalaTypesTest.scala | 61 ++++++++++++++++++++ .../org/apache/crunch/scrunch/Conversions.scala | 52 +++++++++++++++-- .../crunch/scrunch/EmbeddedPipeline.scala | 1 - .../org/apache/crunch/scrunch/PTypeFamily.scala | 53 ++++++++++++++--- .../org/apache/crunch/scrunch/PipelineApp.scala | 4 -- pom.xml | 6 ++ 7 files changed, 183 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/9ddd029f/crunch-scrunch/pom.xml ---------------------------------------------------------------------- diff --git a/crunch-scrunch/pom.xml b/crunch-scrunch/pom.xml index de712db..57c1b51 100644 --- a/crunch-scrunch/pom.xml +++ b/crunch-scrunch/pom.xml @@ -51,8 +51,23 @@ under the License. 
<scope>provided</scope> </dependency> <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-server</artifactId> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.thrift</groupId> + <artifactId>libthrift</artifactId> <scope>provided</scope> </dependency> <dependency> @@ -64,6 +79,12 @@ under the License. <artifactId>crunch-test</artifactId> <scope>test</scope> </dependency> + <!-- Used by LocalJobRunner in integration tests --> + <dependency> + <groupId>commons-httpclient</groupId> + <artifactId>commons-httpclient</artifactId> + <scope>test</scope> + </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> http://git-wip-us.apache.org/repos/asf/crunch/blob/9ddd029f/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/ScalaTypesTest.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/ScalaTypesTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/ScalaTypesTest.scala new file mode 100644 index 0000000..de9a5f9 --- /dev/null +++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/ScalaTypesTest.scala @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.scrunch; + +import org.junit.Test; + +object ScalaTypesTest { + def et(s: String): Either[String, Int] = { + if (s.startsWith("a")) { + Left(s) + } else { + Right(s.length) + } + } +} + +class ScalaTypesTest extends CrunchSuite { + import ScalaTypesTest._ + + lazy val pipeline = Pipeline.mapReduce[ScalaTypesTest](tempDir.getDefaultConfiguration) + + @Test + def option { + val shakespeare = tempDir.copyResourceFileName("shakes.txt") + + val out = pipeline.read(From.textFile(shakespeare)) + .map(x => if (x.startsWith("a")) Some(x) else None) + .materialize + .take(100) + pipeline.done + assert(out.exists(!_.isEmpty)) + } + + @Test + def either { + val shakespeare = tempDir.copyResourceFileName("shakes.txt") + + val out = pipeline.read(From.textFile(shakespeare)) + .map(et) + .materialize + .take(100) + pipeline.done + assert(out.exists(_.isLeft)) + assert(out.exists(_.isRight)) + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/9ddd029f/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala index 833e6d9..05baebf 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala @@ -19,12 +19,15 @@ package org.apache.crunch.scrunch import org.apache.crunch.{PCollection 
=> JCollection, PGroupedTable => JGroupedTable, PTable => JTable, DoFn, Emitter} import org.apache.crunch.{Pair => CPair} -import org.apache.crunch.types.PType +import org.apache.crunch.types.{PTypes, PType} import java.nio.ByteBuffer import scala.collection.Iterable import scala.reflect.ClassTag import org.apache.hadoop.io.Writable import org.apache.hadoop.mapreduce.TaskInputOutputContext +import com.google.protobuf.Message +import org.apache.avro.specific.SpecificRecord +import org.apache.thrift.{TFieldIdEnum, TBase} trait CanParallelTransform[El, To] { def apply[A](c: PCollectionLike[A, _, JCollection[A]], fn: DoFn[A, El], ptype: PType[El]): To @@ -83,22 +86,61 @@ trait PTypeH[T] extends Serializable { def get(ptf: PTypeFamily): PType[T] } -object PTypeH { +trait LowPriorityPTypeH { + implicit def records[T <: AnyRef : ClassTag] = new PTypeH[T] { + def get(ptf: PTypeFamily) = ptf.records(implicitly[ClassTag[T]]).asInstanceOf[PType[T]] + } +} + +object PTypeH extends LowPriorityPTypeH { implicit val longs = new PTypeH[Long] { def get(ptf: PTypeFamily) = ptf.longs } + implicit val jlongs = new PTypeH[java.lang.Long] { def get(ptf: PTypeFamily) = ptf.jlongs } implicit val ints = new PTypeH[Int] { def get(ptf: PTypeFamily) = ptf.ints } + implicit val jints = new PTypeH[java.lang.Integer] { def get(ptf: PTypeFamily) = ptf.jints } + implicit val floats = new PTypeH[Float] { def get(ptf: PTypeFamily) = ptf.floats } + implicit val jfloats = new PTypeH[java.lang.Float] { def get(ptf: PTypeFamily) = ptf.jfloats } + implicit val doubles = new PTypeH[Double] { def get(ptf: PTypeFamily) = ptf.doubles } - implicit val strings = new PTypeH[String] { def get(ptf: PTypeFamily) = ptf.strings } + implicit val jdoubles = new PTypeH[java.lang.Double] { def get(ptf: PTypeFamily) = ptf.jdoubles } + implicit val booleans = new PTypeH[Boolean] { def get(ptf: PTypeFamily) = ptf.booleans } + implicit val jbooleans = new PTypeH[java.lang.Boolean] { def get(ptf: PTypeFamily) = ptf.jbooleans 
} + + implicit val strings = new PTypeH[String] { def get(ptf: PTypeFamily) = ptf.strings } implicit val bytes = new PTypeH[ByteBuffer] { def get(ptf: PTypeFamily) = ptf.bytes } implicit def writables[W <: Writable : ClassTag] = new PTypeH[W] { def get(ptf: PTypeFamily): PType[W] = ptf.writables(implicitly[ClassTag[W]]) } - implicit def records[T <: AnyRef : ClassTag] = new PTypeH[T] { - def get(ptf: PTypeFamily) = ptf.records(implicitly[ClassTag[T]]).asInstanceOf[PType[T]] + implicit def protos[T <: Message : ClassTag] = new PTypeH[T] { + def get(ptf: PTypeFamily) = { + PTypes.protos(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]], ptf.ptf) + } + } + + implicit def thrifts[T <: TBase[_ <: TBase[_, _], _ <: TFieldIdEnum] : ClassTag] = new PTypeH[T] { + def get(ptf: PTypeFamily) = { + PTypes.thrifts(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]], ptf.ptf) + } + } + + implicit def specifics[T <: SpecificRecord : ClassTag] = new PTypeH[T] { + def get(ptf: PTypeFamily) = Avros.specifics[T]() + } + + implicit def options[T: PTypeH] = new PTypeH[Option[T]] { + def get(ptf: PTypeFamily) = { + ptf.options(implicitly[PTypeH[T]].get(ptf)) + } + } + + implicit def eithers[L: PTypeH, R: PTypeH] = new PTypeH[Either[L, R]] { + def get(ptf: PTypeFamily) = { + ptf.eithers(implicitly[PTypeH[L]].get(ptf), implicitly[PTypeH[R]].get(ptf)) + } } implicit def collections[T: PTypeH] = { http://git-wip-us.apache.org/repos/asf/crunch/blob/9ddd029f/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/EmbeddedPipeline.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/EmbeddedPipeline.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/EmbeddedPipeline.scala index e9df263..039ea3e 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/EmbeddedPipeline.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/EmbeddedPipeline.scala @@ 
-18,7 +18,6 @@ package org.apache.crunch.scrunch import org.apache.hadoop.conf.Configuration -import scala.reflect.ClassTag /** * Adds a pipeline to the class it is being mixed in to. http://git-wip-us.apache.org/repos/asf/crunch/blob/9ddd029f/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala index 9a30d58..394e2ac 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala @@ -17,7 +17,7 @@ */ package org.apache.crunch.scrunch -import org.apache.crunch.{Pair => CPair, Tuple3 => CTuple3, Tuple4 => CTuple4, MapFn} +import org.apache.crunch.{Pair => CPair, Tuple3 => CTuple3, Tuple4 => CTuple4, Union, MapFn} import org.apache.crunch.types.{PType, PTypeFamily => PTF} import org.apache.crunch.types.writable.{WritableTypeFamily, Writables => CWritables} import org.apache.crunch.types.avro.{AvroType, AvroTypeFamily, Avros => CAvros} @@ -26,6 +26,7 @@ import java.util.{Collection => JCollection} import scala.collection.JavaConversions._ import scala.reflect.ClassTag import org.apache.hadoop.io.Writable +import org.apache.avro.specific.SpecificRecord class TMapFn[S, T](val f: S => T, val pt: Option[PType[S]] = None, var init: Boolean = false) extends MapFn[S, T] { override def initialize() { @@ -56,34 +57,66 @@ trait PTypeFamily { ptf.derived(cls, new TMapFn[S, T](in, Some(pt)), new TMapFn[T, S](out), pt) } + def derivedImmutable[S, T](cls: java.lang.Class[T], in: S => T, out: T => S, pt: PType[S]) = { + ptf.derivedImmutable(cls, new TMapFn[S, T](in), new TMapFn[T, S](out), pt) + } + + val jlongs = ptf.longs() + val longs = { val in = (x: JLong) => x.longValue() val out = (x: Long) => new JLong(x) - 
derived(classOf[Long], in, out, ptf.longs()) + derivedImmutable(classOf[Long], in, out, ptf.longs()) } + val jints = ptf.ints() + val ints = { val in = (x: JInt) => x.intValue() val out = (x: Int) => new JInt(x) - derived(classOf[Int], in, out, ptf.ints()) + derivedImmutable(classOf[Int], in, out, ptf.ints()) } + val jfloats = ptf.floats() + val floats = { val in = (x: JFloat) => x.floatValue() val out = (x: Float) => new JFloat(x) - derived(classOf[Float], in, out, ptf.floats()) + derivedImmutable(classOf[Float], in, out, ptf.floats()) } + val jdoubles = ptf.doubles() + val doubles = { val in = (x: JDouble) => x.doubleValue() val out = (x: Double) => new JDouble(x) - derived(classOf[Double], in, out, ptf.doubles()) + derivedImmutable(classOf[Double], in, out, ptf.doubles()) } + val jbooleans = ptf.booleans() + val booleans = { val in = (x: JBoolean) => x.booleanValue() val out = (x: Boolean) => new JBoolean(x) - derived(classOf[Boolean], in, out, ptf.booleans()) + derivedImmutable(classOf[Boolean], in, out, ptf.booleans()) + } + + def options[T](ptype: PType[T]) = { + val in: Union => Option[T] = (x: Union) => { if (x.getIndex() == 0) None else Some(x.getValue.asInstanceOf[T]) } + val out = (x: Option[T]) => { if (x.isEmpty) new Union(0, null) else new Union(1, x.get) } + derived(classOf[Option[T]], in, out, ptf.unionOf(ptf.nulls(), ptype)) + } + + def eithers[L, R](left: PType[L], right: PType[R]): PType[Either[L, R]] = { + val in: Union => Either[L, R] = (x: Union) => { + if (x.getIndex() == 0) { + Left[L, R](x.getValue.asInstanceOf[L]) + } else { + Right[L, R](x.getValue.asInstanceOf[R]) + } + } + val out = (x: Either[L, R]) => { if (x.isLeft) new Union(0, x.left.get) else new Union(1, x.right.get) } + derived(classOf[Either[L, R]], in, out, ptf.unionOf(left, right)) } def tableOf[K, V](keyType: PType[K], valueType: PType[V]) = ptf.tableOf(keyType, valueType) @@ -141,5 +174,11 @@ object Avros extends PTypeFamily { override def writables[T <: Writable : 
ClassTag] = CAvros.writables( implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]) - def reflects[T: ClassTag]() = CAvros.reflects(implicitly[ClassTag[T]].runtimeClass).asInstanceOf[AvroType[T]] + def specifics[T <: SpecificRecord : ClassTag]() = { + CAvros.specifics(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]) + } + + def reflects[T: ClassTag]() = { + CAvros.reflects(implicitly[ClassTag[T]].runtimeClass).asInstanceOf[AvroType[T]] + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/9ddd029f/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineApp.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineApp.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineApp.scala index 11395d3..362009e 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineApp.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineApp.scala @@ -17,10 +17,6 @@ */ package org.apache.crunch.scrunch -import java.io.Serializable - -import scala.collection.mutable.ListBuffer - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.util.GenericOptionsParser http://git-wip-us.apache.org/repos/asf/crunch/blob/9ddd029f/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 813c82d..abe0b60 100644 --- a/pom.xml +++ b/pom.xml @@ -375,6 +375,12 @@ under the License. <artifactId>commons-logging</artifactId> <version>${commons-logging.version}</version> </dependency> + + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + <version>${commons-cli.version}</version> + </dependency> <dependency> <groupId>log4j</groupId>
