Repository: incubator-gearpump Updated Branches: refs/heads/master f96aca995 -> e1228a314
[GEARPUMP-339] Add ScalaDoc to Streaming DSL Author: manuzhang <[email protected]> Closes #212 from manuzhang/plan_doc. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/e1228a31 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/e1228a31 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/e1228a31 Branch: refs/heads/master Commit: e1228a31445418e6aca77ad3cf4c2c18fb6748ee Parents: f96aca9 Author: manuzhang <[email protected]> Authored: Mon Aug 7 06:02:29 2017 +0800 Committer: manuzhang <[email protected]> Committed: Mon Aug 7 06:02:47 2017 +0800 ---------------------------------------------------------------------- .../topology/GearpumpStormComponentSpec.scala | 5 +- .../storm/util/StormOutputCollectorSpec.scala | 9 +- .../dsl/api/functions/FilterFunction.scala | 2 - .../dsl/api/functions/FoldFunction.scala | 2 - .../dsl/api/functions/MapFunction.scala | 2 - .../api/functions/SerializableFunction.scala | 32 ++++ .../dsl/javaapi/functions/FlatMapFunction.scala | 2 +- .../apache/gearpump/streaming/dsl/package.scala | 48 ++++++ .../apache/gearpump/streaming/dsl/plan/OP.scala | 171 ++++++++++++------- .../gearpump/streaming/dsl/plan/Planner.scala | 20 ++- .../streaming/dsl/scalaapi/Stream.scala | 1 + .../streaming/dsl/scalaapi/StreamApp.scala | 25 ++- .../scalaapi/functions/FlatMapFunction.scala | 2 +- .../functions/SerializableFunction.scala | 32 ---- 14 files changed, 229 insertions(+), 124 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala ---------------------------------------------------------------------- diff --git 
a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala index 0891070..50204ca 100644 --- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala @@ -24,12 +24,13 @@ import akka.actor.ActorRef import backtype.storm.spout.{ISpout, SpoutOutputCollector} import backtype.storm.task.{GeneralTopologyContext, IBolt, OutputCollector, TopologyContext} import backtype.storm.tuple.Tuple +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.experiments.storm.producer.StormSpoutOutputCollector import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout} import org.apache.gearpump.experiments.storm.util.StormOutputCollector import org.apache.gearpump.streaming.task.{TaskContext, TaskId} import org.apache.gearpump.streaming.{DAG, MockUtil} -import org.apache.gearpump.{Message, TimeStamp} +import org.apache.gearpump.Message import org.mockito.Matchers.{anyObject, eq => mockitoEq} import org.mockito.Mockito._ import org.scalacheck.Gen @@ -75,7 +76,7 @@ class GearpumpStormComponentSpec property("GearpumpBolt lifecycle") { val timestampGen = Gen.chooseNum[Long](0L, 1000L) val freqGen = Gen.chooseNum[Int](1, 100) - forAll(timestampGen, freqGen) { (timestamp: TimeStamp, freq: Int) => + forAll(timestampGen, freqGen) { (timestamp: MilliSeconds, freq: Int) => val config = mock[JMap[AnyRef, AnyRef]] val bolt = mock[IBolt] val taskContext = MockUtil.mockTaskContext http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala 
---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala index 05627c9..7fab2cc 100644 --- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala @@ -27,7 +27,8 @@ import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} -import org.apache.gearpump.{MIN_TIME_MILLIS, Message, TimeStamp} +import org.apache.gearpump.{Message, Time} +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.experiments.storm.topology.GearpumpTuple import org.apache.gearpump.streaming.MockUtil @@ -41,7 +42,7 @@ class StormOutputCollectorSpec property("StormOutputCollector emits tuple values into a stream") { forAll(timestampGen, streamIdGen, valuesGen) { - (timestamp: TimeStamp, streamId: String, values: JList[AnyRef]) => + (timestamp: MilliSeconds, streamId: String, values: JList[AnyRef]) => val targets = mock[JMap[String, JMap[String, Grouping]]] val taskToComponent = mock[JMap[Integer, String]] val getTargetPartitionsFn = mock[(String, JList[AnyRef]) => @@ -52,7 +53,7 @@ class StormOutputCollectorSpec targetStormTaskIds)) val taskContext = MockUtil.mockTaskContext val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent, - targets, getTargetPartitionsFn, taskContext, MIN_TIME_MILLIS) + targets, getTargetPartitionsFn, taskContext, Time.MIN_TIME_MILLIS) when(targets.containsKey(streamId)).thenReturn(false) stormOutputCollector.emit(streamId, values) shouldBe StormOutputCollector.EMPTY_LIST @@ -85,7 +86,7 @@ class StormOutputCollectorSpec 
targetStormTaskIds)) val taskContext = MockUtil.mockTaskContext val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent, - targets, getTargetPartitionsFn, taskContext, MIN_TIME_MILLIS) + targets, getTargetPartitionsFn, taskContext, Time.MIN_TIME_MILLIS) when(targets.containsKey(streamId)).thenReturn(false) verify(taskContext, times(0)).output(anyObject[Message]) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala index 25a0929..8d3ffb3 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala @@ -17,8 +17,6 @@ */ package org.apache.gearpump.streaming.dsl.api.functions -import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction - object FilterFunction { def apply[T](fn: T => Boolean): FilterFunction[T] = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala index 9ff44a8..1525d6e 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala @@ -18,8 +18,6 @@ package 
org.apache.gearpump.streaming.dsl.api.functions -import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction - /** * Combines input into an accumulator. * http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala index a4fdca6..7880c2f 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala @@ -17,8 +17,6 @@ */ package org.apache.gearpump.streaming.dsl.api.functions -import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction - object MapFunction { def apply[T, R](fn: T => R): MapFunction[T, R] = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/SerializableFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/SerializableFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/SerializableFunction.scala new file mode 100644 index 0000000..b90ba28 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/SerializableFunction.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.api.functions + +/** + * Superclass for all user defined function interfaces. + * This ensures all functions are serializable and provides common methods + * like setup and teardown. Users should not extend this class directly + * but subclasses like [[org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction]]. + */ +abstract class SerializableFunction extends java.io.Serializable { + + def setup(): Unit = {} + + def teardown(): Unit = {} + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala index 11e2416..adad878 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala @@ -17,7 +17,7 @@ */ package org.apache.gearpump.streaming.dsl.javaapi.functions -import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction +import 
org.apache.gearpump.streaming.dsl.api.functions.SerializableFunction /** * Transforms one input into zero or more outputs of possibly different types. http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala new file mode 100644 index 0000000..6d43f16 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming + + +// scalastyle:off line.size.limit +/** + * + * The architecture of the Gearpump Streaming DSL consists of several layers: + * + * * User facing [[org.apache.gearpump.streaming.dsl.scalaapi.Stream]] DSL. A Stream is created by [[org.apache.gearpump.streaming.dsl.scalaapi.StreamApp]] + * from an input source such as Kafka or by applying high level operations (e.g. flatMap, window, groupBy) with user defined functions (UDFs). 
UDFs are subclasses + * of [[org.apache.gearpump.streaming.dsl.api.functions.SerializableFunction]], represented by [[org.apache.gearpump.streaming.dsl.plan.Op]] + * in the underlying [[org.apache.gearpump.util.Graph]]. + * * [[org.apache.gearpump.streaming.dsl.plan.Planner]], responsible for interpreting the Op Graph, optimizing it and building a low level Graph of + * [[org.apache.gearpump.streaming.Processor]]. Finally, it creates a runnable Graph of [[org.apache.gearpump.streaming.task.Task]]. + * * The execution layer is usually composed of the following four tasks: + * + * * [[org.apache.gearpump.streaming.source.DataSourceTask]] for [[org.apache.gearpump.streaming.source.DataSource]] to ingest data into Gearpump. + * * [[org.apache.gearpump.streaming.sink.DataSinkTask]] for [[org.apache.gearpump.streaming.sink.DataSink]] to write data out. + * * [[org.apache.gearpump.streaming.dsl.task.GroupByTask]] to execute a [[org.apache.gearpump.streaming.dsl.plan.GroupByOp]] together with the Ops chained after it. + * * [[org.apache.gearpump.streaming.dsl.task.TransformTask]] to execute all other Ops. + * + * All but [[org.apache.gearpump.streaming.sink.DataSinkTask]] delegate execution to [[org.apache.gearpump.streaming.dsl.window.impl.WindowRunner]], which internally + * runs a chain of [[org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner]]s grouped by window. Window assignments are either explicitly defined with the + * [[org.apache.gearpump.streaming.dsl.window.api.Windows]] API or implicitly in [[org.apache.gearpump.streaming.dsl.window.api.GlobalWindows]]. UDFs are eventually + * executed by [[org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner]].
+ * + */ +// scalastyle:on line.size.limit +package object dsl { + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala index 2a45a8f..c37ced6 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala @@ -35,16 +35,29 @@ import scala.reflect.ClassTag object Op { + /** + * Concatenates two descriptions with "." or returns one if the other is null or empty. + */ def concatenate(desc1: String, desc2: String): String = { if (desc1 == null || desc1.isEmpty) desc2 else if (desc2 == null || desc2.isEmpty) desc1 else desc1 + "." + desc2 } + /** + * Concatenates two configs according to the following rules: + * 1. The first config cannot be null. + * 2. The first config is returned if the second config is null. + * 3. The second config takes precedence for overlapping config keys. + */ def concatenate(config1: UserConfig, config2: UserConfig): UserConfig = { config1.withConfig(config2) } + /** + * This adds a [[org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner]] in + * [[GlobalWindows]] if the target [[Task]] has no executable UDF. + */ def withGlobalWindowsDummyRunner(op: Op, userConfig: UserConfig, processor: Processor[_ <: Task])(implicit system: ActorSystem): Processor[_ <: Task] = { if (userConfig.getValue(Constants.GEARPUMP_STREAMING_OPERATOR).isEmpty) { @@ -59,22 +72,37 @@ object Op { } /** - * This is a vertex on the logical plan. + * This is a vertex on the logical Graph, representing user defined functions in + * the [[org.apache.gearpump.streaming.dsl.scalaapi.Stream]] DSL.
*/ sealed trait Op { + /** + * This comes from the user function's description and is displayed on the front end. + */ def description: String + /** + * This ships the user function to a [[org.apache.gearpump.streaming.task.Task]] to be executed. + */ def userConfig: UserConfig + /** + * This creates a new Op by merging the user functions, user configs and descriptions of both Ops. + */ def chain(op: Op)(implicit system: ActorSystem): Op + /** + * This creates a Processor after chaining. + */ def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] } /** - * This represents a low level Processor. + * This represents a low level Processor. It is deprecated since it + * doesn't work with other Ops. */ +@deprecated case class ProcessorOp[T <: Task]( processor: Class[T], parallelism: Int, @@ -99,7 +127,8 @@ case class ProcessorOp[T <: Task]( } /** - * This represents a DataSource. + * This represents a DataSource and creates a + * [[org.apache.gearpump.streaming.source.DataSourceTask]]. */ case class DataSourceOp( dataSource: DataSource, @@ -142,7 +171,7 @@ case class DataSourceOp( } /** - * This represents a DataSink. + * This represents a DataSink and creates a [[org.apache.gearpump.streaming.sink.DataSinkTask]]. */ case class DataSinkOp( dataSink: DataSink, @@ -163,7 +192,7 @@ case class DataSinkOp( /** * This represents operations that can be chained together * (e.g. flatMap, map, filter, reduce) and further chained - * to another Op to be used + * to another Op to be executed */ case class TransformOp[IN, OUT]( fn: FunctionRunner[IN, OUT], @@ -201,61 +230,7 @@ case class TransformOp[IN, OUT]( } } -/** - * This is an intermediate operation, produced by chaining WindowOp and TransformOp. - * Usually, it will be chained to a DataSourceOp, GroupByOp or MergeOp. - * Otherwise, it will be translated to a Processor of TransformTask.
- */ -case class WindowTransformOp[IN, OUT]( - windowRunner: WindowRunner[IN, OUT], - description: String, - userConfig: UserConfig) extends Op { - - override def chain(other: Op)(implicit system: ActorSystem): Op = { - other match { - case op: WindowTransformOp[OUT, _] => - WindowTransformOp( - WindowRunnerAT(windowRunner, op.windowRunner), - Op.concatenate(description, op.description), - Op.concatenate(userConfig, op.userConfig) - ) - case _ => - throw new OpChainException(this, other) - } - } - - override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { - // TODO: this should be chained to DataSourceOp / GroupByOp / MergeOp - Processor[TransformTask[Any, Any]](1, description, userConfig.withValue( - Constants.GEARPUMP_STREAMING_OPERATOR, windowRunner)) - } -} - -/** - * This is an intermediate operation, produced by chaining TransformOp and WindowOp. - * It will later be chained to a WindowOp, which results in two WindowTransformOps. - * Finally, they will be chained to a single WindowTransformOp. 
- */ -case class TransformWindowTransformOp[IN, MIDDLE, OUT]( - transformOp: TransformOp[IN, MIDDLE], - windowTransformOp: WindowTransformOp[MIDDLE, OUT]) extends Op { - - override def description: String = { - throw new UnsupportedOperationException(s"description is not supported on $this") - } - - override def userConfig: UserConfig = { - throw new UnsupportedOperationException(s"userConfig is not supported on $this") - } - - override def chain(op: Op)(implicit system: ActorSystem): Op = { - throw new UnsupportedOperationException(s"chain is not supported on $this") - } - override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { - WindowOp(GlobalWindows()).chain(this).toProcessor - } -} /** * This represents a window aggregation, together with a following TransformOp @@ -290,7 +265,14 @@ case class WindowOp( } /** - * This represents a Processor with groupBy and window aggregation + * This represents an operation with groupBy followed by window aggregation. + * + * It can only be chained with [[WindowTransformOp]] to be executed in + * [[org.apache.gearpump.streaming.dsl.task.GroupByTask]]. + * However, it's possible a window function has no following aggregations. In that case, + * we manually tail a [[WindowOp]] with [[TransformOp]] of + * [[org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner]] to create a + * [[WindowTransformOp]]. */ case class GroupByOp[IN, GROUP] private( groupBy: IN => GROUP, @@ -325,7 +307,14 @@ case class GroupByOp[IN, GROUP] private( } /** - * This represents a Processor transforming merged streams + * This represents an operation with merge followed by window aggregation. + * + * It can only be chained with [[WindowTransformOp]] to be executed in + * [[org.apache.gearpump.streaming.dsl.task.TransformTask]]. + * However, it's possible a merge function has no following aggregations. 
In that case, + * we manually tail a [[WindowOp]] with [[TransformOp]] of + * [[org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner]] to create a + * [[WindowTransformOp]]. */ case class MergeOp( parallelism: Int = 1, @@ -357,7 +346,65 @@ case class MergeOp( } /** - * This is an edge on the logical plan. + * This is an intermediate operation, produced by chaining [[WindowOp]] and [[TransformOp]]. + * Usually, it will be chained to a [[DataSourceOp]], [[GroupByOp]] or [[MergeOp]]. However, + * an Op with more than one outgoing or incoming edge cannot be chained. In that case, + * it will be translated to a [[org.apache.gearpump.streaming.dsl.task.TransformTask]]. + */ +private case class WindowTransformOp[IN, OUT]( + windowRunner: WindowRunner[IN, OUT], + description: String, + userConfig: UserConfig) extends Op { + + override def chain(other: Op)(implicit system: ActorSystem): Op = { + other match { + case op: WindowTransformOp[OUT, _] => + WindowTransformOp( + WindowRunnerAT(windowRunner, op.windowRunner), + Op.concatenate(description, op.description), + Op.concatenate(userConfig, op.userConfig) + ) + case _ => + throw new OpChainException(this, other) + } + } + + override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { + // TODO: this should be chained to DataSourceOp / GroupByOp / MergeOp + Processor[TransformTask[Any, Any]](1, description, userConfig.withValue( + Constants.GEARPUMP_STREAMING_OPERATOR, windowRunner)) + } +} + +/** + * This is an intermediate operation, produced by chaining [[TransformOp]] and + * [[WindowTransformOp]]. It will later be chained to a [[WindowOp]], which results in + * two [[WindowTransformOp]]s. Finally, they will be chained into a single [[WindowTransformOp]].
+ */ +private case class TransformWindowTransformOp[IN, MIDDLE, OUT]( + transformOp: TransformOp[IN, MIDDLE], + windowTransformOp: WindowTransformOp[MIDDLE, OUT]) extends Op { + + override def description: String = { + throw new UnsupportedOperationException(s"description is not supported on $this") + } + + override def userConfig: UserConfig = { + throw new UnsupportedOperationException(s"userConfig is not supported on $this") + } + + override def chain(op: Op)(implicit system: ActorSystem): Op = { + throw new UnsupportedOperationException(s"chain is not supported on $this") + } + + override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { + WindowOp(GlobalWindows()).chain(this).toProcessor + } +} + +/** + * This is an edge on the logical plan. It defines whether data should be transported locally + * or shuffled remotely between [[Op]]. */ trait OpEdge http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala index b1b39c9..04b5337 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala @@ -24,11 +24,26 @@ import org.apache.gearpump.streaming.Processor import org.apache.gearpump.streaming.task.Task import org.apache.gearpump.util.Graph +/** + * This class is responsible for turning the high level + * [[org.apache.gearpump.streaming.dsl.scalaapi.Stream]] DSL into low level + * [[org.apache.gearpump.streaming.Processor]] API. + */ class Planner { /** - * Converts Dag of Op to Dag of TaskDescription. TaskDescription is part of the low - * level Graph API. 
+ * This method interprets a Graph of [[Op]] and creates a Graph of + * [[org.apache.gearpump.streaming.Processor]]. + * + * It first traverses the Graph in reverse order, starting from a leaf Op, and merges + * each Op with its downstream Op if all of the following conditions hold: + * + * 1. The Op has only one outgoing edge and the downstream Op has only one incoming edge. + * 2. Neither Op is a [[ProcessorOp]]. + * 3. The edge is [[Direct]]. + * + * Finally, the vertices of the optimized Graph are translated to Processors + * and the edges to Partitioners. */ def plan(dag: Graph[Op, OpEdge]) (implicit system: ActorSystem): Graph[Processor[_ <: Task], _ <: Partitioner] = { @@ -43,6 +58,7 @@ class Planner { case _ => new HashPartitioner } case Direct => + // FIXME: This is never used new CoLocationPartitioner } }.mapVertex(_.toProcessor) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala index ef2753e..d0c733e 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala @@ -210,6 +210,7 @@ class Stream[T]( * @param parallelism parallelism level * @return new stream after processing with type [R] */ + @deprecated def process[R]( processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = UserConfig.empty, description: String = "process"): Stream[R] = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala ---------------------------------------------------------------------- diff --git
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala index bce8c0c..17d77bc 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala @@ -62,6 +62,17 @@ class StreamApp( val dag = planner.plan(graph) StreamApplication(name, dag, userConfig) } + + def source[T](dataSource: DataSource, parallelism: Int = 1, + conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = { + val sourceOp = DataSourceOp(dataSource, parallelism, description, conf) + graph.addVertex(sourceOp) + new Stream[T](graph, sourceOp) + } + + def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = { + this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description) + } } object StreamApp { @@ -73,20 +84,6 @@ object StreamApp { implicit def streamAppToApplication(streamApp: StreamApp): StreamApplication = { streamApp.plan() } - - implicit class Source(app: StreamApp) extends java.io.Serializable { - - def source[T](dataSource: DataSource, parallelism: Int = 1, - conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = { - implicit val sourceOp = DataSourceOp(dataSource, parallelism, description, conf) - app.graph.addVertex(sourceOp) - new Stream[T](app.graph, sourceOp) - } - - def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = { - this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description) - } - } } /** A test message source which generated message sequence repeatedly. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala index 252b5bd..2d26df6 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala @@ -17,7 +17,7 @@ */ package org.apache.gearpump.streaming.dsl.scalaapi.functions -import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction} +import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction, SerializableFunction} import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => JFlatMapFunction} import scala.collection.JavaConverters._ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala deleted file mode 100644 index ab88bf1..0000000 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.streaming.dsl.scalaapi.functions - -/** - * Superclass for all user defined function interfaces. - * This ensures all functions are serializable and provides common methods - * like setup and teardown. Users should not extend this class directly - * but subclasses like [[FlatMapFunction]]. - */ -abstract class SerializableFunction extends java.io.Serializable { - - def setup(): Unit = {} - - def teardown(): Unit = {} - -}
