[GEARPUMP-311] refactor state management
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/b9f10866 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/b9f10866 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/b9f10866 Branch: refs/heads/state Commit: b9f108667a035a791ea016853cefc952922927ab Parents: fe41030 Author: vinoyang <[email protected]> Authored: Sat Jul 22 18:21:23 2017 +0800 Committer: vinoyang <[email protected]> Committed: Sat Jul 22 18:21:23 2017 +0800 ---------------------------------------------------------------------- .../refactor/dsl/javaapi/JavaStream.scala | 98 +++++ .../streaming/refactor/dsl/plan/OP.scala | 379 +++++++++++++++++++ .../refactor/dsl/scalaapi/Stream.scala | 307 +++++++++++++++ 3 files changed, 784 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b9f10866/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/javaapi/JavaStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/javaapi/JavaStream.scala new file mode 100644 index 0000000..0cb1185 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/javaapi/JavaStream.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.refactor.dsl.javaapi
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, FoldFunction, MapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.javaapi.functions.{GroupByFunction, FlatMapFunction => JFlatMapFunction}
+import org.apache.gearpump.streaming.refactor.dsl.javaapi.functions.{FlatMapWithStateFunction => JFlatMapWithStateFunction}
+import org.apache.gearpump.streaming.refactor.dsl.scalaapi.Stream
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+import org.apache.gearpump.streaming.dsl.window.api.Windows
+import org.apache.gearpump.streaming.refactor.dsl.api.functions.MapWithStateFunction
+import org.apache.gearpump.streaming.refactor.dsl.scalaapi.functions.FlatMapWithStateFunction
+import org.apache.gearpump.streaming.task.Task
+
+/**
+ * Java DSL.
+ *
+ * A thin wrapper around a Scala [[Stream]]. Each operation adapts its Java-facing
+ * function type where needed, delegates to the wrapped `stream`, and wraps the
+ * resulting Scala stream in a new `JavaStream`.
+ *
+ * @param stream the underlying Scala stream
+ */
+class JavaStream[T](val stream: Stream[T]) {
+
+  /**
+   * FlatMap on stream.
+   *
+   * @param fn          Java flatMap function, adapted with `FlatMapFunction(fn)`
+   * @param description description of this operation
+   * @return a new stream of the flattened results
+   */
+  def flatMap[R](fn: JFlatMapFunction[T, R], description: String): JavaStream[R] = {
+    // Forward the caller's description, as map/filter/fold/reduce do.
+    new JavaStream[R](stream.flatMap(FlatMapFunction(fn), description))
+  }
+
+  /**
+   * Stateful flatMap on stream.
+   *
+   * @param fn          Java stateful flatMap function, adapted with
+   *                    `FlatMapWithStateFunction(fn)`
+   * @param description description of this operation
+   * @return a new stream of the flattened results
+   */
+  def flatMapWithState[R](fn: JFlatMapWithStateFunction[T, R],
+      description: String): JavaStream[R] = {
+    // Forward the caller's description instead of a hard-coded literal.
+    new JavaStream[R](stream.flatMapWithState(FlatMapWithStateFunction(fn), description))
+  }
+
+  /** Map on stream; implemented as a flatMap over `FlatMapFunction(fn)`. */
+  def map[R](fn: MapFunction[T, R], description: String): JavaStream[R] = {
+    new JavaStream[R](stream.flatMap(FlatMapFunction(fn), description))
+  }
+
+  /** Stateful map on stream; implemented as a stateful flatMap. */
+  def mapWithState[R](fn: MapWithStateFunction[T, R], description: String): JavaStream[R] = {
+    new JavaStream[R](stream.flatMapWithState(FlatMapWithStateFunction(fn), description))
+  }
+
+  /** Only keep the messages that FilterFunction returns true. */
+  def filter(fn: FilterFunction[T], description: String): JavaStream[T] = {
+    new JavaStream[T](stream.flatMap(FlatMapFunction(fn), description))
+  }
+
+  /** Folds the stream with `fn`; delegates to `Stream.fold`. */
+  def fold[A](fn: FoldFunction[T, A], description: String): JavaStream[A] = {
+    new JavaStream[A](stream.fold(fn, description))
+  }
+
+  /** Does aggregation on the stream; delegates to `Stream.reduce`. */
+  def reduce(fn: ReduceFunction[T], description: String): JavaStream[T] = {
+    new JavaStream[T](stream.reduce(fn, description))
+  }
+
+  /** Logs each message; delegates to `Stream.log()`. */
+  def log(): Unit = {
+    stream.log()
+  }
+
+  /** Merges streams of same type together. */
+  def merge(other: JavaStream[T], parallelism: Int, description: String): JavaStream[T] = {
+    new JavaStream[T](stream.merge(other.stream, parallelism, description))
+  }
+
+  /**
+   * Group by a stream and turns it to a list of sub-streams. Operations chained after
+   * groupBy applies to sub-streams.
+   */
+  def groupBy[GROUP](fn: GroupByFunction[T, GROUP],
+      parallelism: Int, description: String): JavaStream[T] = {
+    // `fn.groupBy` is eta-expanded into the Scala function T => GROUP.
+    new JavaStream[T](stream.groupBy(fn.groupBy, parallelism, description))
+  }
+
+  /** Applies the window definition `win`; delegates to `Stream.window`. */
+  def window(win: Windows): JavaStream[T] = {
+    new JavaStream[T](stream.window(win))
+  }
+
+  /** Add a low level Processor to process messages. */
+  def process[R](
+      processor: Class[_ <: Task], parallelism: Int, conf: UserConfig, description: String)
+    : JavaStream[R] = {
+    new JavaStream[R](stream.process(processor, parallelism, conf, description))
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b9f10866/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/plan/OP.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/plan/OP.scala
new file mode 100644
index 0000000..9744ec6
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/plan/OP.scala
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.dsl.plan.functions
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Processor.DefaultProcessor
+import org.apache.gearpump.streaming.refactor.dsl.window.impl.{AndThen => WindowRunnerAT}
+import org.apache.gearpump.streaming.{Constants, Processor}
+import org.apache.gearpump.streaming.refactor.dsl.task.{GroupByTask, TransformTask}
+import org.apache.gearpump.streaming.dsl.window.api.{GlobalWindows, Windows}
+import org.apache.gearpump.streaming.refactor.dsl.window.impl.{DefaultWindowRunner, WindowRunner}
+import org.apache.gearpump.streaming.refactor.source.DataSourceTask
+import org.apache.gearpump.streaming.sink.{DataSink, DataSinkProcessor}
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.Task
+
+import scala.reflect.ClassTag
+
+/**
+ * Helpers shared by the [[Op]] implementations: merging descriptions and
+ * configs, and completing a processor that has no window operator yet.
+ */
+object Op {
+
+  /**
+   * Joins two descriptions with ".".
+   *
+   * If `desc1` is null or empty, `desc2` is returned as-is. Otherwise, if
+   * `desc2` is null or empty, `desc1` is returned. Only when both are present
+   * is the result `desc1 + "." + desc2`.
+   */
+  def concatenate(desc1: String, desc2: String): String = {
+    def isBlank(text: String): Boolean = text == null || text.isEmpty
+
+    (isBlank(desc1), isBlank(desc2)) match {
+      case (true, _) => desc2
+      case (false, true) => desc1
+      case (false, false) => desc1 + "." + desc2
+    }
+  }
+
+  /** Merges two configs via `config1.withConfig(config2)`. */
+  def concatenate(config1: UserConfig, config2: UserConfig): UserConfig =
+    config1.withConfig(config2)
+
+  /**
+   * Returns `processor` untouched when `userConfig` already holds a value under
+   * `Constants.GEARPUMP_STREAMING_OPERATOR`. Otherwise chains `op` with a
+   * global-windows `DummyRunner` op and builds the processor from that chain.
+   */
+  def withGlobalWindowsDummyRunner(op: Op, userConfig: UserConfig,
+      processor: Processor[_ <: Task])(implicit system: ActorSystem): Processor[_ <: Task] = {
+    val hasOperator = userConfig.getValue(Constants.GEARPUMP_STREAMING_OPERATOR).isDefined
+    if (hasOperator) {
+      processor
+    } else {
+      val globalDummy = WindowOp(GlobalWindows()).chain(TransformOp(new DummyRunner[Any]))
+      op.chain(globalDummy).toProcessor
+    }
+  }
+
+}
+
+/**
+ * This is a vertex on the logical plan.
+ *
+ * Implementations either absorb a following op via `chain` (returning the
+ * fused op) or reject it, and can be turned into a runtime [[Processor]].
+ */
+sealed trait Op {
+
+  /** Human-readable description of this op. */
+  def description: String
+
+  /** Configuration carried by this op. */
+  def userConfig: UserConfig
+
+  /** Chains `op` after this op, returning the combined op. */
+  def chain(op: Op)(implicit system: ActorSystem): Op
+
+  /** Translates this op into a runtime processor. */
+  def toProcessor(implicit system: ActorSystem): Processor[_ <: Task]
+}
+
+/**
+ * This represents a low level Processor.
+ */
+case class ProcessorOp[T <: Task](
+    processor: Class[T],
+    parallelism: Int,
+    userConfig: UserConfig,
+    description: String)
+  extends Op {
+
+  // Alternate constructor: derives the processor class from the implicit
+  // ClassTag of T, with defaults for the remaining fields.
+  def this(
+      parallelism: Int = 1,
+      userConfig: UserConfig = UserConfig.empty,
+      description: String = "processor")(implicit classTag: ClassTag[T]) = {
+    this(classTag.runtimeClass.asInstanceOf[Class[T]], parallelism, userConfig, description)
+  }
+
+  // A low level processor never absorbs a following op.
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    throw new OpChainException(this, other)
+  }
+
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    DefaultProcessor(parallelism, description, userConfig, processor)
+  }
+}
+
+/**
+ * This represents a DataSource.
+ *
+ * A following window/transform op is folded into the source itself. Its
+ * window runner is stored in the config under
+ * `Constants.GEARPUMP_STREAMING_OPERATOR`.
+ */
+case class DataSourceOp(
+    dataSource: DataSource,
+    parallelism: Int = 1,
+    description: String = "source",
+    userConfig: UserConfig = UserConfig.empty)
+  extends Op {
+
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    other match {
+      // Absorb the window runner: descriptions are joined and the op's config is merged in.
+      case op: WindowTransformOp[_, _] =>
+        DataSourceOp(
+          dataSource,
+          parallelism,
+          Op.concatenate(description, op.description),
+          Op.concatenate(userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR,
+            op.windowRunner),
+            op.userConfig))
+      // A bare transform is first placed on global windows, then chained.
+      case op: TransformOp[_, _] =>
+        chain(
+          WindowOp(GlobalWindows()).chain(op))
+      // A bare window gets a pass-through DummyRunner transform, then chained.
+      case op: WindowOp =>
+        chain(
+          op.chain(TransformOp(new DummyRunner[Any]())))
+      // Put the leading transform on global windows, fuse it with the trailing
+      // window-transform, then chain the single resulting op.
+      case op: TransformWindowTransformOp[_, _, _] =>
+        chain(
+          WindowOp(GlobalWindows()).chain(op.transformOp)
+            .chain(op.windowTransformOp))
+      case _ =>
+        throw new OpChainException(this, other)
+    }
+  }
+
+  // Builds a DataSourceTask processor with the data source stored under
+  // GEARPUMP_STREAMING_SOURCE. If no operator was chained in,
+  // withGlobalWindowsDummyRunner adds a global-windows dummy runner.
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    Op.withGlobalWindowsDummyRunner(this, userConfig,
+      Processor[DataSourceTask[Any, Any]](parallelism, description,
+        userConfig.withValue(Constants.GEARPUMP_STREAMING_SOURCE, dataSource))
+    )
+  }
+}
+
+/**
+ * This represents a DataSink.
+ */
+case class DataSinkOp(
+    dataSink: DataSink,
+    parallelism: Int = 1,
+    description: String = "sink",
+    userConfig: UserConfig = UserConfig.empty)
+  extends Op {
+
+  // A sink is terminal: nothing can be chained after it.
+  override def chain(op: Op)(implicit system: ActorSystem): Op = {
+    throw new OpChainException(this, op)
+  }
+
+  // NOTE(review): userConfig is not passed to DataSinkProcessor here — confirm
+  // this is intended.
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    DataSinkProcessor(dataSink, parallelism, description)
+  }
+}
+
+/**
+ * This represents operations (e.g. flatMap, map, filter, reduce) that can be
+ * chained together into a single function runner. The result can in turn be
+ * chained onto another Op.
+ */
+case class TransformOp[IN, OUT](
+    fn: FunctionRunner[IN, OUT],
+    userConfig: UserConfig = UserConfig.empty) extends Op {
+
+  override def description: String = fn.description
+
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    other match {
+      // NOTE: the OUT type argument is erased and unchecked at runtime; any
+      // TransformOp matches this case.
+      case op: TransformOp[OUT, _] =>
+        // TODO: preserve type info
+        // f3(f2(f1(in)))
+        // => ChainableOp(f1).chain(ChainableOp(f2)).chain(ChainableOp(f3))
+        // => AndThen(AndThen(f1, f2), f3)
+        TransformOp(
+          AndThen(fn, op.fn),
+          Op.concatenate(userConfig, op.userConfig))
+      // A following window becomes a WindowTransformOp with a pass-through
+      // DummyRunner, kept alongside this transform.
+      case op: WindowOp =>
+        TransformWindowTransformOp(this,
+          WindowTransformOp(new DefaultWindowRunner[OUT, OUT](
+            op.windows, new DummyRunner[OUT]
+          ), op.description, op.userConfig))
+      // Fuse this function in front of the other op's leading transform.
+      case op: TransformWindowTransformOp[OUT, _, _] =>
+        TransformWindowTransformOp(TransformOp(
+          AndThen(fn, op.transformOp.fn),
+          Op.concatenate(userConfig, op.transformOp.userConfig)
+        ), op.windowTransformOp)
+      case _ =>
+        throw new OpChainException(this, other)
+    }
+  }
+
+  // A standalone transform runs on global windows.
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    WindowOp(GlobalWindows()).chain(this).toProcessor
+  }
+}
+
+/**
+ * This is an intermediate operation, produced by chaining WindowOp and TransformOp.
+ * Usually, it will be chained to a DataSourceOp, GroupByOp or MergeOp.
+ * Otherwise, it will be translated to a Processor of TransformTask.
+ */
+case class WindowTransformOp[IN, OUT](
+    windowRunner: WindowRunner[IN, OUT],
+    description: String,
+    userConfig: UserConfig) extends Op {
+
+  // Only another WindowTransformOp can follow. The two window runners are
+  // composed with the window-runner AndThen (imported as WindowRunnerAT).
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    other match {
+      case op: WindowTransformOp[OUT, _] =>
+        WindowTransformOp(
+          WindowRunnerAT(windowRunner, op.windowRunner),
+          Op.concatenate(description, op.description),
+          Op.concatenate(userConfig, op.userConfig)
+        )
+      case _ =>
+        throw new OpChainException(this, other)
+    }
+  }
+
+  // Standalone fallback: a TransformTask with parallelism fixed at 1 and the
+  // window runner stored under GEARPUMP_STREAMING_OPERATOR.
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    // TODO: this should be chained to DataSourceOp / GroupByOp / MergeOp
+    Processor[TransformTask[Any, Any]](1, description, userConfig.withValue(
+      Constants.GEARPUMP_STREAMING_OPERATOR, windowRunner))
+  }
+}
+
+/**
+ * This is an intermediate operation, produced by chaining TransformOp and WindowOp.
+ * It will later be chained to a WindowOp, which results in two WindowTransformOps.
+ * Finally, they will be chained to a single WindowTransformOp.
+ */
+case class TransformWindowTransformOp[IN, MIDDLE, OUT](
+    transformOp: TransformOp[IN, MIDDLE],
+    windowTransformOp: WindowTransformOp[MIDDLE, OUT]) extends Op {
+
+  // This op only exists transiently while chaining; description, userConfig and
+  // chain are deliberately unsupported.
+  override def description: String = {
+    throw new UnsupportedOperationException(s"description is not supported on $this")
+  }
+
+  override def userConfig: UserConfig = {
+    throw new UnsupportedOperationException(s"userConfig is not supported on $this")
+  }
+
+  override def chain(op: Op)(implicit system: ActorSystem): Op = {
+    throw new UnsupportedOperationException(s"chain is not supported on $this")
+  }
+
+  // Standalone: prepend global windows so the whole thing becomes one WindowTransformOp.
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    WindowOp(GlobalWindows()).chain(this).toProcessor
+  }
+}
+
+/**
+ * This represents a window aggregation, together with a following TransformOp.
+ */
+case class WindowOp(
+    windows: Windows,
+    userConfig: UserConfig = UserConfig.empty) extends Op {
+
+  override def description: String = windows.description
+
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    other match {
+      // Window + transform => a single WindowTransformOp.
+      case op: TransformOp[_, _] =>
+        WindowTransformOp(new DefaultWindowRunner(windows, op.fn),
+          Op.concatenate(description, op.description),
+          Op.concatenate(userConfig, op.userConfig))
+      // Two consecutive windows: give each a pass-through transform, then fuse.
+      case op: WindowOp =>
+        chain(TransformOp(new DummyRunner[Any])).chain(op.chain(TransformOp(new DummyRunner[Any])))
+      // Apply this window to the leading transform, then fuse with the trailing
+      // window-transform.
+      case op: TransformWindowTransformOp[_, _, _] =>
+        WindowTransformOp(new DefaultWindowRunner(windows, op.transformOp.fn),
+          Op.concatenate(description, op.transformOp.description),
+          Op.concatenate(userConfig, op.transformOp.userConfig)).chain(op.windowTransformOp)
+      case _ =>
+        throw new OpChainException(this, other)
+    }
+  }
+
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    chain(TransformOp(new DummyRunner[Any])).toProcessor
+  }
+
+}
+
+/**
+ * This represents a Processor with groupBy and window aggregation.
+ *
+ * A following window/transform op is folded into this op. Its window runner
+ * is stored under `Constants.GEARPUMP_STREAMING_OPERATOR`.
+ */
+case class GroupByOp[IN, GROUP] private(
+    groupBy: IN => GROUP,
+    parallelism: Int = 1,
+    description: String = "groupBy",
+    override val userConfig: UserConfig = UserConfig.empty)
+  extends Op {
+
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    other match {
+      case op: WindowTransformOp[_, _] =>
+        GroupByOp(
+          groupBy,
+          parallelism,
+          Op.concatenate(description, op.description),
+          // Merge in the chained op's config (previously this merged
+          // `userConfig` with itself and dropped `op.userConfig`), matching
+          // DataSourceOp and MergeOp.
+          Op.concatenate(
+            userConfig
+              .withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.windowRunner),
+            op.userConfig))
+      // A bare window gets a pass-through DummyRunner transform, then chained.
+      case op: WindowOp =>
+        chain(op.chain(TransformOp(new DummyRunner[Any]())))
+      case _ =>
+        throw new OpChainException(this, other)
+    }
+  }
+
+  // Builds a GroupByTask processor with the group-by function stored under
+  // GEARPUMP_STREAMING_GROUPBY_FUNCTION. If no operator was chained in, a
+  // global-windows dummy runner is added.
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    Op.withGlobalWindowsDummyRunner(this, userConfig,
+      Processor[GroupByTask[IN, GROUP, Any]](parallelism, description,
+        userConfig.withValue(Constants.GEARPUMP_STREAMING_GROUPBY_FUNCTION, groupBy)))
+  }
+}
+
+/**
+ * This represents a Processor transforming merged streams.
+ */
+case class MergeOp(
+    parallelism: Int = 1,
+    description: String = "merge",
+    userConfig: UserConfig = UserConfig.empty)
+  extends Op {
+
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    other match {
+      // Absorb the window runner. NOTE(review): unlike DataSourceOp/GroupByOp,
+      // the description is kept as-is rather than concatenated.
+      case op: WindowTransformOp[_, _] =>
+        MergeOp(
+          parallelism,
+          description,
+          Op.concatenate(userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR,
+            op.windowRunner),
+            op.userConfig))
+      case op: WindowOp =>
+        chain(op.chain(TransformOp(new DummyRunner[Any]())))
+      case _ =>
+        throw new OpChainException(this, other)
+    }
+  }
+
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    Op.withGlobalWindowsDummyRunner(this, userConfig,
+      Processor[TransformTask[Any, Any]](parallelism, description, userConfig))
+  }
+
+}
+
+/**
+ * This is an edge on the logical plan.
+ */
+trait OpEdge
+
+/**
+ * The upstream OP and downstream OP don't require a network data shuffle,
+ * e.g.
TransformOp
+ */
+case object Direct extends OpEdge
+
+/**
+ * A network data shuffle IS required between the upstream and downstream OP,
+ * e.g. GroupByOp.
+ */
+case object Shuffle extends OpEdge
+
+/**
+ * Runtime exception thrown when one op refuses to chain another.
+ *
+ * @param op1 the op whose `chain` rejected the other op
+ * @param op2 the op that could not be chained
+ */
+class OpChainException(op1: Op, op2: Op)
+  extends RuntimeException(String.valueOf(op1) + " can't be chained by " + String.valueOf(op2))
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b9f10866/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/scalaapi/Stream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/scalaapi/Stream.scala
new file mode 100644
index 0000000..c0499c9
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/scalaapi/Stream.scala
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.dsl.scalaapi
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, FoldFunction, MapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.refactor.dsl.plan.functions._
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+import org.apache.gearpump.streaming.dsl.window.api._
+import org.apache.gearpump.streaming.refactor.dsl.api.functions.MapWithStateFunction
+import org.apache.gearpump.streaming.refactor.dsl.scalaapi.functions.FlatMapWithStateFunction
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.util.Graph
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.language.implicitConversions
+
+/**
+ * A stream in the logical plan.
+ *
+ * Every operation adds vertices/edges to the shared `graph` and returns a new
+ * Stream positioned at the newly added op.
+ *
+ * @param graph    the logical plan shared by all derived streams
+ * @param thisNode the op whose output this stream represents
+ * @param edge     edge type for the next downstream op; when None, each
+ *                 operation picks its own default (Direct or Shuffle)
+ * @param windows  current window definition, propagated to derived streams
+ */
+class Stream[T](
+    private val graph: Graph[Op, OpEdge], private val thisNode: Op,
+    private val edge: Option[OpEdge] = None,
+    private val windows: Windows = GlobalWindows()) {
+
+  /**
+   * Returns a new stream by applying a flatMap function to each element
+   * and flattening the results.
+   *
+   * @param fn flatMap function
+   * @param description The description message for this operation
+   * @return A new stream with type [R]
+   */
+  def flatMap[R](fn: T => TraversableOnce[R], description: String = "flatMap"): Stream[R] = {
+    this.flatMap(FlatMapFunction(fn), description)
+  }
+
+  /**
+   * Returns a new stream by applying a flatMap function to each element
+   * and flattening the results.
+   *
+   * @param fn flatMap function
+   * @param description The description message for this operation
+   * @return A new stream with type [R]
+   */
+  def flatMap[R](fn: FlatMapFunction[T, R], description: String): Stream[R] = {
+    transform(new FlatMapper[T, R](fn, description))
+  }
+
+  /**
+   * Returns a new stream by applying a stateful flatMap function to each element
+   * and flattening the results.
+   *
+   * @param fn stateful flatMap function
+   * @param description The description message for this operation
+   * @return A new stream with type [R]
+   */
+  def flatMapWithState[R](fn: FlatMapWithStateFunction[T, R], description: String): Stream[R] = {
+    transform(new FlatMapper[T, R](fn, description))
+  }
+
+  /**
+   * Returns a new stream by applying a map function to each element.
+   *
+   * @param fn map function
+   * @return A new stream with type [R]
+   */
+  def map[R](fn: T => R, description: String = "map"): Stream[R] = {
+    this.map(MapFunction(fn), description)
+  }
+
+  /**
+   * Returns a new stream by applying a map function to each element.
+   *
+   * @param fn map function
+   * @return A new stream with type [R]
+   */
+  def map[R](fn: MapFunction[T, R], description: String): Stream[R] = {
+    this.flatMap(FlatMapFunction(fn), description)
+  }
+
+  /**
+   * Returns a new stream by applying a stateful map function to each element.
+   * Implemented as a stateful flatMap.
+   *
+   * @param fn stateful map function
+   * @return A new stream with type [R]
+   */
+  def mapWithState[R](fn: MapWithStateFunction[T, R], description: String): Stream[R] = {
+    this.flatMapWithState(FlatMapWithStateFunction(fn), description)
+  }
+
+  /**
+   * Returns a new Stream keeping the elements that satisfy the filter function.
+   *
+   * @param fn filter function
+   * @return a new stream after filter
+   */
+  def filter(fn: T => Boolean, description: String = "filter"): Stream[T] = {
+    this.filter(FilterFunction(fn), description)
+  }
+
+  /**
+   * Returns a new Stream keeping the elements that satisfy the filter function.
+   *
+   * @param fn filter function
+   * @return a new stream after filter
+   */
+  def filter(fn: FilterFunction[T], description: String): Stream[T] = {
+    this.flatMap(FlatMapFunction(fn), description)
+  }
+
+  /**
+   * Returns a new stream by applying a fold function over all the elements.
+   *
+   * @param fn fold function
+   * @return a new stream after fold
+   */
+  def fold[A](fn: FoldFunction[T, A], description: String): Stream[A] = {
+    transform(new FoldRunner(fn, description))
+  }
+
+  /**
+   * Returns a new stream by applying a reduce function over all the elements.
+   *
+   * @param fn reduce function
+   * @param description description message for this operator
+   * @return a new stream after reduce
+   */
+  def reduce(fn: (T, T) => T, description: String = "reduce"): Stream[T] = {
+    reduce(ReduceFunction(fn), description)
+  }
+
+  /**
+   * Returns a new stream by applying a reduce function over all the elements.
+   * Implemented as a fold followed by unwrapping each folded Option with `.get`.
+   *
+   * @param fn reduce function
+   * @param description description message for this operator
+   * @return a new stream after reduce
+   */
+  def reduce(fn: ReduceFunction[T], description: String): Stream[T] = {
+    fold(fn, description).map(_.get)
+  }
+
+  /** Adds a TransformOp for `fn` after this node (Direct edge unless `edge` is set). */
+  private def transform[R](fn: FunctionRunner[T, R]): Stream[R] = {
+    val op = TransformOp(fn)
+    graph.addVertex(op)
+    graph.addEdge(thisNode, edge.getOrElse(Direct), op)
+    new Stream(graph, op, None, windows)
+  }
+
+  /**
+   * Log to task log file.
+   *
+   * Adds a pass-through map that logs each element via the "dsl" logger.
+   * The resulting stream is not returned.
+   */
+  def log(): Unit = {
+    this.map(msg => {
+      LoggerFactory.getLogger("dsl").info(msg.toString)
+      msg
+    }, "log")
+  }
+
+  /**
+   * Merges data from two streams into one.
+   *
+   * This stream connects via Direct and `other` via Shuffle unless their `edge`
+   * overrides it. A WindowOp for the current windows is appended after the merge.
+   *
+   * @param other the other stream
+   * @return the merged stream
+   */
+  def merge(other: Stream[T], parallelism: Int = 1, description: String = "merge"): Stream[T] = {
+    val mergeOp = MergeOp(parallelism, description, UserConfig.empty)
+    graph.addVertex(mergeOp)
+    graph.addEdge(thisNode, edge.getOrElse(Direct), mergeOp)
+    graph.addEdge(other.thisNode, other.edge.getOrElse(Shuffle), mergeOp)
+    val winOp = Stream.addWindowOp(graph, mergeOp, windows)
+    new Stream[T](graph, winOp, None, windows)
+  }
+
+  /**
+   * Group by function (T => Group)
+   *
+   * For example, we have T type, People(name: String, gender: String, age: Int)
+   * groupBy[People](_.gender) will group the people by gender.
+   *
+   * You can append other combinators after groupBy
+   *
+   * For example,
+   * {{{
+   * Stream[People].groupBy(_.gender).flatMap(..).filter(..).reduce(..)
+   * }}}
+   *
+   * @param fn Group by function
+   * @param parallelism Parallelism level
+   * @param description The description
+   * @return the grouped stream
+   */
+  def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
+      description: String = "groupBy"): Stream[T] = {
+    val gbOp = GroupByOp(fn, parallelism, description)
+    graph.addVertex(gbOp)
+    graph.addEdge(thisNode, edge.getOrElse(Shuffle), gbOp)
+    val winOp = Stream.addWindowOp(graph, gbOp, windows)
+    new Stream(graph, winOp, None, windows)
+  }
+
+  /**
+   * Window function
+   *
+   * @param windows window definition
+   * @return the windowed [[Stream]]
+   */
+  def window(windows: Windows): Stream[T] = {
+    val winOp = Stream.addWindowOp(graph, thisNode, windows)
+    new Stream(graph, winOp, None, windows)
+  }
+
+  /**
+   * Connects with a low level Processor(TaskDescription)
+   *
+   * The returned stream keeps `Some(Shuffle)` as its edge, so the next
+   * downstream op is also connected via Shuffle.
+   *
+   * @param processor a user defined processor
+   * @param parallelism parallelism level
+   * @return new stream after processing with type [R]
+   */
+  def process[R](
+      processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = UserConfig.empty,
+      description: String = "process"): Stream[R] = {
+    val processorOp = ProcessorOp(processor, parallelism, conf, description)
+    graph.addVertex(processorOp)
+    graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp)
+    new Stream[R](graph, processorOp, Some(Shuffle), windows)
+  }
+
+
+}
+
+/** Key/value operations for a stream of tuples. */
+class KVStream[K, V](stream: Stream[Tuple2[K, V]]) {
+  /**
+   * GroupBy key
+   *
+   * Applies to Stream[Tuple2[K,V]]
+   *
+   * @param parallelism the parallelism for this operation
+   * @return the new KV stream
+   */
+  def groupByKey(parallelism: Int = 1): Stream[Tuple2[K, V]] = {
+    stream.groupBy(Stream.getTupleKey[K, V], parallelism, "groupByKey")
+  }
+
+  /**
+   * Sum the value of the tuples
+   *
+   * Apply to Stream[Tuple2[K,V]], V must be of type Number
+   *
+   * For input (key, value1), (key, value2), will generate (key, value1 + value2)
+   * @param numeric the numeric operations
+   * @return the sum stream
+   */
+  def sum(implicit numeric: Numeric[V]): Stream[(K, V)] = {
+    stream.reduce(Stream.sumByKey[K, V](numeric), "sum")
+  }
+}
+
+object Stream {
+
+  def apply[T](graph: Graph[Op, OpEdge], node: Op, edge: Option[OpEdge],
+      windows: Windows): Stream[T] = {
+    new Stream[T](graph, node, edge, windows)
+  }
+
+  /** Extracts the key of a (key, value) tuple. */
+  def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1
+
+  /** Reducer that keeps the first tuple's key and adds the two values. */
+  def sumByKey[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V]
+    = (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2))
+
+  /** Adds a WindowOp for `win` after `op` (Direct edge) and returns it. */
+  def addWindowOp(graph: Graph[Op, OpEdge], op: Op, win: Windows): Op = {
+    val winOp = WindowOp(win)
+    graph.addVertex(winOp)
+    graph.addEdge(op, Direct, winOp)
+    winOp
+  }
+
+  implicit def streamToKVStream[K, V](stream: Stream[Tuple2[K, V]]): KVStream[K, V] = {
+    new KVStream(stream)
+  }
+
+  implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable {
+    /** Attaches `dataSink` after this stream via a Shuffle edge. */
+    def sink(dataSink: DataSink, parallelism: Int = 1,
+        conf: UserConfig = UserConfig.empty, description: String = "sink"): Stream[T] = {
+      // A plain local; nothing in this scope needs an implicit DataSinkOp.
+      val sink = DataSinkOp(dataSink, parallelism, description, conf)
+      stream.graph.addVertex(sink)
+      stream.graph.addEdge(stream.thisNode, Shuffle, sink)
+      new Stream[T](stream.graph, sink, None, stream.windows)
+    }
+  }
+}
+
+/** A [[DataSink]] that logs each message's value to the task logger obtained in `open`. */
+class LoggerSink[T] extends DataSink {
+  var logger: Logger = _
+
+  override def open(context: TaskContext): Unit = {
+    this.logger = context.logger
+  }
+
+  override def write(message: Message): Unit = {
+    logger.info("logging message " + message.value)
+  }
+
+  // Nothing to release; `()` is the Unit value (not the `Unit` companion object).
+  override def close(): Unit = ()
+}
\ No newline at end of file
