Repository: crunch
Updated Branches:
  refs/heads/master fd0bce36b -> ac4a525ad
CRUNCH-422: Scrunch collect(PartialFunction) support

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ac4a525a
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ac4a525a
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ac4a525a

Branch: refs/heads/master
Commit: ac4a525ad65e44c564290beb4642f588e3b96e23
Parents: fd0bce3
Author: Josh Wills <[email protected]>
Authored: Mon Jun 16 17:53:19 2014 -0700
Committer: Josh Wills <[email protected]>
Committed: Thu Jun 19 18:48:19 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/crunch/scrunch/Mem.scala   |  6 ----
 .../org/apache/crunch/scrunch/PCollection.scala |  4 +++
 .../org/apache/crunch/scrunch/PTable.scala      |  4 +++
 .../crunch/scrunch/PartialFunctionTest.scala    | 35 ++++++++++++++++++++
 4 files changed, 43 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/crunch/blob/ac4a525a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Mem.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Mem.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Mem.scala
index 58d646f..66abb64 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Mem.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Mem.scala
@@ -17,16 +17,10 @@
  */
 package org.apache.crunch.scrunch
 
-import java.lang.{Iterable => JIterable}
-
 import scala.collection.JavaConversions._
 
-import org.apache.hadoop.conf.Configuration
-
 import org.apache.crunch.{Pair => P}
-import org.apache.crunch.{Source, TableSource, Target}
 import org.apache.crunch.impl.mem.MemPipeline
-import org.apache.crunch.scrunch.Conversions._
 
 /**
  * Object for working with in-memory PCollection and PTable instances.
http://git-wip-us.apache.org/repos/asf/crunch/blob/ac4a525a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
index 31c2f8a..e2f7b5b 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
@@ -75,6 +75,10 @@ class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCol
 
   protected def wrap(newNative: JCollection[_]) = new PCollection[S](newNative.asInstanceOf[JCollection[S]])
 
+  def collect[T, To](pf: PartialFunction[S, T])(implicit pt: PTypeH[T], b: CanParallelTransform[T, To]) = {
+    filter(pf.isDefinedAt(_)).map(pf)(pt, b)
+  }
+
   def increment(groupName: String, counterName: String, count: Long) = {
     new IncrementPCollection[S](this).apply(groupName, counterName, count)
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/ac4a525a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
index aefad67..6fab61a 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
@@ -51,6 +51,10 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V]
     wrap(native.parallelDo("withPType", ident, pt))
   }
 
+  def collect[T, To](pf: PartialFunction[(K, V), T])(implicit pt: PTypeH[T], b: CanParallelTransform[T, To]) = {
+    filter((k, v) => pf.isDefinedAt((k, v))).map((k, v) => pf((k, v)))(pt, b)
+  }
+
   def mapValues[T](f: V => T)(implicit pt: PTypeH[T]) = {
     val ptf = getTypeFamily()
     val ptype = ptf.tableOf(native.getKeyType(), pt.get(ptf))

http://git-wip-us.apache.org/repos/asf/crunch/blob/ac4a525a/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/PartialFunctionTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/PartialFunctionTest.scala b/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/PartialFunctionTest.scala
new file mode 100644
index 0000000..2c2b75f
--- /dev/null
+++ b/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/PartialFunctionTest.scala
@@ -0,0 +1,35 @@
+/*
+ * *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.crunch.scrunch
+
+import org.junit.Test
+
+class PartialFunctionTest extends CrunchSuite {
+  @Test def testPartialFunction {
+    val pf = Mem.collectionOf(1, 2, 3).collect({ case i: Int if i != 3 => i + 1 })
+    org.junit.Assert.assertEquals(List(2, 3), pf.materialize().toList)
+  }
+
+  @Test def testPartialFunctionPTable {
+    val pf = Mem.tableOf("a" -> 1, "b" -> 2, "c" -> 3).collect({ case (k, v) if k != "c" => v + 1 })
+    org.junit.Assert.assertEquals(List(2, 3), pf.materialize().toList)
+  }
+}
