http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala index 3003b98..7f3c250 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala @@ -15,14 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.gearpump.streaming.dsl.javaapi -import scala.collection.JavaConverters._ import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction, ReduceFunction} +import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => JFlatMapFunction, GroupByFunction} +import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction +import org.apache.gearpump.streaming.dsl.scalaapi.{Stream, WindowStream} import org.apache.gearpump.streaming.dsl.window.api.Window -import org.apache.gearpump.streaming.dsl.{Stream, WindowStream} -import org.apache.gearpump.streaming.javaapi.dsl.functions._ import org.apache.gearpump.streaming.task.Task /** @@ -31,23 +31,23 @@ import org.apache.gearpump.streaming.task.Task class JavaStream[T](val stream: Stream[T]) { /** FlatMap on stream */ - def flatMap[R](fn: FlatMapFunction[T, R], description: String): JavaStream[R] = { - new JavaStream[R](stream.flatMap({ t: T => fn(t).asScala }, description)) + def flatMap[R](fn: JFlatMapFunction[T, R], description: String): JavaStream[R] = { + new JavaStream[R](stream.flatMap(FlatMapFunction(fn), description)) } /** Map on stream */ def map[R](fn: MapFunction[T, R], 
description: String): JavaStream[R] = { - new JavaStream[R](stream.map({ t: T => fn(t) }, description)) + new JavaStream[R](stream.flatMap(FlatMapFunction(fn), description)) } /** Only keep the messages that FilterFunction returns true. */ def filter(fn: FilterFunction[T], description: String): JavaStream[T] = { - new JavaStream[T](stream.filter({ t: T => fn(t) }, description)) + new JavaStream[T](stream.flatMap(FlatMapFunction(fn), description)) } /** Does aggregation on the stream */ def reduce(fn: ReduceFunction[T], description: String): JavaStream[T] = { - new JavaStream[T](stream.reduce({ (t1: T, t2: T) => fn(t1, t2) }, description)) + new JavaStream[T](stream.reduce(fn, description)) } def log(): Unit = { @@ -65,7 +65,7 @@ class JavaStream[T](val stream: Stream[T]) { */ def groupBy[GROUP](fn: GroupByFunction[T, GROUP], parallelism: Int, description: String): JavaStream[T] = { - new JavaStream[T](stream.groupBy((t: T) => fn, parallelism, description)) + new JavaStream[T](stream.groupBy(fn.apply, parallelism, description)) } def window(win: Window, description: String): JavaWindowStream[T] = { @@ -84,6 +84,6 @@ class JavaWindowStream[T](stream: WindowStream[T]) { def groupBy[GROUP](fn: GroupByFunction[T, GROUP], parallelism: Int, description: String): JavaStream[T] = { - new JavaStream[T](stream.groupBy((t: T) => fn, parallelism, description)) + new JavaStream[T](stream.groupBy(fn.apply, parallelism, description)) } }
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala index 0d841be..f5b2910 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala @@ -19,13 +19,14 @@ package org.apache.gearpump.streaming.dsl.javaapi import java.util.Collection -import scala.collection.JavaConverters._ import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.streaming.dsl.{CollectionDataSource, StreamApp} +import org.apache.gearpump.cluster.client.{ClientContext, RunningApplication} +import org.apache.gearpump.streaming.dsl.scalaapi.{CollectionDataSource, StreamApp} import org.apache.gearpump.streaming.source.DataSource +import scala.collection.JavaConverters._ + class JavaStreamApp(name: String, context: ClientContext, userConfig: UserConfig) { private val streamApp = StreamApp(name, context, userConfig) @@ -41,7 +42,7 @@ class JavaStreamApp(name: String, context: ClientContext, userConfig: UserConfig new JavaStream[T](streamApp.source(dataSource, parallelism, conf, description)) } - def run(): Unit = { + def submit(): RunningApplication = { context.submit(streamApp) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala new file mode 100644 index 0000000..85d597d --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.javaapi.functions + +import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction + +/** + * Transforms one input into zero or more outputs of possibly different types. + * This Java version of FlatMapFunction returns a java.util.Iterator. 
+ * + * @tparam T Input value type + * @tparam R Output value type + */ +abstract class FlatMapFunction[T, R] extends SerializableFunction { + + def apply(t: T): java.util.Iterator[R] +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala new file mode 100644 index 0000000..7656cba --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.javaapi.functions + +import org.apache.gearpump.streaming.dsl.api.functions.MapFunction + +/** + * Assigns the input value into a group. 
+ * + * @tparam T Input value type + * @tparam GROUP Group value type + */ +abstract class GroupByFunction[T, GROUP] extends MapFunction[T, GROUP] http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala index 2ec881b..efa7409 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala @@ -19,7 +19,7 @@ package org.apache.gearpump.streaming.dsl.partitioner import org.apache.gearpump.Message -import org.apache.gearpump.partitioner.UnicastPartitioner +import org.apache.gearpump.streaming.partitioner.UnicastPartitioner import org.apache.gearpump.streaming.dsl.window.api.GroupByFn /** http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala index b2c5506..5aaf2fa 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala @@ -22,11 +22,10 @@ import akka.actor.ActorSystem import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ import org.apache.gearpump.streaming.Processor.DefaultProcessor -import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction +import 
org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, SingleInputFunction} import org.apache.gearpump.streaming.{Constants, Processor} import org.apache.gearpump.streaming.dsl.task.TransformTask -import org.apache.gearpump.streaming.dsl.window.api.{CountWindow, GroupByFn} -import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow +import org.apache.gearpump.streaming.dsl.window.api.GroupByFn import org.apache.gearpump.streaming.sink.{DataSink, DataSinkProcessor} import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask} import org.apache.gearpump.streaming.task.Task @@ -130,12 +129,11 @@ case class ChainableOp[IN, OUT]( override def description: String = fn.description - override def chain(other: Op)(implicit system: ActorSystem): Op = { other match { case op: ChainableOp[OUT, _] => // TODO: preserve type info - ChainableOp(fn.andThen(op.fn)) + ChainableOp(AndThen(fn, op.fn)) case _ => throw new OpChainException(this, other) } @@ -147,15 +145,6 @@ case class ChainableOp[IN, OUT]( } } -object GroupByOp { - - def apply[IN, GROUP](groupBy: IN => GROUP, parallelism: Int, - description: String, userConfig: UserConfig): Op = { - GroupByOp(GroupAlsoByWindow(groupBy, CountWindow.apply(1).accumulating), parallelism, - description, userConfig) - } -} - /** * This represents a Processor with window aggregation */ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala index 16d5c06..65f9cd2 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala @@ -20,7 +20,7 @@ package 
org.apache.gearpump.streaming.dsl.plan import akka.actor.ActorSystem -import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner, Partitioner} +import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, HashPartitioner, Partitioner} import org.apache.gearpump.streaming.Processor import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner import org.apache.gearpump.streaming.task.Task http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala index 609fbb0..687fd2e 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala @@ -17,20 +17,37 @@ */ package org.apache.gearpump.streaming.dsl.plan.functions -trait SingleInputFunction[IN, OUT] extends Serializable { +import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction +import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction + +/** + * Internal function to process single input + * + * @tparam IN input value type + * @tparam OUT output value type + */ +sealed trait SingleInputFunction[IN, OUT] extends java.io.Serializable { + + def setup(): Unit = {} + def process(value: IN): TraversableOnce[OUT] - def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = { - new AndThen(this, other) - } + def finish(): TraversableOnce[OUT] = None - def clearState(): Unit = {} + + def teardown(): Unit = {} + def description: String } -class AndThen[IN, MIDDLE, OUT]( - first: 
SingleInputFunction[IN, MIDDLE], second: SingleInputFunction[MIDDLE, OUT]) +case class AndThen[IN, MIDDLE, OUT](first: SingleInputFunction[IN, MIDDLE], + second: SingleInputFunction[MIDDLE, OUT]) extends SingleInputFunction[IN, OUT] { + override def setup(): Unit = { + first.setup() + second.setup() + } + override def process(value: IN): TraversableOnce[OUT] = { first.process(value).flatMap(second.process) } @@ -44,9 +61,9 @@ class AndThen[IN, MIDDLE, OUT]( } } - override def clearState(): Unit = { - first.clearState() - second.clearState() + override def teardown(): Unit = { + first.teardown() + second.teardown() } override def description: String = { @@ -56,22 +73,31 @@ class AndThen[IN, MIDDLE, OUT]( } } -class FlatMapFunction[IN, OUT](fn: IN => TraversableOnce[OUT], descriptionMessage: String) +class FlatMapper[IN, OUT](fn: FlatMapFunction[IN, OUT], val description: String) extends SingleInputFunction[IN, OUT] { + override def setup(): Unit = { + fn.setup() + } + override def process(value: IN): TraversableOnce[OUT] = { fn(value) } - override def description: String = descriptionMessage + override def teardown(): Unit = { + fn.teardown() + } } - -class ReduceFunction[T](fn: (T, T) => T, descriptionMessage: String) +class Reducer[T](fn: ReduceFunction[T], val description: String) extends SingleInputFunction[T, T] { private var state: Option[T] = None + override def setup(): Unit = { + fn.setup() + } + override def process(value: T): TraversableOnce[T] = { if (state.isEmpty) { state = Option(value) @@ -85,23 +111,18 @@ class ReduceFunction[T](fn: (T, T) => T, descriptionMessage: String) state } - override def clearState(): Unit = { + override def teardown(): Unit = { state = None + fn.teardown() } - - override def description: String = descriptionMessage } -class EmitFunction[T](emit: T => Unit) extends SingleInputFunction[T, Unit] { +class Emit[T](emit: T => Unit) extends SingleInputFunction[T, Unit] { override def process(value: T): TraversableOnce[Unit] = { 
emit(value) None } - override def andThen[R](other: SingleInputFunction[Unit, R]): SingleInputFunction[T, R] = { - throw new UnsupportedOperationException("andThen is not supposed to be called on EmitFunction") - } - override def description: String = "" } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala new file mode 100644 index 0000000..430d795 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.dsl.scalaapi + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction, ReduceFunction} +import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction +import org.apache.gearpump.streaming.dsl.plan._ +import org.apache.gearpump.streaming.dsl.plan.functions._ +import org.apache.gearpump.streaming.dsl.window.api._ +import org.apache.gearpump.streaming.dsl.window.impl.{Bucket, GroupAlsoByWindow} +import org.apache.gearpump.streaming.sink.DataSink +import org.apache.gearpump.streaming.task.{Task, TaskContext} +import org.apache.gearpump.util.Graph +import org.slf4j.{Logger, LoggerFactory} + +import scala.language.implicitConversions + +class Stream[T]( + private val graph: Graph[Op, OpEdge], private val thisNode: Op, + private val edge: Option[OpEdge] = None) { + + /** + * Returns a new stream by applying a flatMap function to each element + * and flatten the results. + * + * @param fn flatMap function + * @param description The description message for this operation + * @return A new stream with type [R] + */ + def flatMap[R](fn: T => TraversableOnce[R], description: String = "flatMap"): Stream[R] = { + this.flatMap(FlatMapFunction(fn), description) + } + + /** + * Returns a new stream by applying a flatMap function to each element + * and flatten the results. + * + * @param fn flatMap function + * @param description The description message for this operation + * @return A new stream with type [R] + */ + def flatMap[R](fn: FlatMapFunction[T, R], description: String): Stream[R] = { + transform(new FlatMapper[T, R](fn, description)) + } + + /** + * Returns a new stream by applying a map function to each element. 
+ * + * @param fn map function + * @return A new stream with type [R] + */ + def map[R](fn: T => R, description: String = "map"): Stream[R] = { + this.map(MapFunction(fn), description) + } + + /** + * Returns a new stream by applying a map function to each element. + * + * @param fn map function + * @return A new stream with type [R] + */ + def map[R](fn: MapFunction[T, R], description: String): Stream[R] = { + this.flatMap(FlatMapFunction(fn), description) + } + + /** + * Returns a new Stream keeping the elements that satisfy the filter function. + * + * @param fn filter function + * @return a new stream after filter + */ + def filter(fn: T => Boolean, description: String = "filter"): Stream[T] = { + this.filter(FilterFunction(fn), description) + } + + /** + * Returns a new Stream keeping the elements that satisfy the filter function. + * + * @param fn filter function + * @return a new stream after filter + */ + def filter(fn: FilterFunction[T], description: String): Stream[T] = { + this.flatMap(FlatMapFunction(fn), description) + } + /** + * Returns a new stream by applying a reduce function over all the elements. + * + * @param fn reduce function + * @param description description message for this operator + * @return a new stream after reduce + */ + def reduce(fn: (T, T) => T, description: String = "reduce"): Stream[T] = { + reduce(ReduceFunction(fn), description) + } + + /** + * Returns a new stream by applying a reduce function over all the elements. 
+ * + * @param fn reduce function + * @param description description message for this operator + * @return a new stream after reduce + */ + def reduce(fn: ReduceFunction[T], description: String): Stream[T] = { + transform(new Reducer[T](fn, description)) + } + + private def transform[R](fn: SingleInputFunction[T, R]): Stream[R] = { + val op = ChainableOp(fn) + graph.addVertex(op) + graph.addEdge(thisNode, edge.getOrElse(Direct), op) + new Stream(graph, op) + } + + /** + * Log to task log file + */ + def log(): Unit = { + this.map(msg => { + LoggerFactory.getLogger("dsl").info(msg.toString) + msg + }, "log") + } + + /** + * Merges data from two stream into one + * + * @param other the other stream + * @return the merged stream + */ + def merge(other: Stream[T], description: String = "merge"): Stream[T] = { + val mergeOp = MergeOp(description, UserConfig.empty) + graph.addVertex(mergeOp) + graph.addEdge(thisNode, edge.getOrElse(Direct), mergeOp) + graph.addEdge(other.thisNode, other.edge.getOrElse(Shuffle), mergeOp) + new Stream[T](graph, mergeOp) + } + + /** + * Group by function (T => Group) + * + * For example, we have T type, People(name: String, gender: String, age: Int) + * groupBy[People](_.gender) will group the people by gender. + * + * You can append other combinators after groupBy + * + * For example, + * {{{ + * Stream[People].groupBy(_.gender).flatMap(..).filter(..).reduce(..) 
+ * }}} + * + * @param fn Group by function + * @param parallelism Parallelism level + * @param description The description + * @return the grouped stream + */ + def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1, + description: String = "groupBy"): Stream[T] = { + window(CountWindow.apply(1).accumulating) + .groupBy[GROUP](fn, parallelism, description) + } + + /** + * Window function + * + * @param win window definition + * @param description window description + * @return [[WindowStream]] where groupBy could be applied + */ + def window(win: Window, description: String = "window"): WindowStream[T] = { + new WindowStream[T](graph, edge, thisNode, win, description) + } + + /** + * Connects with a low level Processor(TaskDescription) + * + * @param processor a user defined processor + * @param parallelism parallelism level + * @return new stream after processing with type [R] + */ + def process[R]( + processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = UserConfig.empty, + description: String = "process"): Stream[R] = { + val processorOp = ProcessorOp(processor, parallelism, conf, description) + graph.addVertex(processorOp) + graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp) + new Stream[R](graph, processorOp, Some(Shuffle)) + } +} + +class WindowStream[T](graph: Graph[Op, OpEdge], edge: Option[OpEdge], thisNode: Op, + window: Window, winDesc: String) { + + def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1, + description: String = "groupBy"): Stream[T] = { + val groupBy: GroupByFn[T, (GROUP, List[Bucket])] = GroupAlsoByWindow(fn, window) + val groupOp = GroupByOp[T, (GROUP, List[Bucket])](groupBy, parallelism, + s"$winDesc.$description") + graph.addVertex(groupOp) + graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp) + new Stream[T](graph, groupOp) + } +} + +class KVStream[K, V](stream: Stream[Tuple2[K, V]]) { + /** + * GroupBy key + * + * Applies to Stream[Tuple2[K,V]] + * + * @param parallelism the parallelism for this 
operation + * @return the new KV stream + */ + def groupByKey(parallelism: Int = 1): Stream[Tuple2[K, V]] = { + stream.groupBy(Stream.getTupleKey[K, V], parallelism, "groupByKey") + } + + /** + * Sum the value of the tuples + * + * Apply to Stream[Tuple2[K,V]], V must be of type Number + * + * For input (key, value1), (key, value2), will generate (key, value1 + value2) + * @param numeric the numeric operations + * @return the sum stream + */ + def sum(implicit numeric: Numeric[V]): Stream[(K, V)] = { + stream.reduce(Stream.sumByKey[K, V](numeric), "sum") + } +} + +object Stream { + + def apply[T](graph: Graph[Op, OpEdge], node: Op, edge: Option[OpEdge]): Stream[T] = { + new Stream[T](graph, node, edge) + } + + def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1 + + def sumByKey[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V] + = (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2)) + + implicit def streamToKVStream[K, V](stream: Stream[Tuple2[K, V]]): KVStream[K, V] = { + new KVStream(stream) + } + + implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable { + def sink(dataSink: DataSink, parallelism: Int = 1, + conf: UserConfig = UserConfig.empty, description: String = "sink"): Stream[T] = { + implicit val sink = DataSinkOp(dataSink, parallelism, conf, description) + stream.graph.addVertex(sink) + stream.graph.addEdge(stream.thisNode, Shuffle, sink) + new Stream[T](stream.graph, sink) + } + } +} + +class LoggerSink[T] extends DataSink { + var logger: Logger = _ + + override def open(context: TaskContext): Unit = { + this.logger = context.logger + } + + override def write(message: Message): Unit = { + logger.info("logging message " + message.msg) + } + + override def close(): Unit = Unit +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala 
---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala new file mode 100644 index 0000000..d6eed2e --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.dsl.scalaapi + +import java.time.Instant + +import akka.actor.ActorSystem +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.streaming.StreamApplication +import org.apache.gearpump.streaming.dsl.plan._ +import org.apache.gearpump.streaming.source.DataSource +import org.apache.gearpump.streaming.task.TaskContext +import org.apache.gearpump.util.Graph + +import scala.language.implicitConversions + +/** + * Example: + * {{{ + * val data = "This is a good start, bingo!! bingo!!" + * app.fromCollection(data.lines.toList). 
+ * // word => (word, count) + * flatMap(line => line.split("[\\s]+")).map((_, 1)). + * // (word, count1), (word, count2) => (word, count1 + count2) + * groupBy(kv => kv._1).reduce(sum(_, _)) + * + * val appId = context.submit(app) + * context.close() + * }}} + * + * @param name name of app + */ +class StreamApp( + name: String, system: ActorSystem, userConfig: UserConfig, + private val graph: Graph[Op, OpEdge]) { + + def this(name: String, system: ActorSystem, userConfig: UserConfig) = { + this(name, system, userConfig, Graph.empty[Op, OpEdge]) + } + + def plan(): StreamApplication = { + implicit val actorSystem = system + val planner = new Planner + val dag = planner.plan(graph) + StreamApplication(name, dag, userConfig) + } +} + +object StreamApp { + def apply(name: String, context: ClientContext, userConfig: UserConfig = UserConfig.empty) + : StreamApp = { + new StreamApp(name, context.system, userConfig) + } + + implicit def streamAppToApplication(streamApp: StreamApp): StreamApplication = { + streamApp.plan() + } + + implicit class Source(app: StreamApp) extends java.io.Serializable { + + def source[T](dataSource: DataSource, parallelism: Int = 1, + conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = { + implicit val sourceOp = DataSourceOp(dataSource, parallelism, conf, description) + app.graph.addVertex(sourceOp) + new Stream[T](app.graph, sourceOp) + } + + def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = { + this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description) + } + } +} + +/** A test message source which emits each element of the given sequence once; read() returns null after the sequence is exhausted. 
*/ +class CollectionDataSource[T](seq: Seq[T]) extends DataSource { + private lazy val iterator: Iterator[T] = seq.iterator + + override def open(context: TaskContext, startTime: Instant): Unit = {} + + override def read(): Message = { + if (iterator.hasNext) { + Message(iterator.next(), Instant.now().toEpochMilli) + } else { + null + } + } + + override def close(): Unit = {} + + override def getWatermark: Instant = Instant.now() +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala new file mode 100644 index 0000000..f10a3db --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.dsl.scalaapi.functions + +import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction} +import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => JFlatMapFunction} + +import scala.collection.JavaConverters._ + +object FlatMapFunction { + + def apply[T, R](fn: JFlatMapFunction[T, R]): FlatMapFunction[T, R] = { + new FlatMapFunction[T, R] { + + override def setup(): Unit = { + fn.setup() + } + + override def apply(t: T): TraversableOnce[R] = { + fn.apply(t).asScala + } + + + override def teardown(): Unit = { + fn.teardown() + } + } + } + + def apply[T, R](fn: T => TraversableOnce[R]): FlatMapFunction[T, R] = { + new FlatMapFunction[T, R] { + override def apply(t: T): TraversableOnce[R] = { + fn(t) + } + } + } + + def apply[T, R](fn: MapFunction[T, R]): FlatMapFunction[T, R] = { + new FlatMapFunction[T, R] { + + override def setup(): Unit = { + fn.setup() + } + + override def apply(t: T): TraversableOnce[R] = { + Option(fn(t)) + } + + override def teardown(): Unit = { + fn.teardown() + } + } + } + + def apply[T, R](fn: FilterFunction[T]): FlatMapFunction[T, T] = { + new FlatMapFunction[T, T] { + + override def setup(): Unit = { + fn.setup() + } + + override def apply(t: T): TraversableOnce[T] = { + if (fn(t)) { + Option(t) + } else { + None + } + } + + override def teardown(): Unit = { + fn.teardown() + } + } + } +} + +/** + * Transforms one input into zero or more outputs of possibly different types. + * This Scala version of FlatMapFunction returns a TraversableOnce. 
+ * + * @param T Input value type + * @param R Output value type + */ +abstract class FlatMapFunction[T, R] extends SerializableFunction { + + def apply(t: T): TraversableOnce[R] + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala new file mode 100644 index 0000000..ab88bf1 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.scalaapi.functions + +/** + * Superclass for all user defined function interfaces. + * This ensures all functions are serializable and provides common methods + * like setup and teardown. Users should not extend this class directly + * but subclasses like [[FlatMapFunction]]. 
+ */ +abstract class SerializableFunction extends java.io.Serializable { + + def setup(): Unit = {} + + def teardown(): Unit = {} + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala index 4ee2fa8..06f2964 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala @@ -19,7 +19,6 @@ package org.apache.gearpump.streaming.dsl.task import java.time.Instant -import akka.actor.ActorSystem import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala index 4b7649f..0674339 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala @@ -19,7 +19,6 @@ package org.apache.gearpump.streaming.dsl.task import java.time.Instant -import akka.actor.ActorSystem import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala index 980a54b..78ba762 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala @@ -21,7 +21,6 @@ import java.time.Instant import java.util.concurrent.TimeUnit import akka.actor.Actor.Receive -import akka.actor.ActorSystem import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala index e35f085..f8fbefa 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala @@ -17,21 +17,26 @@ */ package org.apache.gearpump.streaming.dsl.task +import java.time.Instant + import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction import org.apache.gearpump.streaming.task.{Task, TaskContext} -class TransformTask[IN, OUT]( - operator: 
Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext, - userConf: UserConfig) extends Task(taskContext, userConf) { +class TransformTask[IN, OUT](operator: Option[SingleInputFunction[IN, OUT]], + taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { def this(taskContext: TaskContext, userConf: UserConfig) = { this(userConf.getValue[SingleInputFunction[IN, OUT]]( GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf) } + override def onStart(startTime: Instant): Unit = { + operator.foreach(_.setup()) + } + override def onNext(msg: Message): Unit = { val time = msg.timestamp @@ -44,4 +49,8 @@ class TransformTask[IN, OUT]( taskContext.output(new Message(msg.msg, time)) } } + + override def onStop(): Unit = { + operator.foreach(_.teardown()) + } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala index 9af5e61..223a4af 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala @@ -22,12 +22,13 @@ import java.time.Instant import akka.actor.ActorSystem import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.gs.collections.api.block.procedure.Procedure -import org.apache.gearpump.gs.collections.impl.list.mutable.FastList -import org.apache.gearpump.gs.collections.impl.map.mutable.UnifiedMap -import org.apache.gearpump.gs.collections.impl.map.sorted.mutable.TreeSortedMap +import com.gs.collections.api.block.procedure.Procedure +import 
com.gs.collections.impl.list.mutable.FastList +import com.gs.collections.impl.map.mutable.UnifiedMap +import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap +import com.gs.collections.impl.set.mutable.UnifiedSet import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.dsl.plan.functions.{EmitFunction, SingleInputFunction} +import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, Emit, SingleInputFunction} import org.apache.gearpump.streaming.dsl.window.api.Discarding import org.apache.gearpump.streaming.task.TaskContext import org.apache.gearpump.util.LogUtil @@ -38,7 +39,6 @@ trait WindowRunner { def process(message: Message): Unit def trigger(time: Instant): Unit - } object DefaultWindowRunner { @@ -46,18 +46,6 @@ object DefaultWindowRunner { private val LOG: Logger = LogUtil.getLogger(classOf[DefaultWindowRunner[_, _, _]]) case class WindowGroup[GROUP](bucket: Bucket, group: GROUP) - extends Comparable[WindowGroup[GROUP]] { - override def compareTo(o: WindowGroup[GROUP]): Int = { - val ret = bucket.compareTo(o.bucket) - if (ret != 0) { - ret - } else if (group.equals(o.group)) { - 0 - } else { - -1 - } - } - } } class DefaultWindowRunner[IN, GROUP, OUT]( @@ -66,20 +54,27 @@ class DefaultWindowRunner[IN, GROUP, OUT]( extends WindowRunner { import org.apache.gearpump.streaming.dsl.window.impl.DefaultWindowRunner._ - private val windowGroups = new TreeSortedMap[WindowGroup[GROUP], FastList[IN]] + private val windows = new TreeSortedMap[Bucket, UnifiedSet[WindowGroup[GROUP]]] + private val windowGroups = new UnifiedMap[WindowGroup[GROUP], FastList[IN]] private val groupFns = new UnifiedMap[GROUP, SingleInputFunction[IN, OUT]] - override def process(message: Message): Unit = { val (group, buckets) = groupBy.groupBy(message) buckets.foreach { bucket => val wg = WindowGroup(bucket, group) + val wgs = windows.getOrDefault(bucket, new UnifiedSet[WindowGroup[GROUP]](1)) + wgs.add(wg) + windows.put(bucket, wgs) + val inputs = 
windowGroups.getOrDefault(wg, new FastList[IN](1)) inputs.add(message.msg.asInstanceOf[IN]) windowGroups.put(wg, inputs) } - groupFns.putIfAbsent(group, - userConfig.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get) + if (!groupFns.containsKey(group)) { + val fn = userConfig.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get + fn.setup() + groupFns.put(group, fn) + } } override def trigger(time: Instant): Unit = { @@ -87,21 +82,28 @@ class DefaultWindowRunner[IN, GROUP, OUT]( @annotation.tailrec def onTrigger(): Unit = { - if (windowGroups.notEmpty()) { - val first = windowGroups.firstKey - if (!time.isBefore(first.bucket.endTime)) { - val inputs = windowGroups.remove(first) - val reduceFn = groupFns.get(first.group) - .andThen[Unit](new EmitFunction[OUT](emitResult(_, time))) - inputs.forEach(new Procedure[IN] { - override def value(t: IN): Unit = { - reduceFn.process(t) + if (windows.notEmpty()) { + val first = windows.firstKey + if (!time.isBefore(first.endTime)) { + val wgs = windows.remove(first) + wgs.forEach(new Procedure[WindowGroup[GROUP]] { + override def value(each: WindowGroup[GROUP]): Unit = { + val inputs = windowGroups.remove(each) + val reduceFn = AndThen(groupFns.get(each.group), new Emit[OUT](emitResult(_, time))) + inputs.forEach(new Procedure[IN] { + override def value(t: IN): Unit = { + // .toList forces eager evaluation + reduceFn.process(t).toList + } + }) + // .toList forces eager evaluation + reduceFn.finish().toList + if (groupBy.window.accumulationMode == Discarding) { + reduceFn.teardown() + } } }) - reduceFn.finish() - if (groupBy.window.accumulationMode == Discarding) { - reduceFn.clearState() - } + onTrigger() } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala ---------------------------------------------------------------------- diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala index d045def..8f8b7ab 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala @@ -20,12 +20,11 @@ package org.apache.gearpump.streaming.metrics import java.util +import com.google.common.collect.Iterators import com.typesafe.config.Config - import org.apache.gearpump.TimeStamp import org.apache.gearpump.cluster.ClientToMaster.ReadOption import org.apache.gearpump.cluster.MasterToClient.HistoryMetricsItem -import org.apache.gearpump.google.common.collect.Iterators import org.apache.gearpump.metrics.Metrics.{Histogram, Meter} import org.apache.gearpump.metrics.MetricsAggregator import org.apache.gearpump.streaming.metrics.ProcessorAggregator._ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala new file mode 100644 index 0000000..9b63e04 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.partitioner + +import org.apache.gearpump.Message + +/** Used by storm module to broadcast message to all downstream tasks */ +class BroadcastPartitioner extends MulticastPartitioner { + private var lastPartitionNum = -1 + private var partitions = Array.empty[Int] + + override def getPartitions( + msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = { + if (partitionNum != lastPartitionNum) { + partitions = (0 until partitionNum).toArray + lastPartitionNum = partitionNum + } + partitions + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/CoLocationPartitioner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/CoLocationPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/CoLocationPartitioner.scala new file mode 100644 index 0000000..4cb1bad --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/CoLocationPartitioner.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.partitioner + +import org.apache.gearpump.Message + +/** + * Will have the same parallelism with last processor + * And each task in current processor will co-locate with task of last processor + */ +class CoLocationPartitioner extends UnicastPartitioner { + override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { + currentPartitionId + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala new file mode 100644 index 0000000..6137705 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.partitioner + +import org.apache.gearpump.Message + +/** + * Only make sense when the message has implemented the hashCode() + * Otherwise, it will use Object.hashCode(), which will not return + * same hash code after serialization and deserialization. + */ +class HashPartitioner extends UnicastPartitioner { + override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { + (msg.msg.hashCode() & Integer.MAX_VALUE) % partitionNum + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/Partitioner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/Partitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/Partitioner.scala new file mode 100644 index 0000000..f685cc9 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/Partitioner.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.partitioner + +import org.apache.commons.lang.SerializationUtils +import org.apache.gearpump.Message + +import scala.reflect.ClassTag + +/** + * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), partitioner decide how ONE task + * of upstream processor A send to several tasks of downstream processor B. + */ +sealed trait Partitioner extends Serializable + +/** + * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), UnicastPartitioner does + * ONE-task {@literal ->} ONE-task mapping. + */ +trait UnicastPartitioner extends Partitioner { + + /** + * Gets the SINGLE downstream processor task index to send message to. + * + * @param msg Message you want to send + * @param partitionNum How many tasks does the downstream processor have. + * @param upstreamTaskIndex Upstream task's task index who trigger the getPartition() call. + * + * @return ONE task index of downstream processor. + */ + def getPartition(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Int + + def getPartition(msg: Message, partitionNum: Int): Int = { + getPartition(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID) + } +} + +trait MulticastPartitioner extends Partitioner { + + /** + * Gets a list of downstream processor task indexes to send message to. + * + * @param upstreamTaskIndex Current sender task's task index. 
+ * + */ + def getPartitions(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Array[Int] + + def getPartitions(msg: Message, partitionNum: Int): Array[Int] = { + getPartitions(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID) + } +} + +sealed trait PartitionerFactory { + + def name: String + + def partitioner: Partitioner +} + +/** Stores the Partitioner in an object. To use it, user need to deserialize the object */ +class PartitionerObject(private[this] val _partitioner: Partitioner) + extends PartitionerFactory with Serializable { + + override def name: String = partitioner.getClass.getName + + override def partitioner: Partitioner = { + SerializationUtils.clone(_partitioner).asInstanceOf[Partitioner] + } +} + +/** Store the partitioner in class Name, the user need to instantiate a new class */ +class PartitionerByClassName(partitionerClass: String) + extends PartitionerFactory with Serializable { + + override def name: String = partitionerClass + override def partitioner: Partitioner = { + Class.forName(partitionerClass).newInstance().asInstanceOf[Partitioner] + } +} + +/** + * @param partitionerFactory How we construct a Partitioner. 
+ */ +case class PartitionerDescription(partitionerFactory: PartitionerFactory) + +object Partitioner { + val UNKNOWN_PARTITION_ID = -1 + + def apply[T <: Partitioner](implicit clazz: ClassTag[T]): PartitionerDescription = { + PartitionerDescription(new PartitionerByClassName(clazz.runtimeClass.getName)) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShuffleGroupingPartitioner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShuffleGroupingPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShuffleGroupingPartitioner.scala new file mode 100644 index 0000000..1b223e0 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShuffleGroupingPartitioner.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.partitioner + +import org.apache.gearpump.Message + +import scala.util.Random + +/** + * The idea of ShuffleGroupingPartitioner is derived from Storm. 
+ * Messages are randomly distributed across the downstream's tasks in a way such that + * each task is guaranteed to get an equal number of messages. + */ +class ShuffleGroupingPartitioner extends UnicastPartitioner { + private val random = new Random + private var index = -1 + private var partitions = List.empty[Int] + override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { + index += 1 + if (partitions.isEmpty) { + partitions = 0.until(partitionNum).toList + partitions = random.shuffle(partitions) + } else if (index >= partitionNum) { + index = 0 + partitions = random.shuffle(partitions) + } + partitions(index) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShufflePartitioner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShufflePartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShufflePartitioner.scala new file mode 100644 index 0000000..39d5e3b --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShufflePartitioner.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.partitioner + +import java.util.Random + +import org.apache.gearpump.Message + +/** + * Round Robin partition the data to downstream processor tasks. + */ +class ShufflePartitioner extends UnicastPartitioner { + private var seed = 0 + private var count = 0 + + override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { + + if (seed == 0) { + seed = newSeed() + } + + val result = ((count + seed) & Integer.MAX_VALUE) % partitionNum + count = count + 1 + result + } + + private def newSeed(): Int = new Random().nextInt() +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala index 535497c..450f2d6 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala @@ -69,6 +69,7 @@ class DataSourceTask[IN, OUT] private[source]( override def onStart(startTime: Instant): Unit = { LOG.info(s"opening data source at $startTime") source.open(context, startTime) + operator.foreach(_.setup()) self ! 
Watermark(source.getWatermark) } @@ -82,6 +83,7 @@ class DataSourceTask[IN, OUT] private[source]( } override def onStop(): Unit = { + operator.foreach(_.teardown()) LOG.info("closing data source...") source.close() } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializerResolver.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializerResolver.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializerResolver.scala index 902c663..3b32163 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializerResolver.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializerResolver.scala @@ -18,7 +18,7 @@ package org.apache.gearpump.streaming.task -import org.apache.gearpump.esotericsoftware.kryo.util.{IntMap, ObjectMap} +import com.esotericsoftware.kryo.util.{IntMap, ObjectMap} import org.apache.gearpump.streaming.task.SerializerResolver.Registration private[task] class SerializerResolver { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscriber.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscriber.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscriber.scala index 692d7f9..5c99980 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscriber.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscriber.scala @@ -18,7 +18,7 @@ package org.apache.gearpump.streaming.task -import org.apache.gearpump.partitioner.PartitionerDescription +import org.apache.gearpump.streaming.partitioner.PartitionerDescription import org.apache.gearpump.streaming.{DAG, LifeTime} /** 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala index d9fbc82..4193fbf 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala @@ -20,8 +20,8 @@ package org.apache.gearpump.streaming.task import org.slf4j.Logger -import org.apache.gearpump.google.common.primitives.Shorts -import org.apache.gearpump.partitioner.{MulticastPartitioner, Partitioner, UnicastPartitioner} +import com.google.common.primitives.Shorts +import org.apache.gearpump.streaming.partitioner.{MulticastPartitioner, Partitioner, UnicastPartitioner} import org.apache.gearpump.streaming.AppMasterToExecutor.MsgLostException import org.apache.gearpump.streaming.LifeTime import org.apache.gearpump.streaming.task.Subscription._ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala index f72e5b8..92f6672 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala @@ -23,17 +23,17 @@ import java.util import java.util.concurrent.TimeUnit import akka.actor._ +import com.gs.collections.impl.map.mutable.primitive.IntShortHashMap +import org.apache.gearpump.streaming.source.Watermark +import org.slf4j.Logger import 
org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.gs.collections.impl.map.mutable.primitive.IntShortHashMap import org.apache.gearpump.metrics.Metrics import org.apache.gearpump.serializer.SerializationFramework import org.apache.gearpump.streaming.AppMasterToExecutor._ import org.apache.gearpump.streaming.ExecutorToAppMaster._ import org.apache.gearpump.streaming.ProcessorId -import org.apache.gearpump.streaming.source.Watermark import org.apache.gearpump.util.{LogUtil, TimeOutScheduler} import org.apache.gearpump.{Message, TimeStamp} -import org.slf4j.Logger /** * @@ -52,9 +52,10 @@ class TaskActor( def serializerPool: SerializationFramework = inputSerializerPool + import taskContextData._ + import org.apache.gearpump.streaming.Constants._ import org.apache.gearpump.streaming.task.TaskActor._ - import taskContextData._ val config = context.system.settings.config val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId, task = taskId) @@ -75,9 +76,9 @@ class TaskActor( private var life = taskContextData.life // Latency probe - import context.dispatcher - import scala.concurrent.duration._ + + import context.dispatcher final val LATENCY_PROBE_INTERVAL = FiniteDuration(1, TimeUnit.SECONDS) // Clock report interval http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala index 5f4faee..ccda8f0 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala @@ -18,7 +18,7 @@ package org.apache.gearpump.streaming -import org.apache.gearpump.partitioner.PartitionerDescription +import 
org.apache.gearpump.streaming.partitioner.PartitionerDescription import org.apache.gearpump.streaming.task.TaskId import org.apache.gearpump.util.Graph import org.apache.gearpump.util.Graph.Node http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala index e461ae8..29dfc57 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala @@ -34,7 +34,7 @@ import org.apache.gearpump.cluster.master.MasterProxy import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest} import org.apache.gearpump.cluster.worker.WorkerId import org.apache.gearpump.jarstore.FilePath -import org.apache.gearpump.partitioner.HashPartitioner +import org.apache.gearpump.streaming.partitioner.HashPartitioner import org.apache.gearpump.streaming.AppMasterToExecutor.StopTask import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, UnRegisterTask} import org.apache.gearpump.streaming.appmaster.AppMaster.{TaskActorRef, LookupTaskActorRef} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala index d42fe6f..46175a4 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala +++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala @@ -20,7 +20,7 @@ package org.apache.gearpump.streaming.appmaster import akka.actor.{ActorSystem, Props} import akka.testkit.{ImplicitSender, TestKit, TestProbe} import org.apache.gearpump.cluster.{TestUtil, UserConfig} -import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription} +import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner, PartitionerDescription} import org.apache.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, ChangeToNewDAGSuccess, HealthChecker, ProcessorClock} import org.apache.gearpump.streaming.appmaster.ClockServiceSpec.Store import org.apache.gearpump.streaming.storage.AppDataStore http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala index be3b3b7..adde927 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala @@ -22,7 +22,7 @@ package org.apache.gearpump.streaming.appmaster import akka.actor.{ActorSystem, Props} import akka.testkit.TestProbe import org.apache.gearpump.cluster.{TestUtil, UserConfig} -import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner} +import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner} import org.apache.gearpump.streaming.appmaster.DagManager.{DAGOperationFailed, DAGOperationSuccess, GetLatestDAG, GetTaskLaunchData, LatestDAG, NewDAGDeployed, ReplaceProcessor, TaskLaunchData, WatchChange} import 
org.apache.gearpump.streaming.task.{Subscriber, TaskActor} import org.apache.gearpump.streaming._ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala index 5f6dd04..def9d44 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala @@ -22,7 +22,7 @@ import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest} import org.apache.gearpump.cluster.worker.WorkerId import org.apache.gearpump.cluster.{AppJar, TestUtil} import org.apache.gearpump.jarstore.FilePath -import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner} +import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner} import org.apache.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, TestTask2} import org.apache.gearpump.streaming.task.TaskId import org.apache.gearpump.streaming.{DAG, ProcessorDescription, _} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala index 54ecde1..bcf96e4 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala @@ -25,7 +25,7 @@ import 
org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest} import org.apache.gearpump.cluster.worker.WorkerId import org.apache.gearpump.cluster.{AppJar, TestUtil, UserConfig} import org.apache.gearpump.jarstore.FilePath -import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription} +import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner, PartitionerDescription} import org.apache.gearpump.streaming.AppMasterToExecutor.{LaunchTasks, StartAllTasks, StartDynamicDag, TaskLocationsReady, TaskLocationsReceived, TaskRegistered} import org.apache.gearpump.streaming.ExecutorToAppMaster.RegisterTask import org.apache.gearpump.streaming.appmaster.AppMaster.AllocateResourceTimeOut http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala index 864aa93..1bfde94 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala @@ -21,7 +21,7 @@ import com.typesafe.config.ConfigFactory import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest} import org.apache.gearpump.cluster.worker.WorkerId import org.apache.gearpump.cluster.{TestUtil, UserConfig} -import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner} +import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner} import org.apache.gearpump.streaming.appmaster.TaskLocator.Localities import org.apache.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, TestTask2} import org.apache.gearpump.streaming.task.{Task, 
TaskContext, TaskId} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala deleted file mode 100644 index e0407ec..0000000 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.streaming.dsl - -import akka.actor.ActorSystem -import org.apache.gearpump.cluster.TestUtil -import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.partitioner.PartitionerDescription -import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication} -import org.apache.gearpump.streaming.source.DataSourceTask -import org.apache.gearpump.util.Graph -import org.mockito.Mockito.when -import org.scalatest._ -import org.scalatest.mock.MockitoSugar - -import scala.concurrent.Await -import scala.concurrent.duration.Duration -class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar { - - implicit var system: ActorSystem = _ - - override def beforeAll(): Unit = { - system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) - } - - override def afterAll(): Unit = { - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - - it should "be able to generate multiple new streams" in { - val context: ClientContext = mock[ClientContext] - when(context.system).thenReturn(system) - - val dsl = StreamApp("dsl", context) - dsl.source(List("A"), 2, "A") shouldBe a [Stream[_]] - dsl.source(List("B"), 3, "B") shouldBe a [Stream[_]] - - val application = dsl.plan() - application shouldBe a [StreamApplication] - application.name shouldBe "dsl" - val dag = application.userConfig - .getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get - dag.vertices.size shouldBe 2 - dag.vertices.foreach { processor => - processor.taskClass shouldBe classOf[DataSourceTask[_, _]].getName - if (processor.description == "A") { - processor.parallelism shouldBe 2 - } else if (processor.description == "B") { - processor.parallelism shouldBe 3 - } else { - fail(s"undefined source ${processor.description}") - } - } - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala deleted file mode 100644 index fdc721b..0000000 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.streaming.dsl - -import akka.actor._ -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.cluster.{TestUtil, UserConfig} -import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner, PartitionerDescription} -import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication} -import org.apache.gearpump.streaming.dsl.StreamSpec.Join -import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner -import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask} -import org.apache.gearpump.streaming.source.DataSourceTask -import org.apache.gearpump.streaming.task.{Task, TaskContext} -import org.apache.gearpump.util.Graph -import org.apache.gearpump.util.Graph._ -import org.mockito.Mockito.when -import org.scalatest._ -import org.scalatest.mock.MockitoSugar - -import scala.concurrent.Await -import scala.concurrent.duration.Duration -import scala.util.{Either, Left, Right} - -class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar { - - implicit var system: ActorSystem = _ - - override def beforeAll(): Unit = { - system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) - } - - override def afterAll(): Unit = { - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - - it should "translate the DSL to a DAG" in { - val context: ClientContext = mock[ClientContext] - when(context.system).thenReturn(system) - - val dsl = StreamApp("dsl", context) - - val data = - """ - five four three two one - five four three two - five four three - five four - five - """ - val stream = dsl.source(data.lines.toList, 1, ""). - flatMap(line => line.split("[\\s]+")).filter(_.nonEmpty). - map(word => (word, 1)). - groupBy(_._1, parallelism = 2). - reduce((left, right) => (left._1, left._2 + right._2)). 
- map[Either[(String, Int), String]](Left(_)) - - val query = dsl.source(List("two"), 1, "").map[Either[(String, Int), String]](Right(_)) - stream.merge(query).process[(String, Int)](classOf[Join], 1) - - val app: StreamApplication = dsl.plan() - val dag = app.userConfig - .getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get - - val dagTopology = dag.mapVertex(_.taskClass).mapEdge { (node1, edge, node2) => - edge.partitionerFactory.partitioner.getClass.getName - } - val expectedDagTopology = getExpectedDagTopology - - dagTopology.vertices.toSet should contain theSameElementsAs expectedDagTopology.vertices.toSet - dagTopology.edges.toSet should contain theSameElementsAs expectedDagTopology.edges.toSet - } - - private def getExpectedDagTopology: Graph[String, String] = { - val source = classOf[DataSourceTask[_, _]].getName - val group = classOf[CountTriggerTask[_, _]].getName - val merge = classOf[TransformTask[_, _]].getName - val join = classOf[Join].getName - - val hash = classOf[HashPartitioner].getName - val groupBy = classOf[GroupByPartitioner[_, _]].getName - val colocation = classOf[CoLocationPartitioner].getName - - val expectedDagTopology = Graph( - source ~ groupBy ~> group ~ colocation ~> merge ~ hash ~> join, - source ~ hash ~> merge - ) - expectedDagTopology - } -} - -object StreamSpec { - - class Join(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { - - var query: String = _ - - override def onNext(msg: Message): Unit = { - msg.msg match { - case Left(wordCount: (String @unchecked, Int @unchecked)) => - if (query != null && wordCount._1 == query) { - taskContext.output(new Message(wordCount)) - } - - case Right(query: String) => - this.query = query - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala ---------------------------------------------------------------------- 
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala index bf52abc..f0920de 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala @@ -25,7 +25,8 @@ import org.apache.gearpump.cluster.{TestUtil, UserConfig} import org.apache.gearpump.streaming.Processor import org.apache.gearpump.streaming.Processor.DefaultProcessor import org.apache.gearpump.streaming.dsl.plan.OpSpec.{AnySink, AnySource, AnyTask} -import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction +import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, SingleInputFunction} +import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction import org.apache.gearpump.streaming.dsl.window.api.GroupByFn import org.apache.gearpump.streaming.sink.DataSink import org.apache.gearpump.streaming.source.DataSource @@ -145,7 +146,6 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS val chainedOp = chainableOp1.chain(chainableOp2) - verify(fn1).andThen(fn2) chainedOp shouldBe a[ChainableOp[_, _]] unchainableOps.foreach { op => @@ -155,12 +155,14 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS } } - "throw exception on getProcessor" in { - val fn1 = mock[SingleInputFunction[Any, Any]] - val chainableOp1 = ChainableOp[Any, Any](fn1) - intercept[UnsupportedOperationException] { - chainableOp1.getProcessor - } + "get Processor" in { + val fn = mock[FlatMapFunction[Any, Any]] + val flatMapper = new FlatMapper(fn, "flatMap") + val chainableOp = ChainableOp[Any, Any](flatMapper) + + val processor = chainableOp.getProcessor + processor shouldBe a[Processor[_]] + processor.parallelism shouldBe 1 } }
