http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/dsl/StreamApp.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/StreamApp.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/StreamApp.scala deleted file mode 100644 index a2ac70f..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/dsl/StreamApp.scala +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.dsl - -import scala.language.implicitConversions - -import akka.actor.ActorSystem - -import io.gearpump.cluster.UserConfig -import io.gearpump.cluster.client.ClientContext -import io.gearpump.streaming.StreamApplication -import io.gearpump.streaming.dsl.op.{DataSourceOp, Op, OpEdge, ProcessorOp} -import io.gearpump.streaming.dsl.plan.Planner -import io.gearpump.streaming.source.DataSource -import io.gearpump.streaming.task.{Task, TaskContext} -import io.gearpump.util.Graph -import io.gearpump.{Message, TimeStamp} - -/** - * Example: - * {{{ - * val data = "This is a good start, bingo!! bingo!!" 
- * app.fromCollection(data.lines.toList). - * // word => (word, count) - * flatMap(line => line.split("[\\s]+")).map((_, 1)). - * // (word, count1), (word, count2) => (word, count1 + count2) - * groupBy(kv => kv._1).reduce(sum(_, _)) - * - * val appId = context.submit(app) - * context.close() - * }}} - * - * @param name name of app - */ -class StreamApp( - val name: String, system: ActorSystem, userConfig: UserConfig, val graph: Graph[Op, OpEdge]) { - - def this(name: String, system: ActorSystem, userConfig: UserConfig) = { - this(name, system, userConfig, Graph.empty[Op, OpEdge]) - } - - def plan(): StreamApplication = { - implicit val actorSystem = system - val planner = new Planner - val dag = planner.plan(graph) - StreamApplication(name, dag, userConfig) - } -} - -object StreamApp { - def apply(name: String, context: ClientContext, userConfig: UserConfig = UserConfig.empty) - : StreamApp = { - new StreamApp(name, context.system, userConfig) - } - - implicit def streamAppToApplication(streamApp: StreamApp): StreamApplication = { - streamApp.plan - } - - implicit class Source(app: StreamApp) extends java.io.Serializable { - - def source[T](dataSource: DataSource, parallism: Int): Stream[T] = { - source(dataSource, parallism, UserConfig.empty) - } - - def source[T](dataSource: DataSource, parallism: Int, description: String): Stream[T] = { - source(dataSource, parallism, UserConfig.empty, description) - } - - def source[T](dataSource: DataSource, parallism: Int, conf: UserConfig): Stream[T] = { - source(dataSource, parallism, conf, description = null) - } - - def source[T](dataSource: DataSource, parallism: Int, conf: UserConfig, description: String) - : Stream[T] = { - implicit val sourceOp = DataSourceOp(dataSource, parallism, conf, description) - app.graph.addVertex(sourceOp) - new Stream[T](app.graph, sourceOp) - } - def source[T](seq: Seq[T], parallism: Int, description: String): Stream[T] = { - this.source(new CollectionDataSource[T](seq), parallism, 
UserConfig.empty, description) - } - - def source[T](source: Class[_ <: Task], parallism: Int, conf: UserConfig, description: String) - : Stream[T] = { - val sourceOp = ProcessorOp(source, parallism, conf, Option(description).getOrElse("source")) - app.graph.addVertex(sourceOp) - new Stream[T](app.graph, sourceOp) - } - } -} - -/** A test message source which generated message sequence repeatedly. */ -class CollectionDataSource[T](seq: Seq[T]) extends DataSource { - private val iterator: Iterator[T] = seq.iterator - - override def read(): Message = { - if (iterator.hasNext) { - Message(iterator.next()) - } else { - null - } - } - - override def close(): Unit = {} - - override def open(context: TaskContext, startTime: TimeStamp): Unit = {} -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala deleted file mode 100644 index 549cc6e..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.dsl.javaapi - -import scala.collection.JavaConverters._ - -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.dsl.Stream -import io.gearpump.streaming.javaapi.dsl.functions._ -import io.gearpump.streaming.task.Task - -/** - * Java DSL - */ -class JavaStream[T](val stream: Stream[T]) { - - /** FlatMap on stream */ - def flatMap[R](fn: FlatMapFunction[T, R], description: String): JavaStream[R] = { - new JavaStream[R](stream.flatMap({ t: T => fn(t).asScala }, description)) - } - - /** Map on stream */ - def map[R](fn: MapFunction[T, R], description: String): JavaStream[R] = { - new JavaStream[R](stream.map({ t: T => fn(t) }, description)) - } - - /** Only keep the messages that FilterFunction returns true. */ - def filter(fn: FilterFunction[T], description: String): JavaStream[T] = { - new JavaStream[T](stream.filter({ t: T => fn(t) }, description)) - } - - /** Does aggregation on the stream */ - def reduce(fn: ReduceFunction[T], description: String): JavaStream[T] = { - new JavaStream[T](stream.reduce({ (t1: T, t2: T) => fn(t1, t2) }, description)) - } - - def log(): Unit = { - stream.log() - } - - /** Merges streams of same type together */ - def merge(other: JavaStream[T], description: String): JavaStream[T] = { - new JavaStream[T](stream.merge(other.stream, description)) - } - - /** - * Group by a stream and turns it to a list of sub-streams. Operations chained after - * groupBy applies to sub-streams. 
- */ - def groupBy[Group](fn: GroupByFunction[T, Group], parallelism: Int, description: String) - : JavaStream[T] = { - new JavaStream[T](stream.groupBy({t: T => fn(t)}, parallelism, description)) - } - - /** Add a low level Processor to process messages */ - def process[R]( - processor: Class[_ <: Task], parallelism: Int, conf: UserConfig, description: String) - : JavaStream[R] = { - new JavaStream[R](stream.process(processor, parallelism, conf, description)) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala deleted file mode 100644 index e39e054..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.dsl.javaapi - -import java.util.Collection -import scala.collection.JavaConverters._ - -import io.gearpump.cluster.UserConfig -import io.gearpump.cluster.client.ClientContext -import io.gearpump.streaming.dsl.{CollectionDataSource, StreamApp} -import io.gearpump.streaming.source.DataSource - -class JavaStreamApp(name: String, context: ClientContext, userConfig: UserConfig) { - - private val streamApp = StreamApp(name, context, userConfig) - - def source[T](collection: Collection[T], parallelism: Int, - conf: UserConfig, description: String): JavaStream[T] = { - val dataSource = new CollectionDataSource(collection.asScala.toSeq) - source(dataSource, parallelism, conf, description) - } - - def source[T](dataSource: DataSource, parallelism: Int, - conf: UserConfig, description: String): JavaStream[T] = { - new JavaStream[T](streamApp.source(dataSource, parallelism, conf, description)) - } - - def run(): Unit = { - context.submit(streamApp) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/dsl/op/OP.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/op/OP.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/op/OP.scala deleted file mode 100644 index f0a86fa..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/dsl/op/OP.scala +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.dsl.op - -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.sink.DataSink -import io.gearpump.streaming.source.DataSource -import io.gearpump.streaming.task.Task - -/** - * Operators for the DSL - */ -sealed trait Op { - def description: String - def conf: UserConfig -} - -/** - * When translated to running DAG, SlaveOP can be attach to MasterOP or other SlaveOP - * "Attach" means running in same Actor. - */ -trait SlaveOp[T] extends Op - -case class FlatMapOp[T, R]( - fun: (T) => TraversableOnce[R], description: String, conf: UserConfig = UserConfig.empty) - extends SlaveOp[T] - -case class ReduceOp[T](fun: (T, T) => T, description: String, conf: UserConfig = UserConfig.empty) - extends SlaveOp[T] - -trait MasterOp extends Op - -trait ParameterizedOp[T] extends MasterOp - -case class MergeOp(description: String, override val conf: UserConfig = UserConfig.empty) - extends MasterOp - -case class GroupByOp[T, R]( - fun: T => R, parallelism: Int, description: String, - override val conf: UserConfig = UserConfig.empty) - extends ParameterizedOp[T] - -case class ProcessorOp[T <: Task]( - processor: Class[T], parallelism: Int, conf: UserConfig, description: String) - extends ParameterizedOp[T] - -case class DataSourceOp[T]( - dataSource: DataSource, parallelism: Int, conf: UserConfig, description: String) - extends ParameterizedOp[T] - -case class DataSinkOp[T]( - dataSink: DataSink, parallelism: Int, conf: UserConfig, description: String) - extends ParameterizedOp[T] - -/** - * Contains operators which 
can be chained to single one. - * - * For example, flatmap().map().reduce() can be chained to single operator as - * no data shuffling is required. - * @param ops list of operations - */ -case class OpChain(ops: List[Op]) extends Op { - def head: Op = ops.head - def last: Op = ops.last - - def description: String = null - - override def conf: UserConfig = { - // The head's conf has priority - ops.reverse.foldLeft(UserConfig.empty) { (conf, op) => - conf.withConfig(op.conf) - } - } -} - -trait OpEdge - -/** - * The upstream OP and downstream OP doesn't require network data shuffle. - * - * For example, map, flatmap operation doesn't require network shuffle, we can use Direct - * to represent the relation with upstream operators. - */ -case object Direct extends OpEdge - -/** - * The upstream OP and downstream OP DOES require network data shuffle. - * - * For example, map, flatmap operation doesn't require network shuffle, we can use Direct - * to represent the relation with upstream operators. - */ -case object Shuffle extends OpEdge - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala deleted file mode 100644 index b842c7b..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.dsl.partitioner - -import io.gearpump.Message -import io.gearpump.partitioner.UnicastPartitioner - -/** - * Partition messages by applying group by function first. - * - * For example: - * {{{ - * case class People(name: String, gender: String) - * - * object Test{ - * - * val groupBy: (People => String) = people => people.gender - * val partitioner = GroupByPartitioner(groupBy) - * } - * }}} - * - * @param groupBy First apply message with groupBy function, then pick the hashCode of the output - * to do the partitioning. You must define hashCode() for output type of groupBy function. 
- */ -class GroupByPartitioner[T, GROUP](groupBy: T => GROUP = null) extends UnicastPartitioner { - override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { - val hashCode = groupBy(msg.msg.asInstanceOf[T]).hashCode() - (hashCode & Integer.MAX_VALUE) % partitionNum - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/OpTranslator.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/OpTranslator.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/OpTranslator.scala deleted file mode 100644 index 11b4c34..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/OpTranslator.scala +++ /dev/null @@ -1,296 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.dsl.plan - -import scala.collection.TraversableOnce - -import akka.actor.ActorSystem -import org.slf4j.Logger - -import io.gearpump._ -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.Constants._ -import io.gearpump.streaming.Processor -import io.gearpump.streaming.Processor.DefaultProcessor -import io.gearpump.streaming.dsl.op._ -import io.gearpump.streaming.dsl.plan.OpTranslator._ -import io.gearpump.streaming.sink.DataSink -import io.gearpump.streaming.source.DataSource -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} -import io.gearpump.util.LogUtil - -/** - * Translates a OP to a TaskDescription - */ -class OpTranslator extends java.io.Serializable { - val LOG: Logger = LogUtil.getLogger(getClass) - - def translate(ops: OpChain)(implicit system: ActorSystem): Processor[_ <: Task] = { - - val baseConfig = ops.conf - - ops.ops.head match { - case op: MasterOp => - val tail = ops.ops.tail - val func = toFunction(tail) - val userConfig = baseConfig.withValue(GEARPUMP_STREAMING_OPERATOR, func) - - op match { - case DataSourceOp(dataSource, parallism, conf, description) => - Processor[SourceTask[Object, Object]](parallism, - description = description + "." + func.description, - userConfig.withValue(GEARPUMP_STREAMING_SOURCE, dataSource)) - case groupby@GroupByOp(_, parallism, description, _) => - Processor[GroupByTask[Object, Object, Object]](parallism, - description = description + "." + func.description, - userConfig.withValue(GEARPUMP_STREAMING_GROUPBY_FUNCTION, groupby)) - case merge: MergeOp => - Processor[TransformTask[Object, Object]](1, - description = op.description + "." + func.description, - userConfig) - case ProcessorOp(processor, parallism, conf, description) => - DefaultProcessor(parallism, - description = description + "." 
+ func.description, - userConfig, processor) - case DataSinkOp(dataSink, parallelism, conf, description) => - Processor[SinkTask[Object]](parallelism, - description = description + func.description, - userConfig.withValue(GEARPUMP_STREAMING_SINK, dataSink)) - } - case op: SlaveOp[_] => - val func = toFunction(ops.ops) - val userConfig = baseConfig.withValue(GEARPUMP_STREAMING_OPERATOR, func) - - Processor[TransformTask[Object, Object]](1, - description = func.description, - taskConf = userConfig) - case chain: OpChain => - throw new RuntimeException("Not supposed to be called!") - } - } - - private def toFunction(ops: List[Op]): SingleInputFunction[Object, Object] = { - val func: SingleInputFunction[Object, Object] = new DummyInputFunction[Object]() - val totalFunction = ops.foldLeft(func) { (fun, op) => - - val opFunction = op match { - case flatmap: FlatMapOp[Object @unchecked, Object @unchecked] => - new FlatMapFunction(flatmap.fun, flatmap.description) - case reduce: ReduceOp[Object @unchecked] => - new ReduceFunction(reduce.fun, reduce.description) - case _ => - throw new RuntimeException("Not supposed to be called!") - } - fun.andThen(opFunction.asInstanceOf[SingleInputFunction[Object, Object]]) - } - totalFunction.asInstanceOf[SingleInputFunction[Object, Object]] - } -} - -object OpTranslator { - - trait SingleInputFunction[IN, OUT] extends Serializable { - def process(value: IN): TraversableOnce[OUT] - def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = { - new AndThen(this, other) - } - - def description: String - } - - class DummyInputFunction[T] extends SingleInputFunction[T, T] { - override def andThen[OUTER](other: SingleInputFunction[T, OUTER]) - : SingleInputFunction[T, OUTER] = { - other - } - - // Should never be called - override def process(value: T): TraversableOnce[T] = None - - override def description: String = "" - } - - class AndThen[IN, MIDDLE, OUT]( - first: SingleInputFunction[IN, MIDDLE], second: 
SingleInputFunction[MIDDLE, OUT]) - extends SingleInputFunction[IN, OUT] { - - override def process(value: IN): TraversableOnce[OUT] = { - first.process(value).flatMap(second.process(_)) - } - - override def description: String = { - Option(first.description).flatMap { description => - Option(second.description).map(description + "." + _) - }.getOrElse(null) - } - } - - class FlatMapFunction[IN, OUT](fun: IN => TraversableOnce[OUT], descriptionMessage: String) - extends SingleInputFunction[IN, OUT] { - - override def process(value: IN): TraversableOnce[OUT] = { - fun(value) - } - - override def description: String = { - this.descriptionMessage - } - } - - class ReduceFunction[T](fun: (T, T) => T, descriptionMessage: String) - extends SingleInputFunction[T, T] { - - private var state: Any = null - - override def process(value: T): TraversableOnce[T] = { - if (state == null) { - state = value - } else { - state = fun(state.asInstanceOf[T], value) - } - Some(state.asInstanceOf[T]) - } - - override def description: String = descriptionMessage - } - - class GroupByTask[IN, GROUP, OUT]( - groupBy: IN => GROUP, taskContext: TaskContext, userConf: UserConfig) - extends Task(taskContext, userConf) { - - def this(taskContext: TaskContext, userConf: UserConfig) = { - this(userConf.getValue[GroupByOp[IN, GROUP]]( - GEARPUMP_STREAMING_GROUPBY_FUNCTION )(taskContext.system).get.fun, - taskContext, userConf) - } - - private var groups = Map.empty[GROUP, SingleInputFunction[IN, OUT]] - - override def onStart(startTime: StartTime): Unit = { - } - - override def onNext(msg: Message): Unit = { - val time = msg.timestamp - - val group = groupBy(msg.msg.asInstanceOf[IN]) - if (!groups.contains(group)) { - val operator = - userConf.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get - groups += group -> operator - } - - val operator = groups(group) - - operator.process(msg.msg.asInstanceOf[IN]).foreach { msg => - taskContext.output(new 
Message(msg.asInstanceOf[AnyRef], time)) - } - } - } - - class SourceTask[T, OUT]( - source: DataSource, operator: Option[SingleInputFunction[T, OUT]], taskContext: TaskContext, - userConf: UserConfig) - extends Task(taskContext, userConf) { - - def this(taskContext: TaskContext, userConf: UserConfig) = { - this( - userConf.getValue[DataSource](GEARPUMP_STREAMING_SOURCE)(taskContext.system).get, - userConf.getValue[SingleInputFunction[T, OUT]](GEARPUMP_STREAMING_OPERATOR)( - taskContext.system), - taskContext, userConf) - } - - override def onStart(startTime: StartTime): Unit = { - source.open(taskContext, startTime.startTime) - self ! Message("start", System.currentTimeMillis()) - } - - override def onNext(msg: Message): Unit = { - val time = System.currentTimeMillis() - Option(source.read()).foreach { msg => - operator match { - case Some(operator) => - operator match { - case bad: DummyInputFunction[T] => - taskContext.output(msg) - case _ => - operator.process(msg.msg.asInstanceOf[T]).foreach(msg => { - taskContext.output(new Message(msg.asInstanceOf[AnyRef], time)) - }) - } - case None => - taskContext.output(msg) - } - } - - self ! 
Message("next", System.currentTimeMillis()) - } - - override def onStop(): Unit = { - source.close() - } - } - - class TransformTask[IN, OUT]( - operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext, - userConf: UserConfig) extends Task(taskContext, userConf) { - - def this(taskContext: TaskContext, userConf: UserConfig) = { - this(userConf.getValue[SingleInputFunction[IN, OUT]]( - GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf) - } - - override def onStart(startTime: StartTime): Unit = { - } - - override def onNext(msg: Message): Unit = { - val time = msg.timestamp - - operator match { - case Some(operator) => - operator.process(msg.msg.asInstanceOf[IN]).foreach { msg => - taskContext.output(new Message(msg.asInstanceOf[AnyRef], time)) - } - case None => - taskContext.output(new Message(msg.msg, time)) - } - } - } - - class SinkTask[T](dataSink: DataSink, taskContext: TaskContext, userConf: UserConfig) - extends Task(taskContext, userConf) { - - def this(taskContext: TaskContext, userConf: UserConfig) = { - this(userConf.getValue[DataSink](GEARPUMP_STREAMING_SINK)(taskContext.system).get, - taskContext, userConf) - } - - override def onStart(startTime: StartTime): Unit = { - dataSink.open(taskContext) - } - - override def onNext(msg: Message): Unit = { - dataSink.write(msg) - } - - override def onStop(): Unit = { - dataSink.close() - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala deleted file mode 100644 index aafd8d3..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache 
Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.dsl.plan - -import akka.actor.ActorSystem - -import io.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner, Partitioner} -import io.gearpump.streaming.Processor -import io.gearpump.streaming.dsl.op._ -import io.gearpump.streaming.dsl.partitioner.GroupByPartitioner -import io.gearpump.streaming.task.Task -import io.gearpump.util.Graph - -class Planner { - - /* - * Converts Dag of Op to Dag of TaskDescription. TaskDescription is part of the low - * level Graph API. 
- */ - def plan(dag: Graph[Op, OpEdge])(implicit system: ActorSystem) - : Graph[Processor[_ <: Task], _ <: Partitioner] = { - - val opTranslator = new OpTranslator() - - val newDag = optimize(dag) - newDag.mapEdge { (node1, edge, node2) => - edge match { - case Shuffle => - node2.head match { - case groupBy: GroupByOp[Any @unchecked, Any @unchecked] => - new GroupByPartitioner(groupBy.fun) - case _ => new HashPartitioner - } - case Direct => - new CoLocationPartitioner - } - }.mapVertex { opChain => - opTranslator.translate(opChain) - } - } - - private def optimize(dag: Graph[Op, OpEdge]): Graph[OpChain, OpEdge] = { - val newGraph = dag.mapVertex(op => OpChain(List(op))) - - val nodes = newGraph.topologicalOrderWithCirclesIterator.toList.reverse - for (node <- nodes) { - val outGoingEdges = newGraph.outgoingEdgesOf(node) - for (edge <- outGoingEdges) { - merge(newGraph, edge._1, edge._3) - } - } - newGraph - } - - private def merge(dag: Graph[OpChain, OpEdge], node1: OpChain, node2: OpChain) - : Graph[OpChain, OpEdge] = { - if (dag.outDegreeOf(node1) == 1 && - dag.inDegreeOf(node2) == 1 && - // For processor node, we don't allow it to merge with downstream operators - !node1.head.isInstanceOf[ProcessorOp[_ <: Task]]) { - val (_, edge, _) = dag.outgoingEdgesOf(node1)(0) - if (edge == Direct) { - val opList = OpChain(node1.ops ++ node2.ops) - dag.addVertex(opList) - for (incomingEdge <- dag.incomingEdgesOf(node1)) { - dag.addEdge(incomingEdge._1, incomingEdge._2, opList) - } - - for (outgoingEdge <- dag.outgoingEdgesOf(node2)) { - dag.addEdge(opList, outgoingEdge._2, outgoingEdge._3) - } - - // Remove the old vertex - dag.removeVertex(node1) - dag.removeVertex(node2) - } - } - dag - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/executor/Executor.scala ---------------------------------------------------------------------- diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/executor/Executor.scala b/streaming/src/main/scala/io/gearpump/streaming/executor/Executor.scala deleted file mode 100644 index 48007d5..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/executor/Executor.scala +++ /dev/null @@ -1,476 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.executor - -import java.lang.management.ManagementFactory -import scala.concurrent.duration._ - -import akka.actor.SupervisorStrategy.Resume -import akka.actor._ -import com.typesafe.config.Config -import org.apache.commons.lang.exception.ExceptionUtils -import org.slf4j.Logger - -import io.gearpump.cluster.worker.WorkerId -import io.gearpump.cluster.{ClusterConfig, ExecutorContext, UserConfig} -import io.gearpump.metrics.Metrics.ReportMetrics -import io.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService} -import io.gearpump.serializer.SerializationFramework -import io.gearpump.streaming.AppMasterToExecutor.{MsgLostException, TasksChanged, TasksLaunched, _} -import io.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, RegisterExecutor, RegisterTask, UnRegisterTask} -import io.gearpump.streaming.ProcessorId -import io.gearpump.streaming.executor.Executor._ -import io.gearpump.streaming.executor.TaskLauncher.TaskArgument -import io.gearpump.streaming.task.{Subscriber, TaskId} -import io.gearpump.transport.{Express, HostPort} -import io.gearpump.util.Constants._ -import io.gearpump.util.{ActorUtil, Constants, LogUtil, TimeOutScheduler} - -/** - * Executor is child of AppMaster. - * It usually represents a JVM process. It is a container for all tasks. - */ - -// TODO: What if Executor stuck in state DynamicDag and cannot get out??? -// For example, due to some message loss when there is network glitch. -// Executor will hang there for ever??? 
-// -class Executor(executorContext: ExecutorContext, userConf : UserConfig, launcher: ITaskLauncher) - extends Actor with TimeOutScheduler{ - - def this(executorContext: ExecutorContext, userConf: UserConfig) = { - this(executorContext, userConf, TaskLauncher(executorContext, userConf)) - } - - import context.dispatcher - import executorContext.{appId, appMaster, executorId, resource, worker} - - private val LOG: Logger = LogUtil.getLogger(getClass, executor = executorId, app = appId) - - private implicit val timeOut = FUTURE_TIMEOUT - private val address = ActorUtil.getFullPath(context.system, self.path) - private val systemConfig = context.system.settings.config - private val serializerPool = getSerializerPool() - private val taskDispatcher = systemConfig.getString(Constants.GEARPUMP_TASK_DISPATCHER) - - private var state = State.ACTIVE - private var transitionStart = 0L - // States transition start, in unix time - private var transitionEnd = 0L - // States transition end, in unix time - private val transitWarningThreshold = 5000 // ms, - - // Starts health check Ticks - self ! HealthCheck - - LOG.info(s"Executor $executorId has been started, start to register itself...") - LOG.info(s"Executor actor path: ${ActorUtil.getFullPath(context.system, self.path)}") - - appMaster ! 
RegisterExecutor(self, executorId, resource, worker) - context.watch(appMaster) - - private var tasks = Map.empty[TaskId, ActorRef] - private val taskArgumentStore = new TaskArgumentStore() - - val express = Express(context.system) - - val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED) - - if (metricsEnabled) { - // Registers jvm metrics - Metrics(context.system).register(new JvmMetricsSet(s"app$appId.executor$executorId")) - - val metricsReportService = context.actorOf(Props(new MetricsReporterService( - Metrics(context.system)))) - appMaster.tell(ReportMetrics, metricsReportService) - } - - private val NOT_INITIALIZED = -1 - def receive: Receive = applicationReady(dagVersion = NOT_INITIALIZED) - - private def getTaskId(actorRef: ActorRef): Option[TaskId] = { - tasks.find(_._2 == actorRef).map(_._1) - } - - override val supervisorStrategy = - OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) { - case _: MsgLostException => - val taskId = getTaskId(sender) - val cause = s"We got MessageLossException from task ${getTaskId(sender)}, " + - s"replaying application..." - LOG.error(cause) - taskId.foreach(appMaster ! MessageLoss(executorId, _, cause)) - Resume - case ex: Throwable => - val taskId = getTaskId(sender) - val errorMsg = s"We got ${ex.getClass.getName} from $taskId, we will treat it as" + - s" MessageLoss, so that the system will replay all lost message" - LOG.error(errorMsg, ex) - val detailErrorMsg = errorMsg + "\n" + ExceptionUtils.getStackTrace(ex) - taskId.foreach(appMaster ! 
MessageLoss(executorId, _, detailErrorMsg)) - Resume - } - - private def launchTask(taskId: TaskId, argument: TaskArgument): ActorRef = { - launcher.launch(List(taskId), argument, context, serializerPool, taskDispatcher).values.head - } - - private def assertVersion(expectVersion: Int, version: Int, clue: Any): Unit = { - if (expectVersion != version) { - val errorMessage = s"Version mismatch: we expect dag version $expectVersion, " + - s"but get $version; clue: $clue" - LOG.error(errorMessage) - throw new DagVersionMismatchException(errorMessage) - } - } - - def dynamicDagPhase1( - dagVersion: Int, launched: List[TaskId], changed: List[ChangeTask], registered: List[TaskId]) - : Receive = { - state = State.DYNAMIC_DAG_PHASE1 - box({ - case launch@LaunchTasks(taskIds, version, processorDescription, - subscribers: List[Subscriber]) => { - assertVersion(dagVersion, version, clue = launch) - - LOG.info(s"Launching Task $taskIds for app: $appId") - val taskArgument = TaskArgument(version, processorDescription, subscribers) - taskIds.foreach(taskArgumentStore.add(_, taskArgument)) - val newAdded = launcher.launch(taskIds, taskArgument, context, serializerPool, - taskDispatcher) - newAdded.foreach { newAddedTask => - context.watch(newAddedTask._2) - } - tasks ++= newAdded - sender ! TasksLaunched - context.become(dynamicDagPhase1(version, launched ++ taskIds, changed, registered)) - } - case change@ChangeTasks(taskIds, version, life, subscribers) => - assertVersion(dagVersion, version, clue = change) - - LOG.info(s"Change Tasks $taskIds for app: $appId, verion: $life, $dagVersion, $subscribers") - - val newChangedTasks = taskIds.map { taskId => - for (taskArgument <- taskArgumentStore.get(dagVersion, taskId)) { - val processorDescription = taskArgument.processorDescription.copy(life = life) - taskArgumentStore.add(taskId, TaskArgument(dagVersion, processorDescription, - subscribers)) - } - ChangeTask(taskId, dagVersion, life, subscribers) - } - sender ! 
TasksChanged(taskIds) - context.become(dynamicDagPhase1(dagVersion, launched, changed ++ newChangedTasks, - registered)) - - case locations@TaskLocationsReady(taskLocations, version) => - LOG.info(s"TaskLocations Ready...") - assertVersion(dagVersion, version, clue = locations) - - // Check whether all tasks has been registered. - if ((launched.toSet -- registered.toSet).isEmpty) { - // Confirm all tasks has been registered. - val result = taskLocations.locations.filter { - location => !location._1.equals(express.localHost) - }.flatMap { kv => - val (host, taskIdList) = kv - taskIdList.map(taskId => (TaskId.toLong(taskId), host)) - } - - val replyTo = sender - express.startClients(taskLocations.locations.keySet).foreach { _ => - express.remoteAddressMap.send(result) - express.remoteAddressMap.future().foreach { _ => - LOG.info(s"sending TaskLocationsReceived back to appmaster") - replyTo ! TaskLocationsReceived(version, executorId) - } - } - context.become(dynamicDagPhase2(dagVersion, launched, changed)) - } else { - LOG.error("Inconsistency between AppMaser and Executor! AppMaster thinks DynamicDag " + - "transition is ready, while Executor have not get all tasks registered, " + - "that task will not be functional...") - - // Reject TaskLocations... - val missedTasks = (launched.toSet -- registered.toSet).toList - val errorMsg = "We have not received TaskRegistered for following tasks: " + - missedTasks.mkString(", ") - LOG.error(errorMsg) - sender ! TaskLocationsRejected(dagVersion, executorId, errorMsg, null) - // Stays with current status... - } - - case confirm: TaskRegistered => - tasks.get(confirm.taskId).foreach { - case actorRef: ActorRef => - tasks += confirm.taskId -> actorRef - actorRef forward confirm - } - context.become(dynamicDagPhase1(dagVersion, launched, changed, - registered :+ confirm.taskId)) - - case rejected: TaskRejected => - // Means this task shoud not exists... - tasks.get(rejected.taskId).foreach(_ ! 
PoisonPill) - tasks -= rejected.taskId - LOG.error(s"Task ${rejected.taskId} is rejected by AppMaster, shutting down it...") - - case register: RegisterTask => - appMaster ! register - }) - } - - def dynamicDagPhase2(dagVersion: Int, launched: List[TaskId], changed: List[ChangeTask]) - : Receive = { - LOG.info("Transit to dynamic Dag Phase2") - state = State.DYNAMIC_DAG_PHASE2 - box { - case startAll@StartAllTasks(version) => - LOG.info(s"Start All Tasks...") - assertVersion(dagVersion, version, clue = startAll) - - launched.foreach(taskId => tasks.get(taskId).foreach(_ ! StartTask(taskId))) - changed.foreach(changeTask => tasks.get(changeTask.taskId).foreach(_ ! changeTask)) - - taskArgumentStore.removeNewerVersion(dagVersion) - taskArgumentStore.removeObsoleteVersion - context.become(applicationReady(dagVersion)) - } - } - - def applicationReady(dagVersion: Int): Receive = { - state = State.ACTIVE - transitionEnd = System.currentTimeMillis() - - if (dagVersion != NOT_INITIALIZED) { - LOG.info("Transit to state Application Ready. 
This transition takes " + - (transitionEnd - transitionStart) + " milliseconds") - } - box { - case start: StartDynamicDag => - LOG.info("received StartDynamicDag") - if (start.dagVersion > dagVersion) { - transitionStart = System.currentTimeMillis() - LOG.info(s"received $start, Executor transit to dag version: ${start.dagVersion} from " + - s"current version $dagVersion") - context.become(dynamicDagPhase1(start.dagVersion, List.empty[TaskId], - List.empty[ChangeTask], List.empty[TaskId])) - } - case launch: LaunchTasks => - if (launch.dagVersion > dagVersion) { - transitionStart = System.currentTimeMillis() - LOG.info(s"received $launch, Executor transit to dag " + - s"version: ${launch.dagVersion} from current version $dagVersion") - context.become(dynamicDagPhase1(launch.dagVersion, List.empty[TaskId], - List.empty[ChangeTask], List.empty[TaskId])) - self forward launch - } - - case change: ChangeTasks => - if (change.dagVersion > dagVersion) { - transitionStart = System.currentTimeMillis() - LOG.info(s"received $change, Executor transit to dag version: ${change.dagVersion} from" + - s" current version $dagVersion") - context.become(dynamicDagPhase1(change.dagVersion, List.empty[TaskId], - List.empty[ChangeTask], List.empty[TaskId])) - self forward change - } - - case StopTask(taskId) => - // Old soldiers never die, they just fade away ;) - val fadeAwayTask = tasks.get(taskId) - if (fadeAwayTask.isDefined) { - context.stop(fadeAwayTask.get) - } - tasks -= taskId - - case unRegister@UnRegisterTask(taskId, _) => - // Sends UnRegisterTask to AppMaster - appMaster ! 
unRegister - } - } - - def restartingTasks(dagVersion: Int, remain: Int, needRestart: List[TaskId]): Receive = { - state = State.RECOVERY - box { - case TaskStopped(actor) => - for (taskId <- getTaskId(actor)) { - if (taskArgumentStore.get(dagVersion, taskId).nonEmpty) { - val newNeedRestart = needRestart :+ taskId - val newRemain = remain - 1 - if (newRemain == 0) { - val newRestarted = newNeedRestart.map { taskId_ => - val taskActor = launchTask(taskId_, taskArgumentStore.get(dagVersion, taskId_).get) - context.watch(taskActor) - taskId_ -> taskActor - }.toMap - - tasks = newRestarted - context.become(dynamicDagPhase1(dagVersion, newNeedRestart, List.empty[ChangeTask], - List.empty[TaskId])) - } else { - context.become(restartingTasks(dagVersion, newRemain, newNeedRestart)) - } - } - } - } - } - - val terminationWatch: Receive = { - case Terminated(actor) => - if (actor.compareTo(appMaster) == 0) { - LOG.info(s"AppMaster ${appMaster.path.toString} is terminated, shutting down current " + - s"executor $appId, $executorId") - context.stop(self) - } else { - self ! TaskStopped(actor) - } - } - - def onRestartTasks: Receive = { - case RestartTasks(dagVersion) => - LOG.info(s"Executor received restart tasks") - val tasksToRestart = tasks.keys.count(taskArgumentStore.get(dagVersion, _).nonEmpty) - express.remoteAddressMap.send(Map.empty[Long, HostPort]) - context.become(restartingTasks(dagVersion, remain = tasksToRestart, - needRestart = List.empty[TaskId])) - - tasks.values.foreach { - case task: ActorRef => task ! PoisonPill - } - } - - def executorService: Receive = terminationWatch orElse onRestartTasks orElse { - case taskChanged: TaskChanged => - // Skip - case get: GetExecutorSummary => - val logFile = LogUtil.applicationLogDir(systemConfig) - val processorTasks = tasks.keySet.groupBy(_.processorId).mapValues(_.toList).view.force - sender ! 
ExecutorSummary( - executorId, - worker.workerId, - address, - logFile.getAbsolutePath, - state, - tasks.size, - processorTasks, - jvmName = ManagementFactory.getRuntimeMXBean().getName()) - - case query: QueryExecutorConfig => - sender ! ExecutorConfig(ClusterConfig.filterOutDefaultConfig(systemConfig)) - case HealthCheck => - context.system.scheduler.scheduleOnce(3.second)(HealthCheck) - if (state != State.ACTIVE && (transitionEnd - transitionStart) > transitWarningThreshold) { - LOG.error(s"Executor status: " + state + - s", it takes too long(${transitionEnd - transitionStart}) to do transition") - } - } - - private def getSerializerPool(): SerializationFramework = { - val system = context.system.asInstanceOf[ExtendedActorSystem] - val clazz = Class.forName(systemConfig.getString(Constants.GEARPUMP_SERIALIZER_POOL)) - val pool = clazz.newInstance().asInstanceOf[SerializationFramework] - pool.init(system, userConf) - pool.asInstanceOf[SerializationFramework] - } - - private def unHandled(state: String): Receive = { - case other => - LOG.info(s"Received unknown message $other in state: $state") - } - - private def box(receive: Receive): Receive = { - executorService orElse receive orElse unHandled(state) - } -} - -object Executor { - case class RestartTasks(dagVersion: Int) - - class TaskArgumentStore { - - private var store = Map.empty[TaskId, List[TaskArgument]] - - def add(taskId: TaskId, task: TaskArgument): Unit = { - val list = store.getOrElse(taskId, List.empty[TaskArgument]) - store += taskId -> (task :: list) - } - - def get(dagVersion: Int, taskId: TaskId): Option[TaskArgument] = { - store.get(taskId).flatMap { list => - list.find { arg => - arg.dagVersion <= dagVersion - } - } - } - - /** - * When the new DAG is successfully deployed, then we should remove obsolete - * TaskArgument of old DAG. 
- */ - def removeObsoleteVersion(): Unit = { - store = store.map { kv => - val (k, list) = kv - (k, list.take(1)) - } - } - - def removeNewerVersion(currentVersion: Int): Unit = { - store = store.map { kv => - val (k, list) = kv - (k, list.filter(_.dagVersion <= currentVersion)) - } - } - } - - case class TaskStopped(task: ActorRef) - - case class ExecutorSummary( - id: Int, - workerId: WorkerId, - actorPath: String, - logFile: String, - status: String, - taskCount: Int, - tasks: Map[ProcessorId, List[TaskId]], - jvmName: String - ) - - object ExecutorSummary { - def empty: ExecutorSummary = { - ExecutorSummary(0, WorkerId.unspecified, "", "", "", 1, null, jvmName = "") - } - } - - case class GetExecutorSummary(executorId: Int) - - case class QueryExecutorConfig(executorId: Int) - - case class ExecutorConfig(config: Config) - - class DagVersionMismatchException(msg: String) extends Exception(msg) - - object State { - val ACTIVE = "active" - val DYNAMIC_DAG_PHASE1 = "dynamic_dag_phase1" - val DYNAMIC_DAG_PHASE2 = "dynamic_dag_phase2" - val RECOVERY = "dag_recovery" - } - - object HealthCheck -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/executor/ExecutorRestartPolicy.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/executor/ExecutorRestartPolicy.scala b/streaming/src/main/scala/io/gearpump/streaming/executor/ExecutorRestartPolicy.scala deleted file mode 100644 index c40aa5f..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/executor/ExecutorRestartPolicy.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.executor - -import scala.collection.immutable -import scala.concurrent.duration.Duration - -import io.gearpump.streaming.task.TaskId -import io.gearpump.util.RestartPolicy - -/** - * - * Controls how many retries to recover failed executors. - * - * @param maxNrOfRetries the number of times a executor is allowed to be restarted, - * negative value means no limit, if the limit is exceeded the policy - * will not allow to restart the executor - * @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf - * means no window - */ -class ExecutorRestartPolicy(maxNrOfRetries: Int, withinTimeRange: Duration) { - private var executorToTaskIds = Map.empty[Int, Set[TaskId]] - private var taskRestartPolocies = new immutable.HashMap[TaskId, RestartPolicy] - - def addTaskToExecutor(executorId: Int, taskId: TaskId): Unit = { - var taskSetForExecutorId = executorToTaskIds.getOrElse(executorId, Set.empty[TaskId]) - taskSetForExecutorId += taskId - executorToTaskIds += executorId -> taskSetForExecutorId - if (!taskRestartPolocies.contains(taskId)) { - taskRestartPolocies += taskId -> new RestartPolicy(maxNrOfRetries, withinTimeRange) - } - } - - def allowRestartExecutor(executorId: Int): Boolean = { - executorToTaskIds.get(executorId).map { taskIds => - taskIds.foreach { taskId => - taskRestartPolocies.get(taskId).map { policy => - if 
(!policy.allowRestart) { - // scalastyle:off return - return false - // scalastyle:on return - } - } - } - } - true - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/executor/TaskLauncher.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/executor/TaskLauncher.scala b/streaming/src/main/scala/io/gearpump/streaming/executor/TaskLauncher.scala deleted file mode 100644 index d377ea4..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/executor/TaskLauncher.scala +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.executor - -import akka.actor.{Actor, ActorRef, ActorRefFactory, Props} - -import io.gearpump.cluster.{ExecutorContext, UserConfig} -import io.gearpump.serializer.SerializationFramework -import io.gearpump.streaming.ProcessorDescription -import io.gearpump.streaming.executor.TaskLauncher.TaskArgument -import io.gearpump.streaming.task._ -import io.gearpump.streaming.util.ActorPathUtil - -trait ITaskLauncher { - - /** Launch a list of task actors */ - def launch(taskIds: List[TaskId], argument: TaskArgument, - context: ActorRefFactory, serializer: SerializationFramework, dispatcher: String) - : Map[TaskId, ActorRef] -} - -class TaskLauncher( - appId: Int, - appName: String, - executorId: Int, - appMaster: ActorRef, - userConf: UserConfig, - taskActorClass: Class[_ <: Actor]) - extends ITaskLauncher{ - - override def launch( - taskIds: List[TaskId], argument: TaskArgument, - context: ActorRefFactory, serializer: SerializationFramework, dispatcher: String) - : Map[TaskId, ActorRef] = { - import argument.{processorDescription, subscribers} - - val taskConf = userConf.withConfig(processorDescription.taskConf) - - val taskContext = TaskContextData(executorId, - appId, appName, appMaster, - processorDescription.parallelism, - processorDescription.life, subscribers) - - val taskClass = TaskUtil.loadClass(processorDescription.taskClass) - - var tasks = Map.empty[TaskId, ActorRef] - taskIds.foreach { taskId => - val task = new TaskWrapper(taskId, taskClass, taskContext, taskConf) - val taskActor = context.actorOf(Props(taskActorClass, taskId, taskContext, userConf, task, - serializer).withDispatcher(dispatcher), ActorPathUtil.taskActorName(taskId)) - tasks += taskId -> taskActor - } - tasks - } -} - -object TaskLauncher { - - case class TaskArgument( - dagVersion: Int, processorDescription: ProcessorDescription, - subscribers: List[Subscriber]) - - def apply(executorContext: ExecutorContext, userConf: UserConfig): TaskLauncher = { - 
import executorContext.{appId, appMaster, appName, executorId} - new TaskLauncher(appId, appName, executorId, appMaster, userConf, classOf[TaskActor]) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/metrics/ProcessorAggregator.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/metrics/ProcessorAggregator.scala b/streaming/src/main/scala/io/gearpump/streaming/metrics/ProcessorAggregator.scala deleted file mode 100644 index 8be2e72..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/metrics/ProcessorAggregator.scala +++ /dev/null @@ -1,301 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.metrics - -import java.util - -import com.typesafe.config.Config - -import io.gearpump.TimeStamp -import io.gearpump.cluster.ClientToMaster.ReadOption -import io.gearpump.cluster.MasterToClient.HistoryMetricsItem -import io.gearpump.google.common.collect.Iterators -import io.gearpump.metrics.Metrics.{Histogram, Meter} -import io.gearpump.metrics.MetricsAggregator -import io.gearpump.streaming.metrics.ProcessorAggregator._ -import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig - -/** - * - * Does aggregation on metrics after grouping by these three attributes: - * 1. processorId - * 2. time section(represented as a index integer) - * 3. metricName(like sendThroughput) - * - * It assumes that for each [[io.gearpump.cluster.MasterToClient.HistoryMetricsItem]], the name - * follow the format app(appId).processor(processorId).task(taskId).(metricName) - * - * It parses the name to get processorId and metricName. If the parsing fails, then current - * [[io.gearpump.cluster.MasterToClient.HistoryMetricsItem]] will be skipped. - * - * NOTE: this class is optimized for performance. 
- */ -class ProcessorAggregator(historyMetricConfig: HistoryMetricsConfig) extends MetricsAggregator { - - def this(config: Config) = { - this(HistoryMetricsConfig(config)) - } - - private val aggregatorFactory: AggregatorFactory = new AggregatorFactory() - - /** - * Accepts options: - * key: "readOption", value: one of "readLatest", "readRecent", "readHistory" - */ - override def aggregate(options: Map[String, String], - inputs: Iterator[HistoryMetricsItem]): List[HistoryMetricsItem] = { - val readOption = options.get(ReadOption.Key).getOrElse(ReadOption.ReadLatest) - aggregate(readOption, inputs, System.currentTimeMillis()) - } - - def aggregate( - readOption: ReadOption.ReadOption, inputs: Iterator[HistoryMetricsItem], now: TimeStamp) - : List[HistoryMetricsItem] = { - val (start, end, interval) = getTimeRange(readOption, now) - val timeSlotsCount = ((end - start - 1) / interval + 1).toInt - val map = new MultiLayerMap[Aggregator](timeSlotsCount) - - val taskIdentity = new TaskIdentity(0, null) - while (inputs.hasNext) { - val item = inputs.next() - - if (item.value.isInstanceOf[Meter] || item.value.isInstanceOf[Histogram]) { - if (item.time >= start && item.time < end) { - val timeIndex = ((item.time - start) / interval).toInt - - if (parseName(item.value.name, taskIdentity)) { - var op = map.get(timeIndex, taskIdentity.group) - if (op == null) { - op = aggregatorFactory.create(item, taskIdentity.group) - map.put(timeIndex, taskIdentity.group, op) - } - op.aggregate(item) - } - } - } - } - - val result = new Array[HistoryMetricsItem](map.size) - val iterator = map.valueIterator - var index = 0 - while (iterator.hasNext()) { - val op = iterator.next() - result(index) = op.result - index += 1 - } - - result.toList - } - - // Returns (start, end, interval) - private def getTimeRange(readOption: ReadOption.ReadOption, now: TimeStamp) - : (TimeStamp, TimeStamp, TimeStamp) = { - readOption match { - case ReadOption.ReadRecent => - val end = now - val start = end - 
(historyMetricConfig.retainRecentDataSeconds) * 1000 - val interval = historyMetricConfig.retainRecentDataIntervalMs - (floor(start, interval), floor(end, interval), interval) - case ReadOption.ReadHistory => - val end = now - val start = end - (historyMetricConfig.retainHistoryDataHours) * 3600 * 1000 - val interval = historyMetricConfig.retainHistoryDataIntervalMs - (floor(start, interval), floor(end, interval), interval) - case _ => - // All data points are aggregated together. - (0L, Long.MaxValue, Long.MaxValue) - } - } - - // The original metrics data is divided by interval points: - // time series (0, interval, 2*interval, 3*interval....) - // floor(..) make sure the Aggregator use the same set of interval points. - private def floor(value: Long, interval: Long): Long = { - (value / interval) * interval - } - - // Returns "app0.processor0:sendThroughput" as the group Id. - private def parseName(name: String, result: TaskIdentity): Boolean = { - val taskIndex = name.indexOf(TASK_TAG) - if (taskIndex > 0) { - val processor = name.substring(0, taskIndex) - val typeIndex = name.indexOf(":", taskIndex + 1) - if (typeIndex > 0) { - result.task = (name.substring(taskIndex + TASK_TAG.length, typeIndex)).toShort - val metricName = name.substring(typeIndex) - result.group = processor + metricName - true - } else { - false - } - } else { - false - } - } -} - -object ProcessorAggregator { - val readOption = ReadOption.Key - - private val TASK_TAG = ".task" - - private class TaskIdentity(var task: Short, var group: String) - - /** - * - * MultiLayerMap has multiple layers. For each layer, there - * is a hashMap. - * - * To access a value with get, user need to specify first layer Id, then key. - * - * This class is optimized for performance. 
- */ - class MultiLayerMap[Value](layers: Int) { - - private var _size: Int = 0 - private val map: Array[java.util.HashMap[String, Value]] = createMap(layers) - - /** - * @param key key in current layer - * @return return null if key is not found - */ - def get(layer: Int, key: String): Value = { - if (layer < layers) { - map(layer).get(key) - } else { - null.asInstanceOf[Value] - } - } - - def put(layer: Int, key: String, value: Value): Unit = { - if (layer < layers) { - map(layer).put(key, value) - _size += 1 - } - } - - def size: Int = _size - - def valueIterator: util.Iterator[Value] = { - val iterators = new Array[util.Iterator[Value]](layers) - var layer = 0 - while (layer < layers) { - iterators(layer) = map(layer).values().iterator() - layer += 1 - } - - Iterators.concat(iterators: _*) - } - - private def createMap(layers: Int) = { - val map = new Array[java.util.HashMap[String, Value]](layers) - var index = 0 - val length = map.length - while (index < length) { - map(index) = new java.util.HashMap[String, Value]() - index += 1 - } - map - } - } - - trait Aggregator { - def aggregate(item: HistoryMetricsItem): Unit - def result: HistoryMetricsItem - } - - class HistogramAggregator(name: String) extends Aggregator { - - var count: Long = 0 - var mean: Double = 0 - var stddev: Double = 0 - var median: Double = 0 - var p95: Double = 0 - var p99: Double = 0 - var p999: Double = 0 - - var startTime: TimeStamp = Long.MaxValue - - override def aggregate(item: HistoryMetricsItem): Unit = { - val input = item.value.asInstanceOf[Histogram] - count += 1 - mean += input.mean - stddev += input.stddev - median += input.median - p95 += input.p95 - p99 += input.p99 - p999 += input.p999 - - if (item.time < startTime) { - startTime = item.time - } - } - - override def result: HistoryMetricsItem = { - if (count > 0) { - HistoryMetricsItem(startTime, Histogram(name, mean / count, stddev / count, - median / count, p95 / count, p99 / count, p999 / count)) - } else { - 
HistoryMetricsItem(0, Histogram(name, 0, 0, 0, 0, 0, 0)) - } - } - } - - class MeterAggregator(name: String) extends Aggregator { - - var count: Long = 0 - var meanRate: Double = 0 - var m1: Double = 0 - var rateUnit: String = null - - var startTime: TimeStamp = Long.MaxValue - - override def aggregate(item: HistoryMetricsItem): Unit = { - - val input = item.value.asInstanceOf[Meter] - count += input.count - - meanRate += input.meanRate - m1 += input.m1 - - if (null == rateUnit) { - rateUnit = input.rateUnit - } - - if (item.time < startTime) { - startTime = item.time - } - } - - override def result: HistoryMetricsItem = { - HistoryMetricsItem(startTime, Meter(name, count, meanRate, - m1, rateUnit)) - } - } - - class AggregatorFactory { - def create(item: HistoryMetricsItem, name: String): Aggregator = { - item.value match { - case meter: Meter => new MeterAggregator(name) - case histogram: Histogram => new HistogramAggregator(name) - case _ => null // not supported - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/metrics/TaskFilterAggregator.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/metrics/TaskFilterAggregator.scala b/streaming/src/main/scala/io/gearpump/streaming/metrics/TaskFilterAggregator.scala deleted file mode 100644 index dbf79ec..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/metrics/TaskFilterAggregator.scala +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
/**
 * Filters the latest metrics data by specifying a
 * processor Id range, and taskId range.
 *
 * Both ranges are half-open: an item is kept when
 * `startProcessor <= processorId < endProcessor` and `startTask <= taskId < endTask`.
 *
 * @param maxLimit upper bound on the number of items a single query may return; the
 *                 limit requested in the query options is capped to this value
 */
class TaskFilterAggregator(maxLimit: Int) extends MetricsAggregator {

  import io.gearpump.streaming.metrics.TaskFilterAggregator._

  /** Reads `maxLimit` from the config entry `Constants.GEARPUMP_METRICS_MAX_LIMIT`. */
  def this(config: Config) = {
    this(config.getInt(Constants.GEARPUMP_METRICS_MAX_LIMIT))
  }

  /**
   * Entry point taking raw string options.
   *
   * @param options query options; anything other than a "read latest" query, or options
   *                that fail to parse, produce an empty result
   * @param inputs  metrics items to filter
   * @return the matching items, or an empty list
   */
  override def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem])
    : List[HistoryMetricsItem] = {

    if (options.get(ReadOption.Key) != Some(ReadOption.ReadLatest)) {
      // Returns empty set
      List.empty[HistoryMetricsItem]
    } else {
      // Options.parse reports malformed options by returning null.
      Option(Options.parse(options)) match {
        case Some(parsed) => aggregate(parsed, inputs)
        case None => List.empty[HistoryMetricsItem]
      }
    }
  }

  /**
   * Filters `inputs` by the processor/task ranges in `options`.
   *
   * Consumption of `inputs` stops as soon as `min(options.limit, maxLimit)` items have
   * been accepted. Accepted items are prepended, so the returned list is in reverse
   * encounter order. Items whose name cannot be parsed are skipped.
   *
   * @param options parsed filter options
   * @param inputs  metrics items to filter
   * @return the accepted items
   */
  def aggregate(options: Options, inputs: Iterator[HistoryMetricsItem])
    : List[HistoryMetricsItem] = {

    val result = new ListBuffer[HistoryMetricsItem]
    val effectiveLimit = Math.min(options.limit, maxLimit)
    var count = 0

    // Single mutable holder reused for every item instead of allocating a tuple per name.
    val taskIdentity = new TaskIdentity(0, 0)

    while (inputs.hasNext && count < effectiveLimit) {
      val item = inputs.next()
      if (parseName(item.value.name, taskIdentity)) {
        if (taskIdentity.processor >= options.startProcessor &&
          taskIdentity.processor < options.endProcessor &&
          taskIdentity.task >= options.startTask &&
          taskIdentity.task < options.endTask) {
          result.prepend(item)
          count += 1
        }
      }
    }
    result.toList
  }

  /**
   * Extracts the processor id and task id from a metric name.
   *
   * Assumes the name format is "app0.processor0.task0:sendThroughput".
   *
   * @param name   metric name to parse
   * @param result holder that receives (processorId, taskId); it is only written when
   *               both ids were parsed successfully
   * @return true on success; false when a tag or the ':' terminator is missing, or when
   *         either id is not a valid integer
   */
  private def parseName(name: String, result: TaskIdentity): Boolean = {
    val processorStart = name.indexOf(PROCESSOR_TAG)
    val taskStart =
      if (processorStart == -1) -1 else name.indexOf(TASK_TAG, processorStart + 1)
    val taskEnd =
      if (taskStart == -1) -1 else name.indexOf(":", taskStart + 1)

    if (taskEnd == -1) {
      false
    } else {
      try {
        val processorId =
          name.substring(processorStart + PROCESSOR_TAG.length, taskStart).toInt
        val taskId = name.substring(taskStart + TASK_TAG.length, taskEnd).toInt
        result.processor = processorId
        result.task = taskId
        true
      } catch {
        // A non-numeric id means this name is not in the expected format; report a
        // parse failure so the caller skips the item instead of aborting the query.
        case _: NumberFormatException => false
      }
    }
  }
}

object TaskFilterAggregator {
  // Keys recognised in the string option map.
  val StartTask = "startTask"
  val EndTask = "endTask"
  val StartProcessor = "startProcessor"
  val EndProcessor = "endProcessor"
  val Limit = "limit"

  // Markers that precede the task id and processor id inside a metric name.
  val TASK_TAG = ".task"
  val PROCESSOR_TAG = ".processor"

  /** Mutable (processor, task) pair filled in by name parsing. */
  private class TaskIdentity(var processor: Int, var task: Int)

  /**
   * Parsed filter options.
   *
   * @param limit          maximum number of items requested
   * @param startTask      inclusive lower bound of the task id range
   * @param endTask        exclusive upper bound of the task id range
   * @param startProcessor inclusive lower bound of the processor id range
   * @param endProcessor   exclusive upper bound of the processor id range
   */
  case class Options(
      limit: Int, startTask: Int, endTask: Int, startProcessor: Int, endProcessor: Int)

  private val LOG = LogUtil.getLogger(getClass)

  object Options {

    /** Options with no limit and ranges starting at 0 and ending at Int.MaxValue. */
    def acceptAll: Options = {
      new Options(Int.MaxValue, 0, Int.MaxValue, 0, Int.MaxValue)
    }

    /**
     * Builds [[Options]] from a string map.
     *
     * Missing keys fall back to defaults: 0 for the range starts, Integer.MAX_VALUE for
     * the range ends, and DEFAULT_LIMIT for the limit.
     *
     * @param options raw string options
     * @return the parsed options, or null (after logging an error) when any present
     *         value is not a valid integer
     */
    def parse(options: Map[String, String]): Options = {
      def intOrElse(key: String, default: Int): Int =
        options.get(key).map(_.toInt).getOrElse(default)

      // Do sanity check
      val optionTry = Try {
        val startTask = intOrElse(StartTask, 0)
        val endTask = intOrElse(EndTask, Integer.MAX_VALUE)
        val startProcessor = intOrElse(StartProcessor, 0)
        val endProcessor = intOrElse(EndProcessor, Integer.MAX_VALUE)
        val limit = intOrElse(Limit, DEFAULT_LIMIT)
        new Options(limit, startTask, endTask, startProcessor, endProcessor)
      }

      optionTry match {
        case Success(parsed) => parsed
        case Failure(ex) =>
          LOG.error("Failed to parse the options in TaskFilterAggregator. Error msg: " +
            ex.getMessage)
          null
      }
    }
  }

  val DEFAULT_LIMIT = 1000
  val MAX_LIMIT = 1000
}
/**
 * Type aliases shared across the streaming package.
 *
 * Every alias is a plain `Int`, so the names document intent at use sites but the
 * compiler does not distinguish one kind of id from another.
 */
package object streaming {
  // NOTE(review): presumably the id of a processor in the application DAG - confirm.
  type ProcessorId = Int
  // NOTE(review): presumably the index of a task within its processor - confirm.
  type TaskIndex = Int
  // NOTE(review): presumably the id of an executor - confirm.
  type ExecutorId = Int
}
/**
 * Interface to implement custom data sink where result of a DAG is typically written.
 * A DataSink could be a data store like HBase or simply a console.
 *
 * An example would be like:
 * {{{
 *  class ConsoleSink extends DataSink {
 *
 *    def open(context: TaskContext): Unit = {}
 *
 *    def write(message: Message): Unit = {
 *      Console.println(message)
 *    }
 *
 *    def close(): Unit = {}
 *  }
 * }}}
 *
 * Subclass is required to be serializable
 */
trait DataSink extends java.io.Serializable {

  /**
   * Opens connection to data sink
   * invoked at onStart() method of [[io.gearpump.streaming.task.Task]]
   * @param context is the task context at runtime
   */
  def open(context: TaskContext): Unit

  /**
   * Writes message into data sink
   * invoked at onNext() method of [[io.gearpump.streaming.task.Task]]
   * @param message wraps data to be written out
   */
  def write(message: Message): Unit

  /**
   * Closes connection to data sink
   * invoked at onStop() method of [[io.gearpump.streaming.task.Task]]
   */
  def close(): Unit
}
/**
 * Factory for a [[io.gearpump.streaming.Processor]] that runs a user supplied
 * [[DataSink]] inside a [[DataSinkTask]].
 *
 * Example: a word count DAG whose result is written to a KafkaSink
 * {{{
 * val split = Processor[Split](1)
 * val sum = Processor[Sum](1)
 * val sink = new KafkaSink()
 * val sinkProcessor = DataSinkProcessor(sink, 1)
 * val dag = split ~> sum ~> sinkProcessor
 * }}}
 */
object DataSinkProcessor {

  /**
   * @param dataSink    sink instance, stored in the task configuration under
   *                    `DataSinkTask.DATA_SINK`
   * @param parallelism passed through to the processor
   * @param description passed through to the processor, empty by default
   * @param taskConf    base task configuration the sink entry is added to
   * @return a processor description for [[DataSinkTask]]
   */
  def apply(
      dataSink: DataSink,
      parallelism: Int,
      description: String = "",
      taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem)
    : Processor[DataSinkTask] = {
    // The task reads the sink back from its configuration under the same key.
    val confWithSink = taskConf.withValue[DataSink](DataSinkTask.DATA_SINK, dataSink)
    Processor[DataSinkTask](parallelism, description = description, confWithSink)
  }
}
object DataSinkTask {
  /** Configuration key under which the [[DataSink]] instance is stored. */
  val DATA_SINK = "data_sink"
}

/**
 * Generic task that drives a [[DataSink]]: the sink is opened when the task starts,
 * receives every incoming message, and is closed when the task stops.
 *
 * The sink is read from `conf` under `DataSinkTask.DATA_SINK` at construction time;
 * construction fails if that entry is absent.
 */
class DataSinkTask(context: TaskContext, conf: UserConfig) extends Task(context, conf) {

  private val sink = conf.getValue[DataSink](DataSinkTask.DATA_SINK).get

  /** Opens the sink with this task's context. */
  override def onStart(startTime: StartTime): Unit = {
    LOG.info("opening data sink...")
    sink.open(context)
  }

  /** Forwards the message to the sink unchanged. */
  override def onNext(message: Message): Unit = sink.write(message)

  /** Closes the sink. */
  override def onStop(): Unit = {
    LOG.info("closing data sink...")
    sink.close()
  }
}
/**
 * Interface to implement custom source where data is read into the system.
 * a DataSource could be a message queue like kafka or simply data generation source.
 *
 * An example would be like
 * {{{
 *  class GenMsgSource extends DataSource {
 *
 *    def open(context: TaskContext, startTime: Long): Unit = {}
 *
 *    def read(): Message = {
 *      Message("message")
 *    }
 *
 *    def close(): Unit = {}
 *  }
 * }}}
 *
 * subclass is required to be serializable
 */
trait DataSource extends java.io.Serializable {

  /**
   * Opens connection to data source
   * invoked in onStart() method of [[io.gearpump.streaming.source.DataSourceTask]]
   *
   * @param context is the task context at runtime
   * @param startTime is the start time of system
   */
  def open(context: TaskContext, startTime: Long): Unit

  /**
   * Reads next message from data source and
   * returns null if no message is available
   *
   * @return a [[io.gearpump.Message]] or null
   */
  def read(): Message

  /**
   * Closes connection to data source.
   * invoked in onStop() method of [[io.gearpump.streaming.source.DataSourceTask]]
   */
  def close(): Unit
}
/**
 * Names of the configuration entries used by data sources.
 */
object DataSourceConfig {

  // NOTE(review): presumably the number of messages read per batch - confirm at use site.
  val SOURCE_READ_BATCH_SIZE: String = "gearpump.source.read.batch.size"

  // NOTE(review): presumably a class name of a timestamp filter - confirm at use site.
  val SOURCE_TIMESTAMP_FILTER: String = "gearpump.source.timestamp.filter.class"
}
/**
 * Factory for a [[io.gearpump.streaming.Processor]] that runs a user supplied
 * [[DataSource]] inside a [[DataSourceTask]].
 *
 * Example: a DAG that reads from a Kafka source followed by word count
 * {{{
 * val source = new KafkaSource()
 * val sourceProcessor = DataSourceProcessor(source, 1)
 * val split = Processor[Split](1)
 * val sum = Processor[Sum](1)
 * val dag = sourceProcessor ~> split ~> sum
 * }}}
 */
object DataSourceProcessor {

  /**
   * @param dataSource  source instance, stored in the task configuration under
   *                    `DataSourceTask.DATA_SOURCE`
   * @param parallelism passed through to the processor
   * @param description passed through to the processor, empty by default
   * @param taskConf    base task configuration the source entry is added to
   * @return a processor description for [[DataSourceTask]]
   */
  def apply(
      dataSource: DataSource,
      parallelism: Int,
      description: String = "",
      taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem)
    : Processor[DataSourceTask] = {
    // Add the source to the task configuration before building the processor.
    val confWithSource =
      taskConf.withValue[DataSource](DataSourceTask.DATA_SOURCE, dataSource)
    Processor[DataSourceTask](parallelism, description = description, confWithSource)
  }
}
