Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 3a760cdae -> ea777acdf
Change usage of Iterable in PGroupedTable to TraversableOnce/Iterator to prevent SingleUseIterator exceptions in Scrunch Signed-off-by: Josh Wills <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ea777acd Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ea777acd Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ea777acd Branch: refs/heads/apache-crunch-0.8 Commit: ea777acdf15987bf78afcd76980416d84ddef37e Parents: 3a760cd Author: David Whiting <[email protected]> Authored: Mon Aug 4 10:20:22 2014 -0400 Committer: Josh Wills <[email protected]> Committed: Thu Aug 7 09:34:59 2014 -0700 ---------------------------------------------------------------------- .../apache/crunch/scrunch/PGroupedTable.scala | 85 ++++++++++---------- .../crunch/scrunch/PGroupedTableTest.scala | 35 ++++++++ 2 files changed, 77 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/ea777acd/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PGroupedTable.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PGroupedTable.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PGroupedTable.scala index c86611a..bd48202 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PGroupedTable.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PGroupedTable.scala @@ -19,7 +19,6 @@ package org.apache.crunch.scrunch import org.apache.crunch.{PCollection => JCollection, PGroupedTable => JGroupedTable, Pair => CPair, _} import java.lang.{Iterable => JIterable} -import scala.collection.Iterable import scala.collection.JavaConversions._ import org.apache.hadoop.mapreduce.TaskInputOutputContext @@ -27,21 +26,21 @@ class PGroupedTable[K, V](val native: JGroupedTable[K, V]) 
extends PCollectionLike[CPair[K, JIterable[V]], PGroupedTable[K, V], JGroupedTable[K, V]] { import PGroupedTable._ - type FunctionType[T] = (K, Iterable[V]) => T - type CtxtFunctionType[T] = (K, Iterable[V], TIOC) => T + type FunctionType[T] = (K, TraversableOnce[V]) => T + type CtxtFunctionType[T] = (K, TraversableOnce[V], TIOC) => T - protected def wrapFlatMapFn[T](fmt: (K, Iterable[V]) => TraversableOnce[T]) = flatMapFn(fmt) - protected def wrapMapFn[T](fmt: (K, Iterable[V]) => T) = mapFn(fmt) - protected def wrapFilterFn(fmt: (K, Iterable[V]) => Boolean) = filterFn(fmt) - protected def wrapFlatMapWithCtxtFn[T](fmt: (K, Iterable[V], TIOC) => TraversableOnce[T]) = { + protected def wrapFlatMapFn[T](fmt: (K, TraversableOnce[V]) => TraversableOnce[T]) = flatMapFn(fmt) + protected def wrapMapFn[T](fmt: (K, TraversableOnce[V]) => T) = mapFn(fmt) + protected def wrapFilterFn(fmt: (K, TraversableOnce[V]) => Boolean) = filterFn(fmt) + protected def wrapFlatMapWithCtxtFn[T](fmt: (K, TraversableOnce[V], TIOC) => TraversableOnce[T]) = { flatMapWithCtxtFn(fmt) } - protected def wrapMapWithCtxtFn[T](fmt: (K, Iterable[V], TIOC) => T) = mapWithCtxtFn(fmt) - protected def wrapFilterWithCtxtFn(fmt: (K, Iterable[V], TIOC) => Boolean) = filterWithCtxtFn(fmt) - protected def wrapPairFlatMapFn[S, T](fmt: (K, Iterable[V]) => TraversableOnce[(S, T)]) = pairFlatMapFn(fmt) - protected def wrapPairMapFn[S, T](fmt: (K, Iterable[V]) => (S, T)) = pairMapFn(fmt) + protected def wrapMapWithCtxtFn[T](fmt: (K, TraversableOnce[V], TIOC) => T) = mapWithCtxtFn(fmt) + protected def wrapFilterWithCtxtFn(fmt: (K, TraversableOnce[V], TIOC) => Boolean) = filterWithCtxtFn(fmt) + protected def wrapPairFlatMapFn[S, T](fmt: (K, TraversableOnce[V]) => TraversableOnce[(S, T)]) = pairFlatMapFn(fmt) + protected def wrapPairMapFn[S, T](fmt: (K, TraversableOnce[V]) => (S, T)) = pairMapFn(fmt) - def combine(f: Iterable[V] => V) = combineValues(new IterableCombineFn[K, V](f)) + def combine(f: TraversableOnce[V] => 
V) = combineValues(new TraversableOnceCombineFn[K, V](f)) def combineValues(agg: Aggregator[V]) = new PTable[K, V](native.combineValues(agg)) @@ -58,68 +57,68 @@ class PGroupedTable[K, V](val native: JGroupedTable[K, V]) } } -class IterableCombineFn[K, V](f: Iterable[V] => V) extends CombineFn[K, V] { +class TraversableOnceCombineFn[K, V](f: TraversableOnce[V] => V) extends CombineFn[K, V] { override def process(input: CPair[K, JIterable[V]], emitfn: Emitter[CPair[K, V]]) = { - emitfn.emit(CPair.of(input.first(), f(iterableAsScalaIterable[V](input.second())))) + emitfn.emit(CPair.of(input.first(), f(iterableAsScalaIterable[V](input.second()).iterator))) } } -trait SDoGroupedFn[K, V, T] extends DoFn[CPair[K, JIterable[V]], T] with Function2[K, Iterable[V], TraversableOnce[T]] { +trait SDoGroupedFn[K, V, T] extends DoFn[CPair[K, JIterable[V]], T] with Function2[K, TraversableOnce[V], TraversableOnce[T]] { override def process(input: CPair[K, JIterable[V]], emitter: Emitter[T]) { - for (v <- apply(input.first(), iterableAsScalaIterable[V](input.second()))) { + for (v <- apply(input.first(), iterableAsScalaIterable[V](input.second()).iterator)) { emitter.emit(v) } } } -trait SMapGroupedFn[K, V, T] extends MapFn[CPair[K, JIterable[V]], T] with Function2[K, Iterable[V], T] { +trait SMapGroupedFn[K, V, T] extends MapFn[CPair[K, JIterable[V]], T] with Function2[K, TraversableOnce[V], T] { override def map(input: CPair[K, JIterable[V]]) = { - apply(input.first(), iterableAsScalaIterable[V](input.second())) + apply(input.first(), iterableAsScalaIterable[V](input.second()).iterator) } } -trait SFilterGroupedFn[K, V] extends FilterFn[CPair[K, JIterable[V]]] with Function2[K, Iterable[V], Boolean] { +trait SFilterGroupedFn[K, V] extends FilterFn[CPair[K, JIterable[V]]] with Function2[K, TraversableOnce[V], Boolean] { override def accept(input: CPair[K, JIterable[V]]) = { - apply(input.first(), iterableAsScalaIterable[V](input.second())) + apply(input.first(), 
iterableAsScalaIterable[V](input.second()).iterator) } } -class SDoGroupedWithCtxtFn[K, V, T](val f: (K, Iterable[V], TaskInputOutputContext[_, _, _, _]) => TraversableOnce[T]) +class SDoGroupedWithCtxtFn[K, V, T](val f: (K, TraversableOnce[V], TaskInputOutputContext[_, _, _, _]) => TraversableOnce[T]) extends DoFn[CPair[K, JIterable[V]], T] { override def process(input: CPair[K, JIterable[V]], emitter: Emitter[T]) { - for (v <- f(input.first(), iterableAsScalaIterable[V](input.second()), getContext)) { + for (v <- f(input.first(), iterableAsScalaIterable[V](input.second()).iterator, getContext)) { emitter.emit(v) } } } -class SMapGroupedWithCtxtFn[K, V, T](val f: (K, Iterable[V], TaskInputOutputContext[_, _, _, _]) => T) +class SMapGroupedWithCtxtFn[K, V, T](val f: (K, TraversableOnce[V], TaskInputOutputContext[_, _, _, _]) => T) extends MapFn[CPair[K, JIterable[V]], T] { override def map(input: CPair[K, JIterable[V]]) = { - f(input.first(), iterableAsScalaIterable[V](input.second()), getContext) + f(input.first(), iterableAsScalaIterable[V](input.second()).iterator, getContext) } } -class SFilterGroupedWithCtxtFn[K, V](val f: (K, Iterable[V], TaskInputOutputContext[_, _, _, _]) => Boolean) +class SFilterGroupedWithCtxtFn[K, V](val f: (K, TraversableOnce[V], TaskInputOutputContext[_, _, _, _]) => Boolean) extends FilterFn[CPair[K, JIterable[V]]] { override def accept(input: CPair[K, JIterable[V]]) = { - f.apply(input.first(), iterableAsScalaIterable[V](input.second()), getContext) + f.apply(input.first(), iterableAsScalaIterable[V](input.second()).iterator, getContext) } } trait SDoPairGroupedFn[K, V, S, T] extends DoFn[CPair[K, JIterable[V]], CPair[S, T]] - with Function2[K, Iterable[V], TraversableOnce[(S, T)]] { + with Function2[K, TraversableOnce[V], TraversableOnce[(S, T)]] { override def process(input: CPair[K, JIterable[V]], emitter: Emitter[CPair[S, T]]) { - for (v <- apply(input.first(), iterableAsScalaIterable[V](input.second()))) { + for (v <- 
apply(input.first(), iterableAsScalaIterable[V](input.second()).iterator)) { emitter.emit(CPair.of(v._1, v._2)) } } } trait SMapPairGroupedFn[K, V, S, T] extends MapFn[CPair[K, JIterable[V]], CPair[S, T]] - with Function2[K, Iterable[V], (S, T)] { + with Function2[K, TraversableOnce[V], (S, T)] { override def map(input: CPair[K, JIterable[V]]) = { - val t = apply(input.first(), iterableAsScalaIterable[V](input.second())) + val t = apply(input.first(), iterableAsScalaIterable[V](input.second()).iterator) CPair.of(t._1, t._2) } } @@ -127,35 +126,35 @@ trait SMapPairGroupedFn[K, V, S, T] extends MapFn[CPair[K, JIterable[V]], CPair[ object PGroupedTable { type TIOC = TaskInputOutputContext[_, _, _, _] - def flatMapFn[K, V, T](fn: (K, Iterable[V]) => TraversableOnce[T]) = { - new SDoGroupedFn[K, V, T] { def apply(k: K, v: Iterable[V]) = fn(k, v) } + def flatMapFn[K, V, T](fn: (K, TraversableOnce[V]) => TraversableOnce[T]) = { + new SDoGroupedFn[K, V, T] { def apply(k: K, v: TraversableOnce[V]) = fn(k, v) } } - def mapFn[K, V, T](fn: (K, Iterable[V]) => T) = { - new SMapGroupedFn[K, V, T] { def apply(k: K, v: Iterable[V]) = fn(k, v) } + def mapFn[K, V, T](fn: (K, TraversableOnce[V]) => T) = { + new SMapGroupedFn[K, V, T] { def apply(k: K, v: TraversableOnce[V]) = fn(k, v) } } - def filterFn[K, V](fn: (K, Iterable[V]) => Boolean) = { - new SFilterGroupedFn[K, V] { def apply(k: K, v: Iterable[V]) = fn(k, v) } + def filterFn[K, V](fn: (K, TraversableOnce[V]) => Boolean) = { + new SFilterGroupedFn[K, V] { def apply(k: K, v: TraversableOnce[V]) = fn(k, v) } } - def flatMapWithCtxtFn[K, V, T](fn: (K, Iterable[V], TIOC) => TraversableOnce[T]) = { + def flatMapWithCtxtFn[K, V, T](fn: (K, TraversableOnce[V], TIOC) => TraversableOnce[T]) = { new SDoGroupedWithCtxtFn[K, V, T](fn) } - def mapWithCtxtFn[K, V, T](fn: (K, Iterable[V], TIOC) => T) = { + def mapWithCtxtFn[K, V, T](fn: (K, TraversableOnce[V], TIOC) => T) = { new SMapGroupedWithCtxtFn[K, V, T](fn) } - def 
filterWithCtxtFn[K, V](fn: (K, Iterable[V], TIOC) => Boolean) = { + def filterWithCtxtFn[K, V](fn: (K, TraversableOnce[V], TIOC) => Boolean) = { new SFilterGroupedWithCtxtFn[K, V](fn) } - def pairMapFn[K, V, S, T](fn: (K, Iterable[V]) => (S, T)) = { - new SMapPairGroupedFn[K, V, S, T] { def apply(k: K, v: Iterable[V]) = fn(k, v) } + def pairMapFn[K, V, S, T](fn: (K, TraversableOnce[V]) => (S, T)) = { + new SMapPairGroupedFn[K, V, S, T] { def apply(k: K, v: TraversableOnce[V]) = fn(k, v) } } - def pairFlatMapFn[K, V, S, T](fn: (K, Iterable[V]) => TraversableOnce[(S, T)]) = { - new SDoPairGroupedFn[K, V, S, T] { def apply(k: K, v: Iterable[V]) = fn(k, v) } + def pairFlatMapFn[K, V, S, T](fn: (K, TraversableOnce[V]) => TraversableOnce[(S, T)]) = { + new SDoPairGroupedFn[K, V, S, T] { def apply(k: K, v: TraversableOnce[V]) = fn(k, v) } } } http://git-wip-us.apache.org/repos/asf/crunch/blob/ea777acd/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/PGroupedTableTest.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/PGroupedTableTest.scala b/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/PGroupedTableTest.scala new file mode 100644 index 0000000..f5cfcf9 --- /dev/null +++ b/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/PGroupedTableTest.scala @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.scrunch + +import org.junit.Test +import org.junit.Assert._ + +class PGroupedTableTest extends CrunchSuite { + @Test + def testReduceInCombine() { + val aSum = Mem.collectionOf(("a", 1), ("a", 2), ("b", 1)) + .map{ case (a,b) => (a,b) } + .groupByKey() + .combine(_.reduce {_+_} ) + .asMap() + .value()("a") + + assertEquals(3, aSum) + } +} \ No newline at end of file
