Repository: crunch Updated Branches: refs/heads/apache-crunch-0.8 4693cec0f -> 7b0054ec4
CRUNCH-447: Secondary sort and shard for Scrunch Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/7b0054ec Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/7b0054ec Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/7b0054ec Branch: refs/heads/apache-crunch-0.8 Commit: 7b0054ec45e5ae7e7c174501805928511b241e30 Parents: 4693cec Author: Josh Wills <[email protected]> Authored: Wed Jul 16 15:27:04 2014 -0700 Committer: Josh Wills <[email protected]> Committed: Thu Jul 17 12:36:51 2014 -0700 ---------------------------------------------------------------------- .../src/it/resources/secondary_sort_input.txt | 7 --- .../crunch/scrunch/SecondarySortTest.scala | 50 ++++++++++++++++ .../apache/crunch/scrunch/PCollectionLike.scala | 9 +++ .../org/apache/crunch/scrunch/PTable.scala | 63 +++++++++++++++++++- .../src/main/resources/secondary_sort_input.txt | 7 +++ 5 files changed, 127 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/7b0054ec/crunch-core/src/it/resources/secondary_sort_input.txt ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/resources/secondary_sort_input.txt b/crunch-core/src/it/resources/secondary_sort_input.txt deleted file mode 100644 index 3c7be93..0000000 --- a/crunch-core/src/it/resources/secondary_sort_input.txt +++ /dev/null @@ -1,7 +0,0 @@ -one,1,1 -one,2,-3 -two,4,5 -two,2,6 -two,1,7,9 -three,0,-1 -one,-5,10 http://git-wip-us.apache.org/repos/asf/crunch/blob/7b0054ec/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/SecondarySortTest.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/SecondarySortTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/SecondarySortTest.scala new file mode 100644 index 0000000..1ae1e9c --- /dev/null +++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/SecondarySortTest.scala @@ -0,0 +1,50 @@ +/* + * * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.crunch.scrunch + +import org.junit.Assert._ +import org.junit.Test + +class SecondarySortTest extends CrunchSuite with Serializable { + + @Test def testSecondarySortAvros { + runSecondarySort(Avros) + } + + @Test def testSecondarySortWritables { + runSecondarySort(Writables) + } + + def runSecondarySort(ptf: PTypeFamily) { + val p = Pipeline.mapReduce(classOf[SecondarySortTest], tempDir.getDefaultConfiguration) + val inputFile = tempDir.copyResourceFileName("secondary_sort_input.txt") + val lines = p.read(From.textFile(inputFile, ptf.strings)).map(input => { + val pieces = input.split(",") + (pieces(0), (pieces(1).trim.toInt, pieces(2).trim.toInt)) + }) + .secondarySortAndMap((k, iter: Iterable[(Int, Int)]) => { + List(k, iter.mkString(",")).mkString(",") + }) + .materialize + assertEquals(List("one,(-5,10),(1,1),(2,-3)", "three,(0,-1)", "two,(1,7),(2,6),(4,5)"), lines.toList) + p.done + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/7b0054ec/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala index 3d35bc6..b2216f1 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala @@ -20,6 +20,7 @@ package org.apache.crunch.scrunch import org.apache.crunch.{PCollection => JCollection, Pair => JPair, _} import org.apache.crunch.types.{PType, PTableType} import org.apache.crunch.types.writable.WritableTypeFamily +import org.apache.crunch.lib.Shard /** * Base trait for PCollection-like entities in Scrunch, including PTables and PGroupedTables. @@ -269,6 +270,14 @@ trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] { } /** + * Re-partitions this instance into the given number of partitions. + * + * @param numPartitions the number of partitions to use + * @return a re-partitioned version of the data in this instance + */ + def shard(numPartitions: Int) = wrap(Shard.shard(native, numPartitions)) + + /** * Gets the number of elements represented by this PCollection. * * @return The number of elements in this PCollection. http://git-wip-us.apache.org/repos/asf/crunch/blob/7b0054ec/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala index 50ddc5c..eaba071 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala @@ -17,13 +17,14 @@ */ package org.apache.crunch.scrunch +import java.lang.{Iterable => JIterable} import java.util.{Collection => JCollect} import scala.collection.JavaConversions._ import org.apache.crunch.{PCollection => JCollection, PTable => JTable, Pair => CPair, _} -import org.apache.crunch.lib.{Cartesian, Aggregate, Cogroup, PTables} -import org.apache.crunch.lib.join.{DefaultJoinStrategy, JoinType} +import org.apache.crunch.lib._ +import org.apache.crunch.lib.join.{JoinUtils, DefaultJoinStrategy, JoinType} import org.apache.crunch.types.{PTableType, PType} import scala.collection.Iterable import org.apache.hadoop.mapreduce.TaskInputOutputContext @@ -54,6 +55,30 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V] filter((k, v) => pf.isDefinedAt((k, v))).map((k, v) => pf((k, v)))(pt, b) } + def secondarySortAndMap[K2, VX, T, To](f: (K, Iterable[(K2, VX)]) => T, numReducers: Int = -1) + (implicit ev: <:<[V, (K2, VX)], pt: PTypeH[T], b: CanParallelTransform[T, To]) = { + b(prepareSecondarySort(numReducers), secSortMap(f), pt.get(getTypeFamily())) + } + + def secondarySortAndFlatMap[K2, VX, T, To](f: (K, Iterable[(K2, VX)]) => TraversableOnce[T], numReducers: Int = -1) + (implicit ev: <:<[V, (K2, VX)], pt: PTypeH[T], b: CanParallelTransform[T, To]) = { + b(prepareSecondarySort(numReducers), secSortFlatMap(f), pt.get(getTypeFamily())) + } + + private def prepareSecondarySort[K2, VX](numReducers: Int)(implicit ev: <:<[V, (K2, VX)]): PGroupedTable[CPair[K, K2], (K2, VX)] = { + val basePTF = getTypeFamily().ptf + val gopts = GroupingOptions.builder() + .requireSortedKeys() + .groupingComparatorClass(JoinUtils.getGroupingComparator(basePTF)) + .partitionerClass(JoinUtils.getPartitionerClass(basePTF)) + if (numReducers > 0) { + gopts.numReducers(numReducers) + } + val kt = basePTF.pairs(keyType(), valueType().getSubTypes.get(0).asInstanceOf[PType[K2]]) + val ptt = basePTF.tableOf(kt, valueType.asInstanceOf[PType[(K2, VX)]]) + parallelDo(new SPrepareSSFn[K, V, K2, VX], ptt).groupByKey(gopts.build()) + } + def mapValues[T](f: V => T)(implicit pt: PTypeH[T]) = { val ptf = getTypeFamily() val ptype = ptf.tableOf(native.getKeyType(), pt.get(ptf)) @@ -262,6 +287,30 @@ trait SMapTableKeysFn[K, V, T] extends MapFn[CPair[K, V], CPair[T, V]] with Func override def map(input: CPair[K, V]) = CPair.of(apply(input.first()), input.second()) } +private class SPrepareSSFn[K, V, K2, VX] extends MapFn[CPair[K, V], CPair[CPair[K, K2], (K2, VX)]] { + override def map(input: CPair[K, V]): CPair[CPair[K, K2], (K2, VX)] = { + val sec = input.second().asInstanceOf[(K2, VX)] + CPair.of(CPair.of(input.first(), sec._1), sec) + } +} + +trait SecSortFlatMapFn[K, K2, VX, T] extends DoFn[CPair[CPair[K, K2], JIterable[(K2, VX)]], T] + with Function2[K, Iterable[(K2, VX)], TraversableOnce[T]] { + override def process(input: CPair[CPair[K, K2], JIterable[(K2, VX)]], emitter: Emitter[T]) { + val iter = iterableAsScalaIterable(input.second()) + for (v <- apply(input.first().first(), iter)) { + emitter.emit(v) + } + } +} + +trait SecSortMapFn[K, K2, VX, T] extends MapFn[CPair[CPair[K, K2], JIterable[(K2, VX)]], T] + with Function2[K, Iterable[(K2, VX)], T] { + override def map(input: CPair[CPair[K, K2], JIterable[(K2, VX)]]) = { + apply(input.first().first(), iterableAsScalaIterable(input.second())) + } +} + object PTable { type TIOC = TaskInputOutputContext[_, _, _, _] @@ -316,4 +365,14 @@ object PTable { def incValueFn[K, V, T](fn: V => T) = new Function1[CPair[K, V], T] with Serializable { def apply(p: CPair[K, V]): T = fn(p.second()) } + + def secSortFlatMap[K, K2, VX, T](fn: (K, Iterable[(K2, VX)]) => TraversableOnce[T]) = { + new SecSortFlatMapFn[K, K2, VX, T] { + def apply(k: K, v: Iterable[(K2, VX)]) = fn(k, v) + } + } + + def secSortMap[K, K2, VX, T](fn: (K, Iterable[(K2, VX)]) => T) = new SecSortMapFn[K, K2, VX, T] { + def apply(k: K, v: Iterable[(K2, VX)]) = fn(k, v) + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/7b0054ec/crunch-test/src/main/resources/secondary_sort_input.txt ---------------------------------------------------------------------- diff --git a/crunch-test/src/main/resources/secondary_sort_input.txt b/crunch-test/src/main/resources/secondary_sort_input.txt new file mode 100644 index 0000000..3c7be93 --- /dev/null +++ b/crunch-test/src/main/resources/secondary_sort_input.txt @@ -0,0 +1,7 @@ +one,1,1 +one,2,-3 +two,4,5 +two,2,6 +two,1,7,9 +three,0,-1 +one,-5,10
