Repository: incubator-gearpump Updated Branches: refs/heads/master 5d524918d -> 5cabd8ca3
[GEARPUMP-23] Do not group by windows in GroupByPartitioner Author: manuzhang <[email protected]> Closes #139 from manuzhang/group_by. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/5cabd8ca Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/5cabd8ca Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/5cabd8ca Branch: refs/heads/master Commit: 5cabd8ca3406de65de74a59c0e57147f00b9edc3 Parents: 5d52491 Author: manuzhang <[email protected]> Authored: Tue Feb 7 20:32:25 2017 +0800 Committer: manuzhang <[email protected]> Committed: Tue Feb 7 20:33:19 2017 +0800 ---------------------------------------------------------------------- .../dsl/partitioner/GroupByPartitioner.scala | 7 ++- .../apache/gearpump/streaming/dsl/plan/OP.scala | 8 ++-- .../gearpump/streaming/dsl/plan/Planner.scala | 4 +- .../streaming/dsl/scalaapi/Stream.scala | 6 +-- .../streaming/dsl/window/api/GroupByFn.scala | 47 -------------------- .../streaming/dsl/window/impl/Window.scala | 7 ++- .../partitioner/GroupByPartitionerSpec.scala | 13 +----- .../gearpump/streaming/dsl/plan/OpSpec.scala | 12 ++--- .../streaming/dsl/plan/PlannerSpec.scala | 19 ++------ 9 files changed, 27 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5cabd8ca/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala index efa7409..7e1214e 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala @@ -20,7 +20,6 @@ package org.apache.gearpump.streaming.dsl.partitioner import org.apache.gearpump.Message import org.apache.gearpump.streaming.partitioner.UnicastPartitioner -import org.apache.gearpump.streaming.dsl.window.api.GroupByFn /** * Partition messages by applying group by function first. @@ -39,10 +38,10 @@ import org.apache.gearpump.streaming.dsl.window.api.GroupByFn * @param fn First apply message with groupBy function, then pick the hashCode of the output * to do the partitioning. You must define hashCode() for output type of groupBy function. */ -class GroupByPartitioner[T, Group](fn: GroupByFn[T, Group]) - extends UnicastPartitioner { +class GroupByPartitioner[T, GROUP](fn: T => GROUP) extends UnicastPartitioner { + override def getPartition(message: Message, partitionNum: Int, currentPartitionId: Int): Int = { - val hashCode = fn.groupBy(message).hashCode() + val hashCode = fn(message.msg.asInstanceOf[T]).hashCode() (hashCode & Integer.MAX_VALUE) % partitionNum } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5cabd8ca/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala index 56f16e1..708e0d2 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala @@ -25,7 +25,7 @@ import org.apache.gearpump.streaming.Processor.DefaultProcessor import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, FunctionRunner} import org.apache.gearpump.streaming.{Constants, Processor} import org.apache.gearpump.streaming.dsl.task.TransformTask -import org.apache.gearpump.streaming.dsl.window.api.GroupByFn +import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow import org.apache.gearpump.streaming.sink.{DataSink, DataSinkProcessor} import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask} import org.apache.gearpump.streaming.task.Task @@ -149,7 +149,7 @@ case class ChainableOp[IN, OUT]( * This represents a Processor with window aggregation */ case class GroupByOp[IN, GROUP]( - groupByFn: GroupByFn[IN, GROUP], + groupBy: GroupAlsoByWindow[IN, GROUP], parallelism: Int = 1, description: String = "groupBy", override val userConfig: UserConfig = UserConfig.empty) @@ -158,7 +158,7 @@ case class GroupByOp[IN, GROUP]( override def chain(other: Op)(implicit system: ActorSystem): Op = { other match { case op: ChainableOp[_, _] => - GroupByOp(groupByFn, parallelism, description, + GroupByOp(groupBy, parallelism, description, userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn)) case _ => throw new OpChainException(this, other) @@ -166,7 +166,7 @@ case class GroupByOp[IN, GROUP]( } override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { - groupByFn.getProcessor(parallelism, description, userConfig) + groupBy.getProcessor(parallelism, description, userConfig) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5cabd8ca/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala index 65f9cd2..1dd8026 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala @@ -40,8 +40,8 @@ class Planner { edge match { case Shuffle => node2 match { - case groupBy: GroupByOp[_, _] => - new GroupByPartitioner(groupBy.groupByFn) + case op: GroupByOp[_, _] => + new GroupByPartitioner(op.groupBy.groupByFn) case _ => new HashPartitioner } case Direct => http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5cabd8ca/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala index bdb245c..f71276b 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala @@ -25,7 +25,7 @@ import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction import org.apache.gearpump.streaming.dsl.plan._ import org.apache.gearpump.streaming.dsl.plan.functions._ import org.apache.gearpump.streaming.dsl.window.api._ -import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowAndGroup} +import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow import org.apache.gearpump.streaming.sink.DataSink import org.apache.gearpump.streaming.task.{Task, TaskContext} import org.apache.gearpump.util.Graph @@ -210,8 +210,8 @@ class WindowStream[T](graph: Graph[Op, OpEdge], edge: Option[OpEdge], thisNode: def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1, description: String = "groupBy"): Stream[T] = { - val groupBy: GroupByFn[T, List[WindowAndGroup[GROUP]]] = GroupAlsoByWindow(fn, window) - val groupOp = GroupByOp[T, List[WindowAndGroup[GROUP]]](groupBy, parallelism, + val groupBy = GroupAlsoByWindow(fn, window) + val groupOp = GroupByOp[T, GROUP](groupBy, parallelism, s"$winDesc.$description") graph.addVertex(groupOp) graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5cabd8ca/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala deleted file mode 100644 index 30e68ba..0000000 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.streaming.dsl.window.api - -import akka.actor.ActorSystem -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.Processor -import org.apache.gearpump.streaming.task.Task - -/** - * Divides messages into groups according its payload and timestamp. - * Check [[org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow]] - * for default implementation. - */ -trait GroupByFn[T, GROUP] { - - /** - * Used by - * 1. GroupByPartitioner to shuffle messages - * 2. WindowRunner to group messages for time-based aggregation - */ - def groupBy(message: Message): GROUP - - /** - * Returns a Processor according to window trigger during planning - */ - def getProcessor(parallelism: Int, description: String, - userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: Task] -} - - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5cabd8ca/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala index eb5d551..fe644af 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala @@ -64,10 +64,9 @@ case class WindowAndGroup[GROUP](window: Window, group: GROUP) } } -case class GroupAlsoByWindow[T, GROUP](groupByFn: T => GROUP, window: Windows[T]) - extends GroupByFn[T, List[WindowAndGroup[GROUP]]] { +case class GroupAlsoByWindow[T, GROUP](groupByFn: T => GROUP, window: Windows[T]) { - override def groupBy(message: Message): List[WindowAndGroup[GROUP]] = { + def groupBy(message: Message): List[WindowAndGroup[GROUP]] = { val ele = message.msg.asInstanceOf[T] val group = groupByFn(ele) val windows = window.windowFn(new WindowFunction.Context[T] { @@ -77,7 +76,7 @@ case class GroupAlsoByWindow[T, GROUP](groupByFn: T => GROUP, window: Windows[T] windows.map(WindowAndGroup(_, group)).toList } - override def getProcessor(parallelism: Int, description: String, + def getProcessor(parallelism: Int, description: String, userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: Task] = { val config = userConfig.withValue(GEARPUMP_STREAMING_GROUPBY_FUNCTION, this) window.trigger match { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5cabd8ca/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala index fb45e35..1934d14 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala @@ -18,13 +18,9 @@ package org.apache.gearpump.streaming.dsl.partitioner -import java.time.Duration - import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} import org.apache.gearpump.Message import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitionerSpec.People -import org.apache.gearpump.streaming.dsl.window.api.{FixedWindows, GroupByFn} -import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowAndGroup} class GroupByPartitionerSpec extends FlatSpec with Matchers with BeforeAndAfterAll { @@ -34,16 +30,11 @@ class GroupByPartitionerSpec extends FlatSpec with Matchers with BeforeAndAfterA val michelle = People("Michelle", "female") val partitionNum = 10 - val groupByFn: GroupByFn[People, List[WindowAndGroup[String]]] = - GroupAlsoByWindow[People, String](_.gender, - FixedWindows.apply[People](Duration.ofMillis(5))) - val groupBy = new GroupByPartitioner[People, List[WindowAndGroup[String]]](groupByFn) + + val groupBy = new GroupByPartitioner[People, String](_.gender) groupBy.getPartition(Message(mark, 1L), partitionNum) shouldBe groupBy.getPartition(Message(tom, 2L), partitionNum) - groupBy.getPartition(Message(mark, 1L), partitionNum) should not be - groupBy.getPartition(Message(tom, 6L), partitionNum) - groupBy.getPartition(Message(mark, 2L), partitionNum) should not be groupBy.getPartition(Message(michelle, 3L), partitionNum) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5cabd8ca/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala index 461d3da..d007e09 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala @@ -27,7 +27,7 @@ import org.apache.gearpump.streaming.Processor.DefaultProcessor import org.apache.gearpump.streaming.dsl.plan.OpSpec.{AnySink, AnySource, AnyTask} import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, FunctionRunner} import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction -import org.apache.gearpump.streaming.dsl.window.api.GroupByFn +import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow import org.apache.gearpump.streaming.sink.DataSink import org.apache.gearpump.streaming.source.DataSource import org.apache.gearpump.streaming.task.{Task, TaskContext} @@ -169,8 +169,8 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS "GroupByOp" should { "chain ChainableOp" in { - val groupByFn = mock[GroupByFn[Any, Any]] - val groupByOp = GroupByOp[Any, Any](groupByFn) + val groupBy = mock[GroupAlsoByWindow[Any, Any]] + val groupByOp = GroupByOp[Any, Any](groupBy) val fn = mock[FunctionRunner[Any, Any]] val chainableOp = mock[ChainableOp[Any, Any]] when(chainableOp.fn).thenReturn(fn) @@ -186,11 +186,11 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS } "delegate to groupByFn on getProcessor" in { - val groupByFn = mock[GroupByFn[Any, Any]] - val groupByOp = GroupByOp[Any, Any](groupByFn) + val groupBy = mock[GroupAlsoByWindow[Any, Any]] + val groupByOp = GroupByOp[Any, Any](groupBy) groupByOp.getProcessor - verify(groupByFn).getProcessor(anyInt, anyString, any[UserConfig])(any[ActorSystem]) + verify(groupBy).getProcessor(anyInt, anyString, any[UserConfig])(any[ActorSystem]) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5cabd8ca/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala index 3f23fa9..2e4bbb3 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala @@ -29,7 +29,8 @@ import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner import org.apache.gearpump.streaming.dsl.plan.PlannerSpec._ import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, Reducer} import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction -import org.apache.gearpump.streaming.dsl.window.api.GroupByFn +import org.apache.gearpump.streaming.dsl.window.api.CountWindows +import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow import org.apache.gearpump.streaming.sink.DataSink import org.apache.gearpump.streaming.source.DataSource import org.apache.gearpump.streaming.{MockUtil, Processor} @@ -57,7 +58,8 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc "Planner" should "chain operations" in { val graph = Graph.empty[Op, OpEdge] val sourceOp = DataSourceOp(new AnySource) - val groupByOp = GroupByOp(new AnyGroupByFn) + val groupBy = GroupAlsoByWindow((any: Any) => any, CountWindows.apply[Any](1)) + val groupByOp = GroupByOp(groupBy) val flatMapOp = ChainableOp[Any, Any](anyFlatMapper) val reduceOp = ChainableOp[Any, Any](anyReducer) val processorOp = new ProcessorOp[AnyTask] @@ -93,7 +95,6 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc object PlannerSpec { - private val anyParallelism = 1 private val anyFlatMapper = new FlatMapper[Any, Any]( FlatMapFunction(Option(_)), "flatMap") private val anyReducer = new Reducer[Any]( @@ -120,16 +121,4 @@ object PlannerSpec { override def close(): Unit = {} } - - class AnyGroupByFn extends GroupByFn[Any, Any] { - - override def groupBy(message: Message): Any = message.msg - - override def getProcessor( - parallelism: Int, - description: String, - userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: Task] = { - Processor[AnyTask](anyParallelism, description) - } - } }
