Repository: crunch Updated Branches: refs/heads/master bbeb7537a -> b90427f3e
CRUNCH-398: Add Increment one-liner functions to PCollection and PTable. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/b90427f3 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/b90427f3 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/b90427f3 Branch: refs/heads/master Commit: b90427f3efbbb6c5b7bdde57aa2ca6d464ff0f18 Parents: bbeb753 Author: Josh Wills <[email protected]> Authored: Sat May 17 09:51:35 2014 -0700 Committer: Josh Wills <[email protected]> Committed: Mon May 19 22:58:08 2014 -0700 ---------------------------------------------------------------------- .../apache/crunch/scrunch/IncrementTest.scala | 45 ++++++ .../org/apache/crunch/scrunch/Increment.scala | 140 +++++++++++++++++++ .../org/apache/crunch/scrunch/PCollection.scala | 10 +- .../org/apache/crunch/scrunch/PTable.scala | 24 +++- 4 files changed, 217 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/b90427f3/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/IncrementTest.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/IncrementTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/IncrementTest.scala new file mode 100644 index 0000000..338051c --- /dev/null +++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/IncrementTest.scala @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.scrunch + +import org.apache.crunch.io.{From => from, To => to} + +import _root_.org.junit.Test +import _root_.org.junit.Assert.assertEquals + +class IncrementTest extends CrunchSuite { + + @Test def testIncrement { + val pipeline = Pipeline.mapReduce[IncrementTest](tempDir.getDefaultConfiguration) + val input = tempDir.copyResourceFileName("shakes.txt") + + pipeline.read(from.textFile(input, Avros.strings)) + .flatMap(_.toLowerCase.split("\\s+")) + .increment("TOP", "ALLWORDS") + .filter(!_.isEmpty()) + .increment("TOP", "NONEMPTY") + .incrementIf(_ startsWith "a")("TOP", "AWORDS_2x", 2) + .write(to.avroFile(tempDir.getFileName("somewords"))) + + val res = pipeline.done() + val sr0 = res.getStageResults.get(0) + assertEquals(21836, sr0.getCounterValue("TOP", "ALLWORDS")) + assertEquals(20366, sr0.getCounterValue("TOP", "NONEMPTY")) + assertEquals(3604, sr0.getCounterValue("TOP", "AWORDS_2x")) + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/b90427f3/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Increment.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Increment.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Increment.scala new file mode 100644 index 0000000..f6b6ffa --- /dev/null +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Increment.scala @@ -0,0 +1,140 @@ +/* + * * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license 
agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.crunch.scrunch + +import org.apache.crunch.{FilterFn, Pair => CPair} + +/** + * The {@code Incrementable[T]} trait defines an object that allows a counter to + * be incremented and then returns a reference to another object of the same type. + * Both the {@link PCollection} and {@link PTable} types in Scrunch support the + * {@code Incrementable} trait. 
+ */ +trait Incrementable[T] { + def increment(counter: Enum[_]): T = { + increment(counter, 1) + } + + def increment(counter: Enum[_], count: Long): T = { + // Use the declaring class name and the constant's name() so the group/counter + // match what Hadoop uses when a counter is looked up by enum; getClass may be an + // anonymous per-constant subclass whose canonical name is null. + increment(counter.getDeclaringClass.getName, counter.name(), count) + } + + def increment(groupEnum: Enumeration, value: Enumeration#Value): T = { + increment(groupEnum, value, 1) + } + + def increment(groupEnum: Enumeration, value: Enumeration#Value, count: Long): T = { + increment(groupEnum.toString, value.toString, count) + } + + def increment(groupName: String, counterName: String): T = { + increment(groupName, counterName, 1) + } + + def increment(groupName: String, counterName: String, count: Long): T +} + +/** + * Incrementable classes may also support conditionally incrementing a counter, + * such as via the {@link PCollection#incrementIf} method or the {@link PTable#incrementIf}, + * {@link PTable#incrementIfKey}, and {@link PTable#incrementIfValue} methods. In these cases, the return type + * is an instance of {@code Increment} that returns a reference to a new PCollection/PTable + * after it is applied to a specified counter group and value. 
+ */ +trait Increment[T] { + + def apply(counter: Enum[_]): T = { + apply(counter, 1) + } + + def apply(counter: Enum[_], count: Long): T = { + // Same group/counter naming as Incrementable.increment(Enum, Long): declaring + // class name plus name(), so enum-based counter lookups find the value. + apply(counter.getDeclaringClass.getName, counter.name(), count) + } + + def apply(groupEnum: Enumeration, value: Enumeration#Value): T = { + apply(groupEnum, value, 1) + } + + def apply(groupEnum: Enumeration, value: Enumeration#Value, count: Long): T = { + apply(groupEnum.toString, value.toString, count) + } + + def apply(groupName: String, counterName: String): T = { + apply(groupName, counterName, 1) + } + + def apply(groupName: String, counterName: String, count: Long): T +} + +class IncrementPCollection[S](val pc: PCollection[S]) extends Increment[PCollection[S]] { + override def apply(groupName: String, counterName: String, count: Long) = { + pc.parallelDo("inc=" + groupName + ":" + counterName, + new CounterFn[S](groupName, counterName, count), + pc.pType()) + } +} + +class IncrementIfPCollection[S](val pc: PCollection[S], val f: S => Boolean) extends Increment[PCollection[S]] { + override def apply(groupName: String, counterName: String, count: Long) = { + pc.parallelDo("incif=" + groupName + ":" + counterName, + new IfCounterFn[S](groupName, counterName, count, f), + pc.pType()) + } +} + +class IncrementPTable[K, V](val pc: PTable[K, V]) extends Increment[PTable[K, V]] { + override def apply(groupName: String, counterName: String, count: Long) = { + pc.parallelDo("inc=" + groupName + ":" + counterName, + new CounterFn[CPair[K, V]](groupName, counterName, count), + pc.pType()) + } +} + +class IncrementIfPTable[K, V](val pc: PTable[K, V], val f: CPair[K, V] => Boolean) extends Increment[PTable[K, V]] { + override def apply(groupName: String, counterName: String, count: Long) = { + // Conditional stages are named "incif=", matching IncrementIfPCollection. + pc.parallelDo("incif=" + groupName + ":" + counterName, + new IfCounterFn[CPair[K, V]](groupName, counterName, count, f), + pc.pType()) + } +} + +class CounterFn[S](val group: String, val counter: String, val count: Long) + extends 
FilterFn[S] { + override def scaleFactor() = 1.0f + + def accept(s: S) = { + increment(group, counter, count) + true + } +} + +class IfCounterFn[S](val group: String, val counter: String, val count: Long, val cond: S => Boolean) + extends FilterFn[S] { + override def scaleFactor() = 1.0f + + def accept(s: S) = { + if (cond(s)) { + increment(group, counter, count) + } + true + } +} + http://git-wip-us.apache.org/repos/asf/crunch/blob/b90427f3/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala index 2d4ed44..dc0ab0b 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala @@ -27,7 +27,9 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext import org.apache.crunch.types.PType import org.apache.crunch.fn.IdentityFn -class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCollection[S], JCollection[S]] { +class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCollection[S], JCollection[S]] + with Incrementable[PCollection[S]] { + import PCollection._ type FunctionType[T] = S => T @@ -73,6 +75,12 @@ class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCol protected def wrap(newNative: JCollection[_]) = new PCollection[S](newNative.asInstanceOf[JCollection[S]]) + def increment(groupName: String, counterName: String, count: Long) = { + new IncrementPCollection[S](this).apply(groupName, counterName, count) + } + + def incrementIf(f: S => Boolean) = new IncrementIfPCollection[S](this, f) + def count() = { val count = new PTable[S, java.lang.Long](Aggregate.count(native)) count.mapValues(_.longValue()) 
http://git-wip-us.apache.org/repos/asf/crunch/blob/b90427f3/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala index 2f88b0c..1d5a70e 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala @@ -30,7 +30,8 @@ import scala.collection.Iterable import org.apache.hadoop.mapreduce.TaskInputOutputContext import org.apache.crunch.fn.IdentityFn -class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V], PTable[K, V], JTable[K, V]] { +class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V], PTable[K, V], JTable[K, V]] + with Incrementable[PTable[K, V]] { import PTable._ type FunctionType[T] = (K, V) => T @@ -143,6 +144,16 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V] new PTable[K, V](newNative.asInstanceOf[JTable[K, V]]) } + def increment(groupName: String, counterName: String, count: Long) = { + new IncrementPTable[K, V](this).apply(groupName, counterName, count) + } + + def incrementIf(f: (K, V) => Boolean) = new IncrementIfPTable[K, V](this, incFn(f)) + + def incrementIfKey(f: K => Boolean) = new IncrementIfPTable[K, V](this, incKeyFn(f)) + + def incrementIfValue(f: V => Boolean) = new IncrementIfPTable[K, V](this, incValueFn(f)) + def materialize(): Iterable[(K, V)] = { InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration()) native.materialize.view.map(x => (x.first, x.second)) @@ -265,4 +276,15 @@ object PTable { new SDoPairTableFn[K, V, S, T] { def apply(k: K, v: V) = fn(k, v) } } + def incFn[K, V, T](fn: (K, V) => T) = new Function1[CPair[K, V], T] with Serializable { + def apply(p: CPair[K, V]): T = fn(p.first(), p.second()) + 
} + + def incKeyFn[K, V, T](fn: K => T) = new Function1[CPair[K, V], T] with Serializable { + def apply(p: CPair[K, V]): T = fn(p.first()) + } + + def incValueFn[K, V, T](fn: V => T) = new Function1[CPair[K, V], T] with Serializable { + def apply(p: CPair[K, V]): T = fn(p.second()) + } }
