[GEARPUMP-316] Decouple groupBy from window. This includes major changes to the `Stream DSL`, as follows:
1. Decouple groupBy from window. Previously, `window` was required before `groupBy`; now users can write `window.groupBy` or `groupBy.window`, or even `window.groupBy.window`. 2. Correspondingly, `GroupByOp` is split into `GroupByOp` and `WindowOp`. When chaining `Op`s, intermediate `Op`s such as `WindowTransformOp` and `TransformWindowTransformOp` are generated before they can be translated into a function (wrapping the UDF). 3. The function will be run by a `WindowRunner` in either `DataSourceTask`, `GroupByTask` or `TransformTask`. Messages are windowed, processed and triggered as defined by the window function and triggers. 4. A Stream is implicitly in a global window. Its window only changes when the user explicitly defines another window. Hence, `groupBy` is equivalent to `globalWindows.groupBy.globalWindows`, and `groupBy.fixedWindows` is equivalent to `globalWindows.groupBy.fixedWindows`. 5. Bug fixes for output time and `Watermark.MAX` Author: manuzhang <[email protected]> Closes #186 from manuzhang/window. 
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/24e1a454 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/24e1a454 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/24e1a454 Branch: refs/heads/master Commit: 24e1a4546260ca414ca84f8df055e57f4d963263 Parents: c1370d9 Author: manuzhang <[email protected]> Authored: Wed Jun 14 10:05:32 2017 +0800 Committer: manuzhang <[email protected]> Committed: Wed Jun 14 10:05:32 2017 +0800 ---------------------------------------------------------------------- .../materializer/RemoteMaterializerImpl.scala | 22 +- .../apache/gearpump/streaming/Constants.scala | 1 - .../streaming/dsl/javaapi/JavaStream.scala | 17 +- .../dsl/partitioner/GroupByPartitioner.scala | 48 ---- .../apache/gearpump/streaming/dsl/plan/OP.scala | 243 ++++++++++++++++--- .../gearpump/streaming/dsl/plan/Planner.scala | 10 +- .../dsl/plan/functions/FunctionRunner.scala | 16 +- .../streaming/dsl/scalaapi/Stream.scala | 70 +++--- .../streaming/dsl/scalaapi/StreamApp.scala | 2 +- .../streaming/dsl/task/CountTriggerTask.scala | 62 ----- .../dsl/task/EventTimeTriggerTask.scala | 58 ----- .../streaming/dsl/task/GroupByTask.scala | 73 ++++++ .../dsl/task/ProcessingTimeTriggerTask.scala | 81 ------- .../streaming/dsl/task/TransformTask.scala | 56 +---- .../streaming/dsl/window/api/Trigger.scala | 4 - .../dsl/window/api/WindowFunction.scala | 37 ++- .../streaming/dsl/window/api/Windows.scala | 43 ++-- .../streaming/dsl/window/impl/Window.scala | 28 --- .../dsl/window/impl/WindowRunner.scala | 155 ++++++------ .../partitioner/GroupByPartitioner.scala | 47 ++++ .../streaming/source/DataSourceTask.scala | 24 +- .../gearpump/streaming/source/Watermark.scala | 2 +- .../gearpump/streaming/task/Subscription.scala | 4 +- .../partitioner/GroupByPartitionerSpec.scala | 45 ---- .../gearpump/streaming/dsl/plan/OpSpec.scala | 94 +++---- 
.../streaming/dsl/plan/PlannerSpec.scala | 25 +- .../dsl/plan/functions/FunctionRunnerSpec.scala | 98 ++------ .../streaming/dsl/scalaapi/StreamAppSpec.scala | 4 +- .../streaming/dsl/scalaapi/StreamSpec.scala | 7 +- .../dsl/task/CountTriggerTaskSpec.scala | 61 ----- .../dsl/task/EventTimeTriggerTaskSpec.scala | 66 ----- .../streaming/dsl/task/GroupByTaskSpec.scala | 60 +++++ .../task/ProcessingTimeTriggerTaskSpec.scala | 69 ------ .../streaming/dsl/task/TransformTaskSpec.scala | 62 ++--- .../window/impl/DefaultWindowRunnerSpec.scala | 38 +-- .../partitioner/GroupByPartitionerSpec.scala | 45 ++++ .../streaming/source/DataSourceTaskSpec.scala | 51 ++-- .../streaming/task/SubscriptionSpec.scala | 7 +- 38 files changed, 761 insertions(+), 1074 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala index 74fe077..e2cdbd4 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala @@ -35,11 +35,9 @@ import org.apache.gearpump.akkastream.task.{BalanceTask, BatchTask, BroadcastTas import org.apache.gearpump.akkastream.task.TickSourceTask.{INITIAL_DELAY, INTERVAL, TICK} import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.dsl.plan.functions.FlatMapper -import org.apache.gearpump.streaming.dsl.plan.{ChainableOp, DataSinkOp, DataSourceOp, Direct, GroupByOp, 
MergeOp, Op, OpEdge, ProcessorOp, Shuffle} +import org.apache.gearpump.streaming.dsl.plan.{TransformOp, DataSinkOp, DataSourceOp, Direct, GroupByOp, MergeOp, Op, OpEdge, ProcessorOp, Shuffle} import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction -import org.apache.gearpump.streaming.dsl.window.api.CountWindows -import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow import org.apache.gearpump.streaming.{ProcessorId, StreamApplication} import org.apache.gearpump.util.Graph import org.slf4j.LoggerFactory @@ -152,10 +150,10 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) { val op = module match { case source: SourceTaskModule[_] => val updatedConf = conf.withConfig(source.conf) - DataSourceOp(source.source, parallelism, updatedConf, "source") + DataSourceOp(source.source, parallelism, "source", updatedConf) case sink: SinkTaskModule[_] => val updatedConf = conf.withConfig(sink.conf) - DataSinkOp(sink.sink, parallelism, updatedConf, "sink") + DataSinkOp(sink.sink, parallelism, "sink", updatedConf) case sourceBridge: SourceBridgeModule[_, _] => ProcessorOp(classOf[SourceBridgeTask], parallelism = 1, conf, "source") case processor: ProcessorModule[_, _, _] => @@ -164,8 +162,7 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) { case sinkBridge: SinkBridgeModule[_, _] => ProcessorOp(classOf[SinkBridgeTask], parallelism, conf, "sink") case groupBy: GroupByModule[Any, Any] => - GroupByOp(GroupAlsoByWindow(groupBy.groupBy, CountWindows.apply[Any](1).accumulating), - parallelism, "groupBy", conf) + GroupByOp(groupBy.groupBy, parallelism, "groupBy", conf) case reduce: ReduceModule[_] => reduceOp(reduce.f, conf) case graphStage: GraphStageModule => @@ -181,7 +178,7 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) { op }.mapEdge[OpEdge] { (n1, edge, n2) => n2 match { - case chainableOp: 
ChainableOp[_, _] + case chainableOp: TransformOp[_, _] if !n1.isInstanceOf[ProcessorOp[_]] && !n2.isInstanceOf[ProcessorOp[_]] => Direct case _ => @@ -242,8 +239,7 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) { withValue(FoldTask.AGGREGATOR, fold.f) ProcessorOp(classOf[FoldTask[_, _]], parallelism, foldConf, "fold") case groupBy: GroupBy[Any, Any] => - GroupByOp(GroupAlsoByWindow(groupBy.keyFor, CountWindows.apply[Any](1).accumulating), - groupBy.maxSubstreams, "groupBy", conf) + GroupByOp(groupBy.keyFor, groupBy.maxSubstreams, "groupBy", conf) case groupedWithin: GroupedWithin[_] => val diConf = conf.withValue[FiniteDuration](GroupedWithinTask.TIME_WINDOW, groupedWithin.d). withInt(GroupedWithinTask.BATCH_SIZE, groupedWithin.n) @@ -285,9 +281,9 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) { withInt(MergeTask.INPUT_PORTS, merge.inputPorts) ProcessorOp(classOf[MergeTask], parallelism, mergeConf, "merge") case mergePreferred: MergePreferred[_] => - MergeOp("mergePreferred", conf) + MergeOp() case mergeSorted: MergeSorted[_] => - MergeOp("mergeSorted", conf) + MergeOp() case prefixAndTail: PrefixAndTail[_] => // TODO null @@ -480,7 +476,7 @@ object RemoteMaterializerImpl { def flatMapOp[In, Out](fun: In => TraversableOnce[Out], description: String, conf: UserConfig): Op = { - ChainableOp(new FlatMapper(FlatMapFunction[In, Out](fun), description), conf) + TransformOp(new FlatMapper(FlatMapFunction[In, Out](fun), description), conf) } def conflateOp[In, Out](seed: In => Out, aggregate: (Out, In) => Out, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala index 7ac1b74..d9cfb92 100644 
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala @@ -24,7 +24,6 @@ object Constants { val GEARPUMP_STREAMING_OPERATOR = "gearpump.streaming.dsl.operator" val GEARPUMP_STREAMING_SOURCE = "gearpump.streaming.source" val GEARPUMP_STREAMING_GROUPBY_FUNCTION = "gearpump.streaming.dsl.groupby-function" - val GEARPUMP_STREAMING_WINDOW_FUNCTION = "gearpump.streaming.dsl.window-function" val GEARPUMP_STREAMING_LOCALITIES = "gearpump.streaming.localities" http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala index da0e4db..cb9f084 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala @@ -21,7 +21,7 @@ import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, FoldFunction, MapFunction, ReduceFunction} import org.apache.gearpump.streaming.dsl.javaapi.functions.{GroupByFunction, FlatMapFunction => JFlatMapFunction} import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction -import org.apache.gearpump.streaming.dsl.scalaapi.{Stream, WindowStream} +import org.apache.gearpump.streaming.dsl.scalaapi.Stream import org.apache.gearpump.streaming.dsl.window.api.Windows import org.apache.gearpump.streaming.task.Task @@ -59,8 +59,8 @@ class JavaStream[T](val stream: Stream[T]) { } /** Merges streams of same type together */ - def merge(other: JavaStream[T], description: String): JavaStream[T] = { - new 
JavaStream[T](stream.merge(other.stream, description)) + def merge(other: JavaStream[T], parallelism: Int, description: String): JavaStream[T] = { + new JavaStream[T](stream.merge(other.stream, parallelism, description)) } /** @@ -72,8 +72,8 @@ class JavaStream[T](val stream: Stream[T]) { new JavaStream[T](stream.groupBy(fn.groupBy, parallelism, description)) } - def window(win: Windows[T], description: String): JavaWindowStream[T] = { - new JavaWindowStream[T](stream.window(win, description)) + def window(win: Windows): JavaStream[T] = { + new JavaStream[T](stream.window(win)) } /** Add a low level Processor to process messages */ @@ -84,10 +84,3 @@ class JavaStream[T](val stream: Stream[T]) { } } -class JavaWindowStream[T](stream: WindowStream[T]) { - - def groupBy[GROUP](fn: GroupByFunction[T, GROUP], parallelism: Int, - description: String): JavaStream[T] = { - new JavaStream[T](stream.groupBy(fn.groupBy, parallelism, description)) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala deleted file mode 100644 index 3789d4e..0000000 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.dsl.partitioner - -import org.apache.gearpump.Message -import org.apache.gearpump.streaming.partitioner.UnicastPartitioner - -/** - * Partition messages by applying group by function first. - * - * For example: - * {{{ - * case class People(name: String, gender: String) - * - * object Test{ - * - * val groupBy: (People => String) = people => people.gender - * val partitioner = GroupByPartitioner(groupBy) - * } - * }}} - * - * @param fn First apply message with groupBy function, then pick the hashCode of the output - * to do the partitioning. You must define hashCode() for output type of groupBy function. 
- */ -class GroupByPartitioner[T, GROUP](fn: T => GROUP) extends UnicastPartitioner { - - override def getPartition(message: Message, partitionNum: Int, currentPartitionId: Int): Int = { - val hashCode = fn(message.value.asInstanceOf[T]).hashCode() - (hashCode & Integer.MAX_VALUE) % partitionNum - } -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala index 708e0d2..2a45a8f 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala @@ -20,18 +20,44 @@ package org.apache.gearpump.streaming.dsl.plan import akka.actor.ActorSystem import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.Constants._ import org.apache.gearpump.streaming.Processor.DefaultProcessor -import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, FunctionRunner} +import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, DummyRunner, FunctionRunner} +import org.apache.gearpump.streaming.dsl.window.impl.{AndThen => WindowRunnerAT} import org.apache.gearpump.streaming.{Constants, Processor} -import org.apache.gearpump.streaming.dsl.task.TransformTask -import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow +import org.apache.gearpump.streaming.dsl.task.{GroupByTask, TransformTask} +import org.apache.gearpump.streaming.dsl.window.api.{GlobalWindows, Windows} +import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, WindowRunner} import org.apache.gearpump.streaming.sink.{DataSink, DataSinkProcessor} import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask} import 
org.apache.gearpump.streaming.task.Task import scala.reflect.ClassTag +object Op { + + def concatenate(desc1: String, desc2: String): String = { + if (desc1 == null || desc1.isEmpty) desc2 + else if (desc2 == null || desc2.isEmpty) desc1 + else desc1 + "." + desc2 + } + + def concatenate(config1: UserConfig, config2: UserConfig): UserConfig = { + config1.withConfig(config2) + } + + def withGlobalWindowsDummyRunner(op: Op, userConfig: UserConfig, + processor: Processor[_ <: Task])(implicit system: ActorSystem): Processor[_ <: Task] = { + if (userConfig.getValue(Constants.GEARPUMP_STREAMING_OPERATOR).isEmpty) { + op.chain( + WindowOp(GlobalWindows()).chain(TransformOp(new DummyRunner[Any])) + ).toProcessor + } else { + processor + } + } + +} + /** * This is a vertex on the logical plan. */ @@ -43,7 +69,7 @@ sealed trait Op { def chain(op: Op)(implicit system: ActorSystem): Op - def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] + def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] } /** @@ -67,7 +93,7 @@ case class ProcessorOp[T <: Task]( throw new OpChainException(this, other) } - override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { + override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { DefaultProcessor(parallelism, description, userConfig, processor) } } @@ -78,24 +104,40 @@ case class ProcessorOp[T <: Task]( case class DataSourceOp( dataSource: DataSource, parallelism: Int = 1, - userConfig: UserConfig = UserConfig.empty, - description: String = "source") + description: String = "source", + userConfig: UserConfig = UserConfig.empty) extends Op { override def chain(other: Op)(implicit system: ActorSystem): Op = { other match { - case op: ChainableOp[_, _] => - DataSourceOp(dataSource, parallelism, - userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn), - description) + case op: WindowTransformOp[_, _] => + DataSourceOp( + dataSource, + parallelism, + 
Op.concatenate(description, op.description), + Op.concatenate(userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, + op.windowRunner), + op.userConfig)) + case op: TransformOp[_, _] => + chain( + WindowOp(GlobalWindows()).chain(op)) + case op: WindowOp => + chain( + op.chain(TransformOp(new DummyRunner[Any]()))) + case op: TransformWindowTransformOp[_, _, _] => + chain( + WindowOp(GlobalWindows()).chain(op.transformOp) + .chain(op.windowTransformOp)) case _ => throw new OpChainException(this, other) } } - override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { - Processor[DataSourceTask[Any, Any]](parallelism, description, - userConfig.withValue(GEARPUMP_STREAMING_SOURCE, dataSource)) + override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { + Op.withGlobalWindowsDummyRunner(this, userConfig, + Processor[DataSourceTask[Any, Any]](parallelism, description, + userConfig.withValue(Constants.GEARPUMP_STREAMING_SOURCE, dataSource)) + ) } } @@ -105,15 +147,15 @@ case class DataSourceOp( case class DataSinkOp( dataSink: DataSink, parallelism: Int = 1, - userConfig: UserConfig = UserConfig.empty, - description: String = "sink") + description: String = "sink", + userConfig: UserConfig = UserConfig.empty) extends Op { override def chain(op: Op)(implicit system: ActorSystem): Op = { throw new OpChainException(this, op) } - override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { + override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { DataSinkProcessor(dataSink, parallelism, description) } } @@ -123,7 +165,7 @@ case class DataSinkOp( * (e.g. 
flatMap, map, filter, reduce) and further chained * to another Op to be used */ -case class ChainableOp[IN, OUT]( +case class TransformOp[IN, OUT]( fn: FunctionRunner[IN, OUT], userConfig: UserConfig = UserConfig.empty) extends Op { @@ -131,25 +173,127 @@ case class ChainableOp[IN, OUT]( override def chain(other: Op)(implicit system: ActorSystem): Op = { other match { - case op: ChainableOp[OUT, _] => + case op: TransformOp[OUT, _] => // TODO: preserve type info - ChainableOp(AndThen(fn, op.fn)) + // f3(f2(f1(in))) + // => ChainableOp(f1).chain(ChainableOp(f2)).chain(ChainableOp(f3)) + // => AndThen(AndThen(f1, f2), f3) + TransformOp( + AndThen(fn, op.fn), + Op.concatenate(userConfig, op.userConfig)) + case op: WindowOp => + TransformWindowTransformOp(this, + WindowTransformOp(new DefaultWindowRunner[OUT, OUT]( + op.windows, new DummyRunner[OUT] + ), op.description, op.userConfig)) + case op: TransformWindowTransformOp[OUT, _, _] => + TransformWindowTransformOp(TransformOp( + AndThen(fn, op.transformOp.fn), + Op.concatenate(userConfig, op.transformOp.userConfig) + ), op.windowTransformOp) case _ => throw new OpChainException(this, other) } } - override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { - Processor[TransformTask[Any, Any]](1, description, - userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, fn)) + override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { + WindowOp(GlobalWindows()).chain(this).toProcessor } } /** - * This represents a Processor with window aggregation + * This is an intermediate operation, produced by chaining WindowOp and TransformOp. + * Usually, it will be chained to a DataSourceOp, GroupByOp or MergeOp. + * Otherwise, it will be translated to a Processor of TransformTask. 
*/ -case class GroupByOp[IN, GROUP]( - groupBy: GroupAlsoByWindow[IN, GROUP], +case class WindowTransformOp[IN, OUT]( + windowRunner: WindowRunner[IN, OUT], + description: String, + userConfig: UserConfig) extends Op { + + override def chain(other: Op)(implicit system: ActorSystem): Op = { + other match { + case op: WindowTransformOp[OUT, _] => + WindowTransformOp( + WindowRunnerAT(windowRunner, op.windowRunner), + Op.concatenate(description, op.description), + Op.concatenate(userConfig, op.userConfig) + ) + case _ => + throw new OpChainException(this, other) + } + } + + override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { + // TODO: this should be chained to DataSourceOp / GroupByOp / MergeOp + Processor[TransformTask[Any, Any]](1, description, userConfig.withValue( + Constants.GEARPUMP_STREAMING_OPERATOR, windowRunner)) + } +} + +/** + * This is an intermediate operation, produced by chaining TransformOp and WindowOp. + * It will later be chained to a WindowOp, which results in two WindowTransformOps. + * Finally, they will be chained to a single WindowTransformOp. 
+ */ +case class TransformWindowTransformOp[IN, MIDDLE, OUT]( + transformOp: TransformOp[IN, MIDDLE], + windowTransformOp: WindowTransformOp[MIDDLE, OUT]) extends Op { + + override def description: String = { + throw new UnsupportedOperationException(s"description is not supported on $this") + } + + override def userConfig: UserConfig = { + throw new UnsupportedOperationException(s"userConfig is not supported on $this") + } + + override def chain(op: Op)(implicit system: ActorSystem): Op = { + throw new UnsupportedOperationException(s"chain is not supported on $this") + } + + override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { + WindowOp(GlobalWindows()).chain(this).toProcessor + } +} + +/** + * This represents a window aggregation, together with a following TransformOp + */ +case class WindowOp( + windows: Windows, + userConfig: UserConfig = UserConfig.empty) extends Op { + + override def description: String = windows.description + + override def chain(other: Op)(implicit system: ActorSystem): Op = { + other match { + case op: TransformOp[_, _] => + WindowTransformOp(new DefaultWindowRunner(windows, op.fn), + Op.concatenate(description, op.description), + Op.concatenate(userConfig, op.userConfig)) + case op: WindowOp => + chain(TransformOp(new DummyRunner[Any])).chain(op.chain(TransformOp(new DummyRunner[Any]))) + case op: TransformWindowTransformOp[_, _, _] => + WindowTransformOp(new DefaultWindowRunner(windows, op.transformOp.fn), + Op.concatenate(description, op.transformOp.description), + Op.concatenate(userConfig, op.transformOp.userConfig)).chain(op.windowTransformOp) + case _ => + throw new OpChainException(this, other) + } + } + + override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { + chain(TransformOp(new DummyRunner[Any])).toProcessor + } + +} + +/** + * This represents a Processor with groupBy and window aggregation + */ +case class GroupByOp[IN, GROUP] private( + groupBy: IN => GROUP, 
parallelism: Int = 1, description: String = "groupBy", override val userConfig: UserConfig = UserConfig.empty) @@ -157,36 +301,57 @@ case class GroupByOp[IN, GROUP]( override def chain(other: Op)(implicit system: ActorSystem): Op = { other match { - case op: ChainableOp[_, _] => - GroupByOp(groupBy, parallelism, description, - userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn)) + case op: WindowTransformOp[_, _] => + GroupByOp( + groupBy, + parallelism, + Op.concatenate(description, op.description), + Op.concatenate( + userConfig + .withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.windowRunner), + userConfig)) + case op: WindowOp => + chain(op.chain(TransformOp(new DummyRunner[Any]()))) case _ => throw new OpChainException(this, other) } } - override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { - groupBy.getProcessor(parallelism, description, userConfig) + override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { + Op.withGlobalWindowsDummyRunner(this, userConfig, + Processor[GroupByTask[IN, GROUP, Any]](parallelism, description, + userConfig.withValue(Constants.GEARPUMP_STREAMING_GROUPBY_FUNCTION, groupBy))) } } /** * This represents a Processor transforming merged streams */ -case class MergeOp(description: String, userConfig: UserConfig = UserConfig.empty) +case class MergeOp( + parallelism: Int = 1, + description: String = "merge", + userConfig: UserConfig = UserConfig.empty) extends Op { override def chain(other: Op)(implicit system: ActorSystem): Op = { other match { - case op: ChainableOp[_, _] => - MergeOp(description, userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn)) + case op: WindowTransformOp[_, _] => + MergeOp( + parallelism, + description, + Op.concatenate(userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, + op.windowRunner), + op.userConfig)) + case op: WindowOp => + chain(op.chain(TransformOp(new DummyRunner[Any]()))) case _ => throw new 
OpChainException(this, other) } } - override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { - Processor[TransformTask[Any, Any]](1, description, userConfig) + override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { + Op.withGlobalWindowsDummyRunner(this, userConfig, + Processor[TransformTask[Any, Any]](parallelism, description, userConfig)) } } @@ -198,7 +363,7 @@ trait OpEdge /** * The upstream OP and downstream OP doesn't require network data shuffle. - * e.g. ChainableOp + * e.g. TransformOp */ case object Direct extends OpEdge @@ -211,4 +376,4 @@ case object Shuffle extends OpEdge /** * Runtime exception thrown on chaining. */ -class OpChainException(op1: Op, op2: Op) extends RuntimeException(s"$op1 cannot be chained by $op2") \ No newline at end of file +class OpChainException(op1: Op, op2: Op) extends RuntimeException(s"$op1 can't be chained by $op2") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala index 1dd8026..b1b39c9 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala @@ -19,10 +19,8 @@ package org.apache.gearpump.streaming.dsl.plan import akka.actor.ActorSystem - -import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, HashPartitioner, Partitioner} +import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, GroupByPartitioner, HashPartitioner, Partitioner} import org.apache.gearpump.streaming.Processor -import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner import 
org.apache.gearpump.streaming.task.Task import org.apache.gearpump.util.Graph @@ -36,18 +34,18 @@ class Planner { (implicit system: ActorSystem): Graph[Processor[_ <: Task], _ <: Partitioner] = { val graph = optimize(dag) - graph.mapEdge { (node1, edge, node2) => + graph.mapEdge { (_, edge, node2) => edge match { case Shuffle => node2 match { case op: GroupByOp[_, _] => - new GroupByPartitioner(op.groupBy.groupByFn) + new GroupByPartitioner(op.groupBy) case _ => new HashPartitioner } case Direct => new CoLocationPartitioner } - }.mapVertex(_.getProcessor) + }.mapVertex(_.toProcessor) } private def optimize(dag: Graph[Op, OpEdge]) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala index c27300f..2c11238 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala @@ -17,16 +17,9 @@ */ package org.apache.gearpump.streaming.dsl.plan.functions -import org.apache.gearpump.streaming.dsl.api.functions.{FoldFunction, ReduceFunction} +import org.apache.gearpump.streaming.dsl.api.functions.FoldFunction import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction -object FunctionRunner { - def withEmitFn[IN, OUT](runner: FunctionRunner[IN, OUT], - fn: OUT => Unit): FunctionRunner[IN, Unit] = { - AndThen(runner, new Emit(fn)) - } -} - /** * Interface to invoke SerializableFunction methods * @@ -121,12 +114,9 @@ class FoldRunner[T, A](fn: FoldFunction[T, A], val description: String) } } -class Emit[T](emit: T => Unit) extends 
FunctionRunner[T, Unit] { +class DummyRunner[T] extends FunctionRunner[T, T] { - override def process(value: T): TraversableOnce[Unit] = { - emit(value) - None - } + override def process(value: T): TraversableOnce[T] = Option(value) override def description: String = "" } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala index e15d4ae..ef2753e 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala @@ -25,7 +25,6 @@ import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction import org.apache.gearpump.streaming.dsl.plan._ import org.apache.gearpump.streaming.dsl.plan.functions._ import org.apache.gearpump.streaming.dsl.window.api._ -import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow import org.apache.gearpump.streaming.sink.DataSink import org.apache.gearpump.streaming.task.{Task, TaskContext} import org.apache.gearpump.util.Graph @@ -35,7 +34,8 @@ import scala.language.implicitConversions class Stream[T]( private val graph: Graph[Op, OpEdge], private val thisNode: Op, - private val edge: Option[OpEdge] = None) { + private val edge: Option[OpEdge] = None, + private val windows: Windows = GlobalWindows()) { /** * Returns a new stream by applying a flatMap function to each element @@ -108,11 +108,7 @@ class Stream[T]( * @return a new stream after fold */ def fold[A](fn: FoldFunction[T, A], description: String): Stream[A] = { - if (graph.vertices.exists(_.isInstanceOf[GroupByOp[_, _]])) { - transform(new FoldRunner(fn, description)) - } else { - throw new 
UnsupportedOperationException("fold operation can only be applied on window") - } + transform(new FoldRunner(fn, description)) } /** @@ -138,10 +134,10 @@ class Stream[T]( } private def transform[R](fn: FunctionRunner[T, R]): Stream[R] = { - val op = ChainableOp(fn) + val op = TransformOp(fn) graph.addVertex(op) graph.addEdge(thisNode, edge.getOrElse(Direct), op) - new Stream(graph, op) + new Stream(graph, op, None, windows) } /** @@ -160,12 +156,13 @@ class Stream[T]( * @param other the other stream * @return the merged stream */ - def merge(other: Stream[T], description: String = "merge"): Stream[T] = { - val mergeOp = MergeOp(description, UserConfig.empty) + def merge(other: Stream[T], parallelism: Int = 1, description: String = "merge"): Stream[T] = { + val mergeOp = MergeOp(parallelism, description, UserConfig.empty) graph.addVertex(mergeOp) graph.addEdge(thisNode, edge.getOrElse(Direct), mergeOp) graph.addEdge(other.thisNode, other.edge.getOrElse(Shuffle), mergeOp) - new Stream[T](graph, mergeOp) + val winOp = Stream.addWindowOp(graph, mergeOp, windows) + new Stream[T](graph, winOp, None, windows) } /** @@ -184,23 +181,26 @@ class Stream[T]( * @param fn Group by function * @param parallelism Parallelism level * @param description The description - * @return the grouped stream + * @return the grouped stream */ def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1, description: String = "groupBy"): Stream[T] = { - window(GlobalWindows()) - .groupBy[GROUP](fn, parallelism, description) + val gbOp = GroupByOp(fn, parallelism, description) + graph.addVertex(gbOp) + graph.addEdge(thisNode, edge.getOrElse(Shuffle), gbOp) + val winOp = Stream.addWindowOp(graph, gbOp, windows) + new Stream(graph, winOp, None, windows) } /** * Window function * - * @param win window definition - * @param description window description - * @return [[WindowStream]] where groupBy could be applied + * @param windows window definition + * @return the windowed [[Stream]] */ - def 
window(win: Windows[T], description: String = "window"): WindowStream[T] = { - new WindowStream[T](graph, edge, thisNode, win, description) + def window(windows: Windows): Stream[T] = { + val winOp = Stream.addWindowOp(graph, thisNode, windows) + new Stream(graph, winOp, None, windows) } /** @@ -216,22 +216,10 @@ class Stream[T]( val processorOp = ProcessorOp(processor, parallelism, conf, description) graph.addVertex(processorOp) graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp) - new Stream[R](graph, processorOp, Some(Shuffle)) + new Stream[R](graph, processorOp, Some(Shuffle), windows) } -} -class WindowStream[T](graph: Graph[Op, OpEdge], edge: Option[OpEdge], thisNode: Op, - window: Windows[T], winDesc: String) { - def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1, - description: String = "groupBy"): Stream[T] = { - val groupBy = GroupAlsoByWindow(fn, window) - val groupOp = GroupByOp[T, GROUP](groupBy, parallelism, - s"$winDesc.$description") - graph.addVertex(groupOp) - graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp) - new Stream[T](graph, groupOp) - } } class KVStream[K, V](stream: Stream[Tuple2[K, V]]) { @@ -263,8 +251,9 @@ class KVStream[K, V](stream: Stream[Tuple2[K, V]]) { object Stream { - def apply[T](graph: Graph[Op, OpEdge], node: Op, edge: Option[OpEdge]): Stream[T] = { - new Stream[T](graph, node, edge) + def apply[T](graph: Graph[Op, OpEdge], node: Op, edge: Option[OpEdge], + windows: Windows): Stream[T] = { + new Stream[T](graph, node, edge, windows) } def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1 @@ -272,6 +261,13 @@ object Stream { def sumByKey[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V] = (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2)) + def addWindowOp(graph: Graph[Op, OpEdge], op: Op, win: Windows): Op = { + val winOp = WindowOp(win) + graph.addVertex(winOp) + graph.addEdge(op, Direct, winOp) + winOp + } + implicit def streamToKVStream[K, V](stream: 
Stream[Tuple2[K, V]]): KVStream[K, V] = { new KVStream(stream) } @@ -279,10 +275,10 @@ object Stream { implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable { def sink(dataSink: DataSink, parallelism: Int = 1, conf: UserConfig = UserConfig.empty, description: String = "sink"): Stream[T] = { - implicit val sink = DataSinkOp(dataSink, parallelism, conf, description) + implicit val sink = DataSinkOp(dataSink, parallelism, description, conf) stream.graph.addVertex(sink) stream.graph.addEdge(stream.thisNode, Shuffle, sink) - new Stream[T](stream.graph, sink) + new Stream[T](stream.graph, sink, None, stream.windows) } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala index 6378a18..bce8c0c 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala @@ -78,7 +78,7 @@ object StreamApp { def source[T](dataSource: DataSource, parallelism: Int = 1, conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = { - implicit val sourceOp = DataSourceOp(dataSource, parallelism, conf, description) + implicit val sourceOp = DataSourceOp(dataSource, parallelism, description, conf) app.graph.addVertex(sourceOp) new Stream[T](app.graph, sourceOp) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala ---------------------------------------------------------------------- diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala deleted file mode 100644 index 0dc28eb..0000000 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.streaming.dsl.task - -import java.time.Instant - -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.dsl.window.api.CountWindowFunction -import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner} -import org.apache.gearpump.streaming.task.{Task, TaskContext} - -/** - * This task triggers output on number of messages in a window. 
- */ -class CountTriggerTask[IN, GROUP]( - groupBy: GroupAlsoByWindow[IN, GROUP], - windowRunner: WindowRunner, - taskContext: TaskContext, - userConfig: UserConfig) - extends Task(taskContext, userConfig) { - - def this(groupBy: GroupAlsoByWindow[IN, GROUP], - taskContext: TaskContext, userConfig: UserConfig) = { - this(groupBy, new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system), - taskContext, userConfig) - } - - def this(taskContext: TaskContext, userConfig: UserConfig) = { - this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]]( - GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get, - taskContext, userConfig) - } - - private val windowSize = groupBy.window.windowFn.asInstanceOf[CountWindowFunction[IN]].size - private var num = 0 - - override def onNext(msg: Message): Unit = { - windowRunner.process(msg) - num += 1 - if (windowSize == num) { - windowRunner.trigger(Instant.ofEpochMilli(windowSize)) - num = 0 - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala deleted file mode 100644 index 0674339..0000000 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.streaming.dsl.task - -import java.time.Instant - -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner} -import org.apache.gearpump.streaming.task.{Task, TaskContext} - -/** - * This task triggers output on watermark progress. - */ -class EventTimeTriggerTask[IN, GROUP]( - groupBy: GroupAlsoByWindow[IN, GROUP], - windowRunner: WindowRunner, - taskContext: TaskContext, - userConfig: UserConfig) - extends Task(taskContext, userConfig) { - - def this(groupBy: GroupAlsoByWindow[IN, GROUP], - taskContext: TaskContext, userConfig: UserConfig) = { - this(groupBy, new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system), - taskContext, userConfig) - } - - def this(taskContext: TaskContext, userConfig: UserConfig) = { - this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]]( - GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get, - taskContext, userConfig) - } - - override def onNext(message: Message): Unit = { - windowRunner.process(message) - } - - override def onWatermarkProgress(watermark: Instant): Unit = { - windowRunner.trigger(watermark) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala ---------------------------------------------------------------------- diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala new file mode 100644 index 0000000..8301fb9 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.dsl.task + +import java.time.Instant +import java.util.function.Consumer + +import com.gs.collections.impl.map.mutable.UnifiedMap +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.Constants.{GEARPUMP_STREAMING_GROUPBY_FUNCTION, GEARPUMP_STREAMING_OPERATOR} +import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner} +import org.apache.gearpump.streaming.task.{Task, TaskContext} + +/** + * Processes messages in groups as defined by groupBy function. 
+ */ +class GroupByTask[IN, GROUP, OUT]( + groupBy: IN => GROUP, + taskContext: TaskContext, + userConfig: UserConfig) extends Task(taskContext, userConfig) { + + def this(context: TaskContext, conf: UserConfig) = { + this( + conf.getValue[IN => GROUP](GEARPUMP_STREAMING_GROUPBY_FUNCTION)(context.system).get, + context, conf + ) + } + + private val groups: UnifiedMap[GROUP, WindowRunner[IN, OUT]] = + new UnifiedMap[GROUP, WindowRunner[IN, OUT]] + + override def onNext(message: Message): Unit = { + val input = message.value.asInstanceOf[IN] + val group = groupBy(input) + + if (!groups.containsKey(group)) { + groups.put(group, + userConfig.getValue[WindowRunner[IN, OUT]]( + GEARPUMP_STREAMING_OPERATOR)(taskContext.system).get) + } + + groups.get(group).process(TimestampedValue(message.value.asInstanceOf[IN], + message.timestamp)) + } + + override def onWatermarkProgress(watermark: Instant): Unit = { + groups.values.forEach(new Consumer[WindowRunner[IN, OUT]] { + override def accept(runner: WindowRunner[IN, OUT]): Unit = { + runner.trigger(watermark).foreach { + result => + taskContext.output(Message(result.value, result.timestamp)) + } + } + }) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala deleted file mode 100644 index a04e3ca..0000000 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.streaming.dsl.task - -import java.time.Instant -import java.util.concurrent.TimeUnit - -import akka.actor.Actor.Receive -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.dsl.task.ProcessingTimeTriggerTask.Triggering -import org.apache.gearpump.streaming.dsl.window.api.SlidingWindowFunction -import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner} -import org.apache.gearpump.streaming.task.{Task, TaskContext} - -import scala.concurrent.duration.FiniteDuration - -object ProcessingTimeTriggerTask { - case object Triggering -} - -/** - * This task triggers output on scheduled system time interval. 
- */ -class ProcessingTimeTriggerTask[IN, GROUP]( - groupBy: GroupAlsoByWindow[IN, GROUP], - windowRunner: WindowRunner, - taskContext: TaskContext, - userConfig: UserConfig) - extends Task(taskContext, userConfig) { - - def this(groupBy: GroupAlsoByWindow[IN, GROUP], - taskContext: TaskContext, userConfig: UserConfig) = { - this(groupBy, new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system), - taskContext, userConfig) - } - - def this(taskContext: TaskContext, userConfig: UserConfig) = { - this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]]( - GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get, - taskContext, userConfig) - } - - private val windowFn = groupBy.window.windowFn.asInstanceOf[SlidingWindowFunction[IN]] - private val windowSizeMs = windowFn.size.toMillis - private val windowStepMs = windowFn.step.toMillis - - override def onStart(startTime: Instant): Unit = { - val initialDelay = windowSizeMs - Instant.now.toEpochMilli % windowSizeMs - taskContext.scheduleOnce( - new FiniteDuration(initialDelay, TimeUnit.MILLISECONDS))(self ! Triggering) - } - - override def onNext(message: Message): Unit = { - windowRunner.process(message) - } - - override def receiveUnManagedMessage: Receive = { - case Triggering => - windowRunner.trigger(Instant.now) - taskContext.scheduleOnce( - new FiniteDuration(windowStepMs, TimeUnit.MILLISECONDS))(self ! 
Triggering) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala index 572df94..6a455a5 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala @@ -22,58 +22,28 @@ import java.time.Instant import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner -import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform +import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner} import org.apache.gearpump.streaming.task.{Task, TaskContext} -object TransformTask { - - class Transform[IN, OUT](taskContext: TaskContext, - processor: Option[FunctionRunner[IN, OUT]], - private var buffer: Vector[Message] = Vector.empty[Message]) { - - def onNext(msg: Message): Unit = { - buffer +:= msg - } - - def onWatermarkProgress(watermark: Instant): Unit = { - var nextBuffer = Vector.empty[Message] - processor.foreach(_.setup()) - buffer.foreach { message: Message => - if (message.timestamp.isBefore(watermark)) { - processor match { - case Some(p) => - FunctionRunner - .withEmitFn(p, (out: OUT) => taskContext.output(Message(out, message.timestamp))) - // .toList forces eager evaluation - .process(message.value.asInstanceOf[IN]).toList - case None => - taskContext.output(message) - } - } else { - nextBuffer +:= message - } - } - processor.foreach(_.teardown()) - buffer = nextBuffer - } - } - -} - -class TransformTask[IN, 
OUT](transform: Transform[IN, OUT], +class TransformTask[IN, OUT]( + runner: WindowRunner[IN, OUT], taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { - def this(taskContext: TaskContext, userConf: UserConfig) = { - this(new Transform(taskContext, userConf.getValue[FunctionRunner[IN, OUT]]( - GEARPUMP_STREAMING_OPERATOR)(taskContext.system)), taskContext, userConf) + def this(context: TaskContext, conf: UserConfig) = { + this( + conf.getValue[WindowRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get, + context, conf + ) } override def onNext(msg: Message): Unit = { - transform.onNext(msg) + runner.process(TimestampedValue(msg.value.asInstanceOf[IN], msg.timestamp)) } override def onWatermarkProgress(watermark: Instant): Unit = { - transform.onWatermarkProgress(watermark) + runner.trigger(watermark).foreach { + result => + taskContext.output(Message(result.value, result.timestamp)) + } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala index 9865e18..02d52a0 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala @@ -21,7 +21,3 @@ sealed trait Trigger case object EventTimeTrigger extends Trigger -case object ProcessingTimeTrigger extends Trigger - -case object CountTrigger extends Trigger - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala ---------------------------------------------------------------------- diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala index 7da9c85..a2f51c7 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala @@ -32,35 +32,39 @@ object WindowFunction { } } -trait WindowFunction[T] { +trait WindowFunction { - def apply(context: WindowFunction.Context[T]): Array[Window] + def apply[T](context: WindowFunction.Context[T]): Array[Window] def isNonMerging: Boolean } -abstract class NonMergingWindowFunction[T] extends WindowFunction[T] { +abstract class NonMergingWindowFunction extends WindowFunction { override def isNonMerging: Boolean = true } -case class GlobalWindowFunction[T]() extends NonMergingWindowFunction[T] { +object GlobalWindowFunction { - override def apply(context: WindowFunction.Context[T]): Array[Window] = { - Array(Window(Instant.ofEpochMilli(MIN_TIME_MILLIS), - Instant.ofEpochMilli(MAX_TIME_MILLIS))) - } + val globalWindow = Array(Window(Instant.ofEpochMilli(MIN_TIME_MILLIS), + Instant.ofEpochMilli(MAX_TIME_MILLIS))) +} +case class GlobalWindowFunction() extends NonMergingWindowFunction { + + override def apply[T](context: WindowFunction.Context[T]): Array[Window] = { + GlobalWindowFunction.globalWindow + } } -case class SlidingWindowFunction[T](size: Duration, step: Duration) - extends NonMergingWindowFunction[T] { +case class SlidingWindowFunction(size: Duration, step: Duration) + extends NonMergingWindowFunction { def this(size: Duration) = { this(size, size) } - override def apply(context: WindowFunction.Context[T]): Array[Window] = { + override def apply[T](context: WindowFunction.Context[T]): Array[Window] = { val timestamp = context.timestamp val sizeMillis = size.toMillis val stepMillis = step.toMillis @@ -81,16 +85,9 @@ case class 
SlidingWindowFunction[T](size: Duration, step: Duration) } } -case class CountWindowFunction[T](size: Int) extends NonMergingWindowFunction[T] { - - override def apply(context: WindowFunction.Context[T]): Array[Window] = { - Array(Window.ofEpochMilli(0, size)) - } -} - -case class SessionWindowFunction[T](gap: Duration) extends WindowFunction[T] { +case class SessionWindowFunction(gap: Duration) extends WindowFunction { - override def apply(context: WindowFunction.Context[T]): Array[Window] = { + override def apply[T](context: WindowFunction.Context[T]): Array[Window] = { Array(Window(context.timestamp, context.timestamp.plus(gap))) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala index 467f57c..d53bc96 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala @@ -20,42 +20,35 @@ package org.apache.gearpump.streaming.dsl.window.api import java.time.Duration /** - * * Defines how to apply window functions. 
* * @param windowFn how to divide windows * @param trigger when to trigger window result * @param accumulationMode whether to accumulate results across windows */ -case class Windows[T]( - windowFn: WindowFunction[T], +case class Windows( + windowFn: WindowFunction, trigger: Trigger = EventTimeTrigger, - accumulationMode: AccumulationMode = Discarding) { - - def triggering(trigger: Trigger): Windows[T] = { - Windows(windowFn, trigger) - } + accumulationMode: AccumulationMode = Discarding, + description: String) { - def accumulating: Windows[T] = { - Windows(windowFn, trigger, Accumulating) + def triggering(trigger: Trigger): Windows = { + Windows(windowFn, trigger, accumulationMode, description) } - def discarding: Windows[T] = { - Windows(windowFn, trigger, Discarding) + def accumulating: Windows = { + Windows(windowFn, trigger, Accumulating, description) } -} - -object CountWindows { - def apply[T](size: Int): Windows[T] = { - Windows(CountWindowFunction(size), CountTrigger) + def discarding: Windows = { + Windows(windowFn, trigger, Discarding, description) } } object GlobalWindows { - def apply[T](): Windows[T] = { - Windows(GlobalWindowFunction()) + def apply(): Windows = { + Windows(GlobalWindowFunction(), description = "globalWindows") } } @@ -67,8 +60,8 @@ object FixedWindows { * @param size window size * @return a Window definition */ - def apply[T](size: Duration): Windows[T] = { - Windows(SlidingWindowFunction(size, size)) + def apply(size: Duration): Windows = { + Windows(SlidingWindowFunction(size, size), description = "fixedWindows") } } @@ -81,8 +74,8 @@ object SlidingWindows { * @param step window step to slide forward * @return a Window definition */ - def apply[T](size: Duration, step: Duration): Windows[T] = { - Windows(SlidingWindowFunction(size, step)) + def apply(size: Duration, step: Duration): Windows = { + Windows(SlidingWindowFunction(size, step), description = "slidingWindows") } } @@ -94,8 +87,8 @@ object SessionWindows { * @param gap 
session gap * @return a Window definition */ - def apply[T](gap: Duration): Windows[T] = { - Windows(SessionWindowFunction(gap)) + def apply(gap: Duration): Windows = { + Windows(SessionWindowFunction(gap), description = "sessionWindows") } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala index 05ce74e..870c334 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala @@ -25,7 +25,6 @@ import org.apache.gearpump.streaming.Constants._ import org.apache.gearpump.streaming.Processor import org.apache.gearpump.{Message, TimeStamp} import org.apache.gearpump.streaming.dsl.window.api._ -import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, EventTimeTriggerTask, ProcessingTimeTriggerTask} import org.apache.gearpump.streaming.task.Task object Window { @@ -65,31 +64,4 @@ case class Window(startTime: Instant, endTime: Instant) extends Comparable[Windo } } -case class GroupAlsoByWindow[T, GROUP](groupByFn: T => GROUP, window: Windows[T]) { - - def groupBy(message: Message): (GROUP, List[Window]) = { - val ele = message.value.asInstanceOf[T] - val group = groupByFn(ele) - val windows = window.windowFn(new WindowFunction.Context[T] { - override def element: T = ele - override def timestamp: Instant = message.timestamp - }) - group -> windows.toList - } - - def getProcessor(parallelism: Int, description: String, - userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: Task] = { - val config = userConfig.withValue(GEARPUMP_STREAMING_GROUPBY_FUNCTION, this) - 
window.trigger match { - case CountTrigger => - Processor[CountTriggerTask[T, GROUP]](parallelism, description, config) - case ProcessingTimeTrigger => - Processor[ProcessingTimeTriggerTask[T, GROUP]](parallelism, description, config) - case EventTimeTrigger => - Processor[EventTimeTriggerTask[T, GROUP]](parallelism, description, config) - } - } - -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala index f392f70..2025618 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala @@ -19,133 +19,124 @@ package org.apache.gearpump.streaming.dsl.window.impl import java.time.Instant -import akka.actor.ActorSystem import com.gs.collections.api.block.predicate.Predicate -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.UserConfig -import com.gs.collections.api.block.procedure.{Procedure, Procedure2} +import com.gs.collections.api.block.procedure.Procedure import com.gs.collections.impl.list.mutable.FastList -import com.gs.collections.impl.map.mutable.UnifiedMap import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap -import org.apache.gearpump.streaming.Constants._ import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner -import org.apache.gearpump.streaming.dsl.window.api.Discarding -import org.apache.gearpump.streaming.task.TaskContext -import org.apache.gearpump.util.LogUtil -import org.slf4j.Logger +import org.apache.gearpump.streaming.dsl.window.api.WindowFunction.Context +import 
org.apache.gearpump.streaming.dsl.window.api.{Discarding, Windows} +import scala.collection.mutable.ArrayBuffer -trait WindowRunner { +case class TimestampedValue[T](value: T, timestamp: Instant) - def process(message: Message): Unit +trait WindowRunner[IN, OUT] extends java.io.Serializable { - def trigger(time: Instant): Unit + def process(timestampedValue: TimestampedValue[IN]): Unit + + def trigger(time: Instant): TraversableOnce[TimestampedValue[OUT]] } -object DefaultWindowRunner { +case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE], + right: WindowRunner[MIDDLE, OUT]) extends WindowRunner[IN, OUT] { + + def process(timestampedValue: TimestampedValue[IN]): Unit = { + left.process(timestampedValue) + } - private val LOG: Logger = LogUtil.getLogger(classOf[DefaultWindowRunner[_, _, _]]) + def trigger(time: Instant): TraversableOnce[TimestampedValue[OUT]] = { + left.trigger(time).foreach(right.process) + right.trigger(time) + } } -class DefaultWindowRunner[IN, GROUP, OUT]( - taskContext: TaskContext, userConfig: UserConfig, - groupBy: GroupAlsoByWindow[IN, GROUP])(implicit system: ActorSystem) - extends WindowRunner { - - private val windowFn = groupBy.window.windowFn - private val groupedWindowInputs = new UnifiedMap[GROUP, TreeSortedMap[Window, FastList[IN]]] - private val groupedFnRunners = new UnifiedMap[GROUP, FunctionRunner[IN, OUT]] - private val groupedRunnerSetups = new UnifiedMap[GROUP, Boolean] - - override def process(message: Message): Unit = { - val input = message.value.asInstanceOf[IN] - val (group, windows) = groupBy.groupBy(message) - if (!groupedWindowInputs.containsKey(group)) { - groupedWindowInputs.put(group, new TreeSortedMap[Window, FastList[IN]]()) - } - val windowInputs = groupedWindowInputs.get(group) - windows.foreach { win => +class DefaultWindowRunner[IN, OUT]( + windows: Windows, + fnRunner: FunctionRunner[IN, OUT]) + extends WindowRunner[IN, OUT] { + + private val windowFn = windows.windowFn + private val 
windowInputs = new TreeSortedMap[Window, FastList[TimestampedValue[IN]]] + private var setup = false + + override def process(timestampedValue: TimestampedValue[IN]): Unit = { + val wins = windowFn(new Context[IN] { + override def element: IN = timestampedValue.value + + override def timestamp: Instant = timestampedValue.timestamp + }) + wins.foreach { win => if (windowFn.isNonMerging) { if (!windowInputs.containsKey(win)) { - val inputs = new FastList[IN](1) + val inputs = new FastList[TimestampedValue[IN]] windowInputs.put(win, inputs) } - windowInputs.get(win).add(input) + windowInputs.get(win).add(timestampedValue) } else { - merge(windowInputs, win, input) + merge(windowInputs, win, timestampedValue) } } - if (!groupedFnRunners.containsKey(group)) { - val runner = userConfig.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get - groupedFnRunners.put(group, runner) - groupedRunnerSetups.put(group, false) - } - - def merge(windowInputs: TreeSortedMap[Window, FastList[IN]], win: Window, input: IN): Unit = { - val intersected = windowInputs.keySet.select(new Predicate[Window] { + def merge( + winIns: TreeSortedMap[Window, FastList[TimestampedValue[IN]]], + win: Window, tv: TimestampedValue[IN]): Unit = { + val intersected = winIns.keySet.select(new Predicate[Window] { override def accept(each: Window): Boolean = { win.intersects(each) } }) var mergedWin = win - val mergedInputs = FastList.newListWith(input) + val mergedInputs = FastList.newListWith(tv) intersected.forEach(new Procedure[Window] { override def value(each: Window): Unit = { mergedWin = mergedWin.span(each) - mergedInputs.addAll(windowInputs.remove(each)) + mergedInputs.addAll(winIns.remove(each)) } }) - windowInputs.put(mergedWin, mergedInputs) + winIns.put(mergedWin, mergedInputs) } - } - override def trigger(time: Instant): Unit = { - groupedWindowInputs.forEachKeyValue(new Procedure2[GROUP, TreeSortedMap[Window, FastList[IN]]] { - override def value(group: GROUP, windowInputs: 
TreeSortedMap[Window, FastList[IN]]): Unit = { - onTrigger(group, windowInputs) - } - }) - + override def trigger(time: Instant): TraversableOnce[TimestampedValue[OUT]] = { @annotation.tailrec - def onTrigger(group: GROUP, windowInputs: TreeSortedMap[Window, FastList[IN]]): Unit = { + def onTrigger( + outputs: ArrayBuffer[TimestampedValue[OUT]]): TraversableOnce[TimestampedValue[OUT]] = { if (windowInputs.notEmpty()) { val firstWin = windowInputs.firstKey if (!time.isBefore(firstWin.endTime)) { val inputs = windowInputs.remove(firstWin) - if (groupedFnRunners.containsKey(group)) { - val runner = FunctionRunner.withEmitFn(groupedFnRunners.get(group), - (output: OUT) => { - taskContext.output(Message(output, time)) - }) - val setup = groupedRunnerSetups.get(group) - if (!setup) { - runner.setup() - groupedRunnerSetups.put(group, true) - } - inputs.forEach(new Procedure[IN] { - override def value(t: IN): Unit = { - // .toList forces eager evaluation - runner.process(t).toList + if (!setup) { + fnRunner.setup() + setup = true + } + inputs.forEach(new Procedure[TimestampedValue[IN]] { + override def value(tv: TimestampedValue[IN]): Unit = { + fnRunner.process(tv.value).foreach { + out: OUT => outputs += TimestampedValue(out, tv.timestamp) } - }) - // .toList forces eager evaluation - runner.finish().toList - if (groupBy.window.accumulationMode == Discarding) { - runner.teardown() - groupedRunnerSetups.put(group, false) - // dicarding, setup need to be called for each window - onTrigger(group, windowInputs) - } else { - // accumulating, setup is only called for the first window - onTrigger(group, windowInputs) } + }) + fnRunner.finish().foreach { + out: OUT => outputs += TimestampedValue(out, firstWin.endTime.minusMillis(1)) + } + if (windows.accumulationMode == Discarding) { + fnRunner.teardown() + setup = false + // discarding, setup need to be called for each window + onTrigger(outputs) } else { - throw new RuntimeException(s"FunctionRunner not found for group 
$group") + // accumulating, setup is only called for the first window + onTrigger(outputs) } + } else { + outputs } + } else { + outputs } } + + onTrigger(ArrayBuffer.empty[TimestampedValue[OUT]]) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/GroupByPartitioner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/GroupByPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/GroupByPartitioner.scala new file mode 100644 index 0000000..c2ddb0d --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/GroupByPartitioner.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.partitioner + +import org.apache.gearpump.Message + +/** + * Partition messages by applying group by function first. 
+ * + * For example: + * {{{ + * case class People(name: String, gender: String) + * + * object Test{ + * + * val groupBy: (People => String) = people => people.gender + * val partitioner = GroupByPartitioner(groupBy) + * } + * }}} + * + * @param fn First apply message with groupBy function, then pick the hashCode of the output + * to do the partitioning. You must define hashCode() for output type of groupBy function. + */ +class GroupByPartitioner[T, GROUP](fn: T => GROUP) extends UnicastPartitioner { + + override def getPartition(message: Message, partitionNum: Int, currentPartitionId: Int): Int = { + val hashCode = fn(message.value.asInstanceOf[T]).hashCode() + (hashCode & Integer.MAX_VALUE) % partitionNum + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala index ff1b2d4..74b0cc2 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala @@ -23,8 +23,7 @@ import java.time.Instant import org.apache.gearpump._ import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner -import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform +import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner} import org.apache.gearpump.streaming.task.{Task, TaskContext} /** @@ -40,17 +39,17 @@ import org.apache.gearpump.streaming.task.{Task, TaskContext} * - `DataSource.close()` in `onStop` */ class DataSourceTask[IN, OUT] private[source]( - context: TaskContext, 
- conf: UserConfig, source: DataSource, - transform: Transform[IN, OUT]) + windowRunner: WindowRunner[IN, OUT], + context: TaskContext, + conf: UserConfig) extends Task(context, conf) { def this(context: TaskContext, conf: UserConfig) = { - this(context, conf, + this( conf.getValue[DataSource](GEARPUMP_STREAMING_SOURCE)(context.system).get, - new Transform[IN, OUT](context, - conf.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system)) + conf.getValue[WindowRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get, + context, conf ) } @@ -65,14 +64,19 @@ class DataSourceTask[IN, OUT] private[source]( override def onNext(m: Message): Unit = { 0.until(batchSize).foreach { _ => - Option(source.read()).foreach(transform.onNext) + Option(source.read()).foreach( + msg => windowRunner.process( + TimestampedValue(msg.value.asInstanceOf[IN], msg.timestamp))) } self ! Watermark(source.getWatermark) } override def onWatermarkProgress(watermark: Instant): Unit = { - transform.onWatermarkProgress(watermark) + windowRunner.trigger(watermark).foreach { + result => + context.output(Message(result.value, result.timestamp)) + } } override def onStop(): Unit = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala index 0ec2b6f..14abff8 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala @@ -30,7 +30,7 @@ case class Watermark(instant: Instant) { object Watermark { - val MAX: Instant = Instant.ofEpochMilli(MAX_TIME_MILLIS) + val MAX: Instant = Instant.ofEpochMilli(MAX_TIME_MILLIS + 1) val MIN: 
Instant = Instant.ofEpochMilli(MIN_TIME_MILLIS) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala index 79bcc2a..44ec2c6 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala @@ -57,8 +57,8 @@ class Subscription( private val pendingMessageCount: Array[Short] = new Array[Short](parallelism) private val candidateMinClockSince: Array[Short] = new Array[Short](parallelism) - private val minClockValue: Array[TimeStamp] = Array.fill(parallelism)(MAX_TIME_MILLIS) - private val candidateMinClock: Array[TimeStamp] = Array.fill(parallelism)(MAX_TIME_MILLIS) + private val minClockValue: Array[TimeStamp] = Array.fill(parallelism)(Long.MaxValue) + private val candidateMinClock: Array[TimeStamp] = Array.fill(parallelism)(Long.MaxValue) private var maxPendingCount: Short = 0 http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala deleted file mode 100644 index 1934d14..0000000 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.dsl.partitioner - -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} -import org.apache.gearpump.Message -import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitionerSpec.People - -class GroupByPartitionerSpec extends FlatSpec with Matchers with BeforeAndAfterAll { - - it should "group by message payload and window" in { - val mark = People("Mark", "male") - val tom = People("Tom", "male") - val michelle = People("Michelle", "female") - - val partitionNum = 10 - - val groupBy = new GroupByPartitioner[People, String](_.gender) - groupBy.getPartition(Message(mark, 1L), partitionNum) shouldBe - groupBy.getPartition(Message(tom, 2L), partitionNum) - - groupBy.getPartition(Message(mark, 2L), partitionNum) should not be - groupBy.getPartition(Message(michelle, 3L), partitionNum) - } -} - -object GroupByPartitionerSpec { - case class People(name: String, gender: String) -}
