http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/dsl/Stream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/Stream.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/Stream.scala index 7fcaccf..ddd2037 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/dsl/Stream.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/dsl/Stream.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,55 +18,62 @@ package io.gearpump.streaming.dsl +import scala.language.implicitConversions + +import org.slf4j.{Logger, LoggerFactory} + import io.gearpump.Message import io.gearpump.cluster.UserConfig import io.gearpump.streaming.dsl.op._ import io.gearpump.streaming.sink.DataSink -import io.gearpump.streaming.task.{TaskContext, Task} -import io.gearpump.util.{Graph, LogUtil} -import org.slf4j.{Logger, LoggerFactory} +import io.gearpump.streaming.task.{Task, TaskContext} +import io.gearpump.util.Graph -class Stream[T](private val graph: Graph[Op,OpEdge], private val thisNode:Op, private val edge: Option[OpEdge] = None) { +class Stream[T]( + private val graph: Graph[Op, OpEdge], private val thisNode: Op, + private val edge: Option[OpEdge] = None) { /** - * convert a value[T] to a list of value[R] - * @param fun function - * @param description the description message for this operation - * @param <R> the result message type - * @return a new stream with type [R] + * converts a value[T] to a list of value[R] + * + * @param fun FlatMap function + * @param description The description 
message for this operation + * @return A new stream with type [R] */ def flatMap[R](fun: T => TraversableOnce[R], description: String = null): Stream[R] = { val flatMapOp = FlatMapOp(fun, Option(description).getOrElse("flatmap")) - graph.addVertex(flatMapOp ) + graph.addVertex(flatMapOp) graph.addEdge(thisNode, edge.getOrElse(Direct), flatMapOp) new Stream[R](graph, flatMapOp) } /** - * convert value[T] to value[R] - * @param fun function - * @param <R> the result message type - * @return a new stream with type [R] + * Maps message of type T message of type R + * + * @param fun Function + * @return A new stream with type [R] */ def map[R](fun: T => R, description: String = null): Stream[R] = { - this.flatMap ({ data => + this.flatMap({ data => Option(fun(data)) }, Option(description).getOrElse("map")) } /** - * reserve records when fun(T) == true + * Keeps records when fun(T) == true + * * @param fun the filter * @return a new stream after filter */ def filter(fun: T => Boolean, description: String = null): Stream[T] = { - this.flatMap ({ data => + this.flatMap({ data => if (fun(data)) Option(data) else None }, Option(description).getOrElse("filter")) } /** - * Reduce opeartion + * Reduces operations. + * * @param fun reduction function * @param description description message for this operator * @return a new stream after reduction @@ -86,7 +93,8 @@ class Stream[T](private val graph: Graph[Op,OpEdge], private val thisNode:Op, pr } /** - * Merge data from two stream into one + * Merges data from two stream into one + * * @param other the other stream * @return the merged stream */ @@ -99,7 +107,7 @@ class Stream[T](private val graph: Graph[Op,OpEdge], private val thisNode:Op, pr } /** - * Group by fun(T) + * Group by function (T => Group) * * For example, we have T type, People(name: String, gender: String, age: Int) * groupBy[People](_.gender) will group the people by gender. 
@@ -107,16 +115,17 @@ class Stream[T](private val graph: Graph[Op,OpEdge], private val thisNode:Op, pr * You can append other combinators after groupBy * * For example, - * + * {{{ * Stream[People].groupBy(_.gender).flatmap(..).filter.(..).reduce(..) + * }}} * - * @param fun group by function - * @param parallelism parallelism level - * @param description the description - * @param <Group> the group type + * @param fun Group by function + * @param parallelism Parallelism level + * @param description The description * @return the grouped stream */ - def groupBy[Group](fun: T => Group, parallelism: Int = 1, description: String = null): Stream[T] = { + def groupBy[Group](fun: T => Group, parallelism: Int = 1, description: String = null) + : Stream[T] = { val groupOp = GroupByOp(fun, parallelism, Option(description).getOrElse("groupBy")) graph.addVertex(groupOp) graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp) @@ -124,33 +133,36 @@ class Stream[T](private val graph: Graph[Op,OpEdge], private val thisNode:Op, pr } /** - * connect with a low level Processor(TaskDescription) + * Connects with a low level Processor(TaskDescription) + * * @param processor a user defined processor * @param parallelism parallelism level - * @param <R> the result message type * @return new stream after processing with type [R] */ - def process[R](processor: Class[_ <: Task], parallism: Int, conf: UserConfig = UserConfig.empty, description: String = null): Stream[R] = { - val processorOp = ProcessorOp(processor, parallism, conf, Option(description).getOrElse("process")) + def process[R]( + processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = UserConfig.empty, + description: String = null): Stream[R] = { + val processorOp = ProcessorOp(processor, parallelism, conf, + Option(description).getOrElse("process")) graph.addVertex(processorOp) graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp) new Stream[R](graph, processorOp, Some(Shuffle)) } } -class KVStream[K, 
V](stream: Stream[Tuple2[K, V]]){ +class KVStream[K, V](stream: Stream[Tuple2[K, V]]) { /** - * Apply to Stream[Tuple2[K,V]] - * Group by the key of a KV tuple - * For (key, value) will groupby key + * GroupBy key + * + * Applies to Stream[Tuple2[K,V]] + * * @param parallelism the parallelism for this operation * @return the new KV stream */ - def groupByKey(parallism: Int = 1): Stream[Tuple2[K, V]] = { - stream.groupBy(Stream.getTupleKey[K, V], parallism, "groupByKey") + def groupByKey(parallelism: Int = 1): Stream[Tuple2[K, V]] = { + stream.groupBy(Stream.getTupleKey[K, V], parallelism, "groupByKey") } - /** * Sum the value of the tuples * @@ -160,31 +172,39 @@ class KVStream[K, V](stream: Stream[Tuple2[K, V]]){ * @param numeric the numeric operations * @return the sum stream */ - def sum(implicit numeric: Numeric[V]) = { + def sum(implicit numeric: Numeric[V]): Stream[(K, V)] = { stream.reduce(Stream.sumByValue[K, V](numeric), "sum") } } object Stream { - def apply[T](graph: Graph[Op, OpEdge], node: Op, edge: Option[OpEdge]) = new Stream[T](graph, node, edge) + def apply[T](graph: Graph[Op, OpEdge], node: Op, edge: Option[OpEdge]): Stream[T] = { + new Stream[T](graph, node, edge) + } def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1 def sumByValue[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V] = (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2)) - implicit def streamToKVStream[K, V](stream: Stream[Tuple2[K, V]]): KVStream[K, V] = new KVStream(stream) + implicit def streamToKVStream[K, V](stream: Stream[Tuple2[K, V]]): KVStream[K, V] = { + new KVStream(stream) + } implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable { - def sink[T](dataSink: DataSink, parallism: Int, conf: UserConfig, description: String): Stream[T] = { - implicit val sink = DataSinkOp[T](dataSink, parallism, conf, Some(description).getOrElse("traversable")) + def sink[T](dataSink: DataSink, parallism: Int, conf: 
UserConfig, description: String) + : Stream[T] = { + implicit val sink = DataSinkOp[T](dataSink, parallism, conf, + Some(description).getOrElse("traversable")) stream.graph.addVertex(sink) stream.graph.addEdge(stream.thisNode, Shuffle, sink) new Stream[T](stream.graph, sink) } - def sink[T](sink: Class[_ <: Task], parallism: Int, conf: UserConfig = UserConfig.empty, description: String = null): Stream[T] = { + def sink[T]( + sink: Class[_ <: Task], parallism: Int, conf: UserConfig = UserConfig.empty, + description: String = null): Stream[T] = { val sinkOp = ProcessorOp(sink, parallism, conf, Option(description).getOrElse("source")) stream.graph.addVertex(sinkOp) stream.graph.addEdge(stream.thisNode, Shuffle, sinkOp) @@ -198,8 +218,7 @@ class LoggerSink[T] extends DataSink { private var context: TaskContext = null - - override def open(context: TaskContext) = { + override def open(context: TaskContext): Unit = { this.logger = context.logger } @@ -207,6 +226,5 @@ class LoggerSink[T] extends DataSink { logger.info("logging message " + message.msg) } - override def close() = Unit -} - + override def close(): Unit = Unit +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/dsl/StreamApp.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/StreamApp.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/StreamApp.scala index cfcfed2..525000d 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/dsl/StreamApp.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/dsl/StreamApp.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,36 +18,40 @@ package io.gearpump.streaming.dsl -import akka.actor.{ActorRef, ActorSystem, Cancellable, Props} +import scala.language.implicitConversions + +import akka.actor.ActorSystem + +import io.gearpump.cluster.UserConfig +import io.gearpump.cluster.client.ClientContext import io.gearpump.streaming.StreamApplication -import io.gearpump.streaming.dsl.op.{Shuffle, ProcessorOp, DataSourceOp, OpEdge, Op} +import io.gearpump.streaming.dsl.op.{DataSourceOp, Op, OpEdge, ProcessorOp} import io.gearpump.streaming.dsl.plan.Planner import io.gearpump.streaming.source.DataSource import io.gearpump.streaming.task.{Task, TaskContext} -import io.gearpump.cluster.UserConfig -import io.gearpump.cluster.client.ClientContext import io.gearpump.util.Graph import io.gearpump.{Message, TimeStamp} /** * Example: + * {{{ + * val data = "This is a good start, bingo!! bingo!!" + * app.fromCollection(data.lines.toList). + * // word => (word, count) + * flatMap(line => line.split("[\\s]+")).map((_, 1)). 
+ * // (word, count1), (word, count2) => (word, count1 + count2) + * groupBy(kv => kv._1).reduce(sum(_, _)) * - * - val data = "This is a good start, bingo!! bingo!!" - app.fromCollection(data.lines.toList). - // word => (word, count) - flatMap(line => line.split("[\\s]+")).map((_, 1)). - // (word, count1), (word, count2) => (word, count1 + count2) - groupBy(kv => kv._1).reduce(sum(_, _)) - - val appId = context.submit(app) - context.close() + * val appId = context.submit(app) + * context.close() + * }}} * * @param name name of app */ -class StreamApp(val name: String, system: ActorSystem, userConfig: UserConfig, val graph: Graph[Op, OpEdge]) { +class StreamApp( + val name: String, system: ActorSystem, userConfig: UserConfig, val graph: Graph[Op, OpEdge]) { - def this(name: String ,system: ActorSystem, userConfig: UserConfig) = { + def this(name: String, system: ActorSystem, userConfig: UserConfig) = { this(name, system, userConfig, Graph.empty[Op, OpEdge]) } @@ -60,7 +64,10 @@ class StreamApp(val name: String, system: ActorSystem, userConfig: UserConfig, v } object StreamApp { - def apply(name: String, context: ClientContext, userConfig: UserConfig = UserConfig.empty) = new StreamApp(name, context.system, userConfig) + def apply(name: String, context: ClientContext, userConfig: UserConfig = UserConfig.empty) + : StreamApp = { + new StreamApp(name, context.system, userConfig) + } implicit def streamAppToApplication(streamApp: StreamApp): StreamApplication = { streamApp.plan @@ -80,7 +87,8 @@ object StreamApp { source(dataSource, parallism, conf, description = null) } - def source[T](dataSource: DataSource, parallism: Int, conf: UserConfig, description: String): Stream[T] = { + def source[T](dataSource: DataSource, parallism: Int, conf: UserConfig, description: String) + : Stream[T] = { implicit val sourceOp = DataSourceOp(dataSource, parallism, conf, description) app.graph.addVertex(sourceOp) new Stream[T](app.graph, sourceOp) @@ -89,7 +97,8 @@ object StreamApp { 
this.source(new CollectionDataSource[T](seq), parallism, UserConfig.empty, description) } - def source[T](source: Class[_ <: Task], parallism: Int, conf: UserConfig, description: String): Stream[T] = { + def source[T](source: Class[_ <: Task], parallism: Int, conf: UserConfig, description: String) + : Stream[T] = { val sourceOp = ProcessorOp(source, parallism, conf, Option(description).getOrElse("source")) app.graph.addVertex(sourceOp) new Stream[T](app.graph, sourceOp) @@ -97,6 +106,7 @@ object StreamApp { } } +/** A test message source which generated message sequence repeatedly. */ class CollectionDataSource[T](seq: Seq[T]) extends DataSource { val list = seq.toList var index = 0 http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala index 03bfa81..549cc6e 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,47 +18,60 @@ package io.gearpump.streaming.dsl.javaapi +import scala.collection.JavaConverters._ + import io.gearpump.cluster.UserConfig import io.gearpump.streaming.dsl.Stream import io.gearpump.streaming.javaapi.dsl.functions._ import io.gearpump.streaming.task.Task -import scala.collection.JavaConverters._ - /** * Java DSL */ class JavaStream[T](val stream: Stream[T]) { + /** FlatMap on stream */ def flatMap[R](fn: FlatMapFunction[T, R], description: String): JavaStream[R] = { - new JavaStream[R](stream.flatMap({t: T => fn(t).asScala}, description)) + new JavaStream[R](stream.flatMap({ t: T => fn(t).asScala }, description)) } + /** Map on stream */ def map[R](fn: MapFunction[T, R], description: String): JavaStream[R] = { - new JavaStream[R](stream.map({t: T => fn(t)}, description)) + new JavaStream[R](stream.map({ t: T => fn(t) }, description)) } + /** Only keep the messages that FilterFunction returns true. 
*/ def filter(fn: FilterFunction[T], description: String): JavaStream[T] = { - new JavaStream[T](stream.filter({t: T => fn(t)}, description)) + new JavaStream[T](stream.filter({ t: T => fn(t) }, description)) } + /** Does aggregation on the stream */ def reduce(fn: ReduceFunction[T], description: String): JavaStream[T] = { - new JavaStream[T](stream.reduce({(t1: T, t2: T) => fn(t1, t2)}, description)) + new JavaStream[T](stream.reduce({ (t1: T, t2: T) => fn(t1, t2) }, description)) } def log(): Unit = { stream.log() } + /** Merges streams of same type together */ def merge(other: JavaStream[T], description: String): JavaStream[T] = { new JavaStream[T](stream.merge(other.stream, description)) } - def groupBy[Group](fn: GroupByFunction[T, Group], parallelism: Int, description: String): JavaStream[T] = { + /** + * Group by a stream and turns it to a list of sub-streams. Operations chained after + * groupBy applies to sub-streams. + */ + def groupBy[Group](fn: GroupByFunction[T, Group], parallelism: Int, description: String) + : JavaStream[T] = { new JavaStream[T](stream.groupBy({t: T => fn(t)}, parallelism, description)) } - def process[R](processor: Class[_ <: Task], parallelism: Int, conf: UserConfig, description: String): JavaStream[R] = { + /** Add a low level Processor to process messages */ + def process[R]( + processor: Class[_ <: Task], parallelism: Int, conf: UserConfig, description: String) + : JavaStream[R] = { new JavaStream[R](stream.process(processor, parallelism, conf, description)) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala index 0ad03cd..e39e054 100644 --- 
a/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,31 +19,29 @@ package io.gearpump.streaming.dsl.javaapi import java.util.Collection +import scala.collection.JavaConverters._ import io.gearpump.cluster.UserConfig import io.gearpump.cluster.client.ClientContext import io.gearpump.streaming.dsl.{CollectionDataSource, StreamApp} import io.gearpump.streaming.source.DataSource -import scala.collection.JavaConverters._ - class JavaStreamApp(name: String, context: ClientContext, userConfig: UserConfig) { private val streamApp = StreamApp(name, context, userConfig) def source[T](collection: Collection[T], parallelism: Int, - conf: UserConfig, description: String): JavaStream[T] = { + conf: UserConfig, description: String): JavaStream[T] = { val dataSource = new CollectionDataSource(collection.asScala.toSeq) source(dataSource, parallelism, conf, description) } def source[T](dataSource: DataSource, parallelism: Int, - conf: UserConfig, description: String): JavaStream[T] = { + conf: UserConfig, description: String): JavaStream[T] = { new JavaStream[T](streamApp.source(dataSource, parallelism, conf, description)) } def run(): Unit = { context.submit(streamApp) } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/dsl/op/OP.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/op/OP.scala 
b/streaming/src/main/scala/io/gearpump/streaming/dsl/op/OP.scala index e0b89ce..f0a86fa 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/dsl/op/OP.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/dsl/op/OP.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -34,27 +34,39 @@ sealed trait Op { /** * When translated to running DAG, SlaveOP can be attach to MasterOP or other SlaveOP * "Attach" means running in same Actor. - * */ trait SlaveOp[T] extends Op -case class FlatMapOp[T, R](fun: (T) => TraversableOnce[R], description: String, conf: UserConfig = UserConfig.empty) extends SlaveOp[T] +case class FlatMapOp[T, R]( + fun: (T) => TraversableOnce[R], description: String, conf: UserConfig = UserConfig.empty) + extends SlaveOp[T] -case class ReduceOp[T](fun: (T, T) =>T, description: String, conf: UserConfig = UserConfig.empty) extends SlaveOp[T] +case class ReduceOp[T](fun: (T, T) => T, description: String, conf: UserConfig = UserConfig.empty) + extends SlaveOp[T] trait MasterOp extends Op trait ParameterizedOp[T] extends MasterOp -case class MergeOp(description: String, override val conf: UserConfig = UserConfig.empty) extends MasterOp +case class MergeOp(description: String, override val conf: UserConfig = UserConfig.empty) + extends MasterOp -case class GroupByOp[T, R](fun: T => R, parallelism: Int, description: String, override val conf: UserConfig = UserConfig.empty) extends ParameterizedOp[T] +case class GroupByOp[T, R]( + fun: T => R, parallelism: Int, description: String, + override val conf: UserConfig = UserConfig.empty) + extends ParameterizedOp[T] -case class ProcessorOp[T <: Task](processor: Class[T], 
parallelism: Int, conf: UserConfig, description: String) extends ParameterizedOp[T] +case class ProcessorOp[T <: Task]( + processor: Class[T], parallelism: Int, conf: UserConfig, description: String) + extends ParameterizedOp[T] -case class DataSourceOp[T](dataSource: DataSource, parallelism: Int, conf: UserConfig, description: String) extends ParameterizedOp[T] +case class DataSourceOp[T]( + dataSource: DataSource, parallelism: Int, conf: UserConfig, description: String) + extends ParameterizedOp[T] -case class DataSinkOp[T](dataSink: DataSink, parallelism: Int, conf: UserConfig, description: String) extends ParameterizedOp[T] +case class DataSinkOp[T]( + dataSink: DataSink, parallelism: Int, conf: UserConfig, description: String) + extends ParameterizedOp[T] /** * Contains operators which can be chained to single one. @@ -70,8 +82,8 @@ case class OpChain(ops: List[Op]) extends Op { def description: String = null override def conf: UserConfig = { - // the head's conf has priority - ops.reverse.foldLeft(UserConfig.empty){(conf, op) => + // The head's conf has priority + ops.reverse.foldLeft(UserConfig.empty) { (conf, op) => conf.withConfig(op.conf) } } @@ -84,7 +96,6 @@ trait OpEdge * * For example, map, flatmap operation doesn't require network shuffle, we can use Direct * to represent the relation with upstream operators. - * */ case object Direct extends OpEdge @@ -93,7 +104,6 @@ case object Direct extends OpEdge * * For example, map, flatmap operation doesn't require network shuffle, we can use Direct * to represent the relation with upstream operators. 
- * */ case object Shuffle extends OpEdge http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala index 7dad9cc..b842c7b 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,13 +20,12 @@ package io.gearpump.streaming.dsl.partitioner import io.gearpump.Message import io.gearpump.partitioner.UnicastPartitioner -/** Partition messages by applying group by function first. - * - * @param groupBy - * First apply message with groupBy function, then pick the hashCode of the output to do the partitioning. - * You must define hashCode() for output type of groupBy function. + +/** + * Partition messages by applying group by function first. * * For example: + * {{{ * case class People(name: String, gender: String) * * object Test{ @@ -34,9 +33,13 @@ import io.gearpump.partitioner.UnicastPartitioner * val groupBy: (People => String) = people => people.gender * val partitioner = GroupByPartitioner(groupBy) * } + * }}} + * + * @param groupBy First apply message with groupBy function, then pick the hashCode of the output + * to do the partitioning. 
You must define hashCode() for output type of groupBy function. */ class GroupByPartitioner[T, GROUP](groupBy: T => GROUP = null) extends UnicastPartitioner { - override def getPartition(msg : Message, partitionNum : Int, currentPartitionId: Int) : Int = { + override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { val hashCode = groupBy(msg.msg.asInstanceOf[T]).hashCode() (hashCode & Integer.MAX_VALUE) % partitionNum } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/OpTranslator.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/OpTranslator.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/OpTranslator.scala index 400c9b1..f916124 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/OpTranslator.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/OpTranslator.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,24 +18,25 @@ package io.gearpump.streaming.dsl.plan +import scala.collection.TraversableOnce + import akka.actor.ActorSystem -import io.gearpump.streaming.sink.DataSink -import io.gearpump.streaming.source.DataSource -import io.gearpump.streaming.{Processor, Constants} -import io.gearpump.streaming.dsl.op._ -import io.gearpump.streaming.task.{StartTime, TaskContext, Task} +import org.slf4j.Logger + import io.gearpump._ import io.gearpump.cluster.UserConfig -import Constants._ -import Processor.DefaultProcessor -import OpTranslator._ +import io.gearpump.streaming.Constants._ +import io.gearpump.streaming.Processor +import io.gearpump.streaming.Processor.DefaultProcessor +import io.gearpump.streaming.dsl.op._ +import io.gearpump.streaming.dsl.plan.OpTranslator._ +import io.gearpump.streaming.sink.DataSink +import io.gearpump.streaming.source.DataSource +import io.gearpump.streaming.task.{StartTime, Task, TaskContext} import io.gearpump.util.LogUtil -import org.slf4j.Logger - -import scala.collection.TraversableOnce /** - * Translate a OP to a TaskDescription + * Translates a OP to a TaskDescription */ class OpTranslator extends java.io.Serializable { val LOG: Logger = LogUtil.getLogger(getClass) @@ -55,7 +56,7 @@ class OpTranslator extends java.io.Serializable { Processor[SourceTask[Object, Object]](parallism, description = description + "." + func.description, userConfig.withValue(GEARPUMP_STREAMING_SOURCE, dataSource)) - case groupby@ GroupByOp(_, parallism, description, _) => + case groupby@GroupByOp(_, parallism, description, _) => Processor[GroupByTask[Object, Object, Object]](parallism, description = description + "." 
+ func.description, userConfig.withValue(GEARPUMP_STREAMING_GROUPBY_FUNCTION, groupby)) @@ -79,6 +80,8 @@ class OpTranslator extends java.io.Serializable { Processor[TransformTask[Object, Object]](1, description = func.description, taskConf = userConfig) + case chain: OpChain => + throw new RuntimeException("Not supposed to be called!") } } @@ -87,8 +90,12 @@ class OpTranslator extends java.io.Serializable { val totalFunction = ops.foldLeft(func) { (fun, op) => val opFunction = op match { - case flatmap: FlatMapOp[Object, Object] => new FlatMapFunction(flatmap.fun, flatmap.description) - case reduce: ReduceOp[Object] => new ReduceFunction(reduce.fun, reduce.description) + case flatmap: FlatMapOp[Object @unchecked, Object @unchecked] => + new FlatMapFunction(flatmap.fun, flatmap.description) + case reduce: ReduceOp[Object @unchecked] => + new ReduceFunction(reduce.fun, reduce.description) + case _ => + throw new RuntimeException("Not supposed to be called!") } fun.andThen(opFunction.asInstanceOf[SingleInputFunction[Object, Object]]) } @@ -107,18 +114,22 @@ object OpTranslator { def description: String } - class DummyInputFunction[T] extends SingleInputFunction[T, T]{ - override def andThen[OUTER](other: SingleInputFunction[T, OUTER]): SingleInputFunction[T, OUTER] = { + class DummyInputFunction[T] extends SingleInputFunction[T, T] { + override def andThen[OUTER](other: SingleInputFunction[T, OUTER]) + : SingleInputFunction[T, OUTER] = { other } - //should never be called - override def process(value: T) = None + // Should never be called + override def process(value: T): TraversableOnce[T] = None override def description: String = "" } - class AndThen[IN, MIDDLE, OUT](first: SingleInputFunction[IN, MIDDLE], second: SingleInputFunction[MIDDLE, OUT]) extends SingleInputFunction[IN, OUT] { + class AndThen[IN, MIDDLE, OUT]( + first: SingleInputFunction[IN, MIDDLE], second: SingleInputFunction[MIDDLE, OUT]) + extends SingleInputFunction[IN, OUT] { + override def 
process(value: IN): TraversableOnce[OUT] = { first.process(value).flatMap(second.process(_)) } @@ -130,7 +141,9 @@ object OpTranslator { } } - class FlatMapFunction[IN, OUT](fun: IN => TraversableOnce[OUT], descriptionMessage: String) extends SingleInputFunction[IN, OUT] { + class FlatMapFunction[IN, OUT](fun: IN => TraversableOnce[OUT], descriptionMessage: String) + extends SingleInputFunction[IN, OUT] { + override def process(value: IN): TraversableOnce[OUT] = { fun(value) } @@ -140,7 +153,9 @@ object OpTranslator { } } - class ReduceFunction[T](fun: (T, T)=>T, descriptionMessage: String) extends SingleInputFunction[T, T] { + class ReduceFunction[T](fun: (T, T) => T, descriptionMessage: String) + extends SingleInputFunction[T, T] { + private var state: Any = null override def process(value: T): TraversableOnce[T] = { @@ -155,10 +170,13 @@ object OpTranslator { override def description: String = descriptionMessage } - class GroupByTask[IN, GROUP, OUT](groupBy: IN => GROUP, taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { + class GroupByTask[IN, GROUP, OUT]( + groupBy: IN => GROUP, taskContext: TaskContext, userConf: UserConfig) + extends Task(taskContext, userConf) { def this(taskContext: TaskContext, userConf: UserConfig) = { - this(userConf.getValue[GroupByOp[IN, GROUP]](GEARPUMP_STREAMING_GROUPBY_FUNCTION )(taskContext.system).get.fun, + this(userConf.getValue[GroupByOp[IN, GROUP]]( + GEARPUMP_STREAMING_GROUPBY_FUNCTION )(taskContext.system).get.fun, taskContext, userConf) } @@ -172,24 +190,29 @@ object OpTranslator { val group = groupBy(msg.msg.asInstanceOf[IN]) if (!groups.contains(group)) { - val operator = userConf.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get + val operator = + userConf.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get groups += group -> operator } val operator = groups(group) - operator.process(msg.msg.asInstanceOf[IN]).foreach{msg => + 
operator.process(msg.msg.asInstanceOf[IN]).foreach { msg => taskContext.output(new Message(msg.asInstanceOf[AnyRef], time)) } } } - class SourceTask[T, OUT](source: DataSource, operator: Option[SingleInputFunction[T, OUT]], taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { + class SourceTask[T, OUT]( + source: DataSource, operator: Option[SingleInputFunction[T, OUT]], taskContext: TaskContext, + userConf: UserConfig) + extends Task(taskContext, userConf) { def this(taskContext: TaskContext, userConf: UserConfig) = { this( userConf.getValue[DataSource](GEARPUMP_STREAMING_SOURCE)(taskContext.system).get, - userConf.getValue[SingleInputFunction[T, OUT]](GEARPUMP_STREAMING_OPERATOR)(taskContext.system), + userConf.getValue[SingleInputFunction[T, OUT]](GEARPUMP_STREAMING_OPERATOR)( + taskContext.system), taskContext, userConf) } @@ -200,7 +223,7 @@ object OpTranslator { override def onNext(msg: Message): Unit = { val time = System.currentTimeMillis() - //Todo: determine the batch size + // TODO: determine the batch size source.read(1).foreach(msg => { operator match { case Some(operator) => @@ -219,15 +242,18 @@ object OpTranslator { self ! 
Message("next", System.currentTimeMillis()) } - override def onStop() = { + override def onStop(): Unit = { source.close() } } - class TransformTask[IN, OUT](operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { + class TransformTask[IN, OUT]( + operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext, + userConf: UserConfig) extends Task(taskContext, userConf) { def this(taskContext: TaskContext, userConf: UserConfig) = { - this(userConf.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf) + this(userConf.getValue[SingleInputFunction[IN, OUT]]( + GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf) } override def onStart(startTime: StartTime): Unit = { @@ -238,7 +264,7 @@ object OpTranslator { operator match { case Some(operator) => - operator.process(msg.msg.asInstanceOf[IN]).foreach{ msg => + operator.process(msg.msg.asInstanceOf[IN]).foreach { msg => taskContext.output(new Message(msg.asInstanceOf[AnyRef], time)) } case None => @@ -247,9 +273,12 @@ object OpTranslator { } } - class SinkTask[T](dataSink: DataSink, taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { + class SinkTask[T](dataSink: DataSink, taskContext: TaskContext, userConf: UserConfig) + extends Task(taskContext, userConf) { + def this(taskContext: TaskContext, userConf: UserConfig) = { - this(userConf.getValue[DataSink](GEARPUMP_STREAMING_SINK)(taskContext.system).get, taskContext, userConf) + this(userConf.getValue[DataSink](GEARPUMP_STREAMING_SINK)(taskContext.system).get, + taskContext, userConf) } override def onStart(startTime: StartTime): Unit = { @@ -260,7 +289,7 @@ object OpTranslator { dataSink.write(msg) } - override def onStop() = { + override def onStop(): Unit = { dataSink.close() } } 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala index ee0818d..aafd8d3 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,35 +19,38 @@ package io.gearpump.streaming.dsl.plan import akka.actor.ActorSystem + +import io.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner, Partitioner} import io.gearpump.streaming.Processor import io.gearpump.streaming.dsl.op._ import io.gearpump.streaming.dsl.partitioner.GroupByPartitioner import io.gearpump.streaming.task.Task -import io.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner, Partitioner} import io.gearpump.util.Graph class Planner { /* - * Convert Dag[Op] to Dag[TaskDescription] so that we can run it easily. + * Converts Dag of Op to Dag of TaskDescription. TaskDescription is part of the low + * level Graph API. 
*/ - def plan(dag: Graph[Op, OpEdge])(implicit system: ActorSystem): Graph[Processor[_ <: Task], _ <: Partitioner] = { + def plan(dag: Graph[Op, OpEdge])(implicit system: ActorSystem) + : Graph[Processor[_ <: Task], _ <: Partitioner] = { val opTranslator = new OpTranslator() val newDag = optimize(dag) - newDag.mapEdge {(node1, edge, node2) => + newDag.mapEdge { (node1, edge, node2) => edge match { case Shuffle => node2.head match { - case groupBy: GroupByOp[Any, Any] => + case groupBy: GroupByOp[Any @unchecked, Any @unchecked] => new GroupByPartitioner(groupBy.fun) case _ => new HashPartitioner } case Direct => new CoLocationPartitioner } - }.mapVertex {opChain => + }.mapVertex { opChain => opTranslator.translate(opChain) } } @@ -65,11 +68,12 @@ class Planner { newGraph } - private def merge(dag: Graph[OpChain, OpEdge], node1: OpChain, node2: OpChain): Graph[OpChain, OpEdge] = { + private def merge(dag: Graph[OpChain, OpEdge], node1: OpChain, node2: OpChain) + : Graph[OpChain, OpEdge] = { if (dag.outDegreeOf(node1) == 1 && dag.inDegreeOf(node2) == 1 && - // for processor node, we don't allow it to merge with downstream operators - !node1.head.isInstanceOf[ProcessorOp[_<:Task]]) { + // For processor node, we don't allow it to merge with downstream operators + !node1.head.isInstanceOf[ProcessorOp[_ <: Task]]) { val (_, edge, _) = dag.outgoingEdgesOf(node1)(0) if (edge == Direct) { val opList = OpChain(node1.ops ++ node2.ops) @@ -82,7 +86,7 @@ class Planner { dag.addEdge(opList, outgoingEdge._2, outgoingEdge._3) } - //remove the old vertex + // Remove the old vertex dag.removeVertex(node1) dag.removeVertex(node2) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/executor/Executor.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/executor/Executor.scala 
b/streaming/src/main/scala/io/gearpump/streaming/executor/Executor.scala index cbce65b..48007d5 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/executor/Executor.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/executor/Executor.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,17 +19,21 @@ package io.gearpump.streaming.executor import java.lang.management.ManagementFactory +import scala.concurrent.duration._ import akka.actor.SupervisorStrategy.Resume import akka.actor._ import com.typesafe.config.Config -import io.gearpump.WorkerId +import org.apache.commons.lang.exception.ExceptionUtils +import org.slf4j.Logger + +import io.gearpump.cluster.worker.WorkerId import io.gearpump.cluster.{ClusterConfig, ExecutorContext, UserConfig} import io.gearpump.metrics.Metrics.ReportMetrics import io.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService} import io.gearpump.serializer.SerializationFramework import io.gearpump.streaming.AppMasterToExecutor.{MsgLostException, TasksChanged, TasksLaunched, _} -import io.gearpump.streaming.ExecutorToAppMaster.{UnRegisterTask, MessageLoss, RegisterExecutor, RegisterTask} +import io.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, RegisterExecutor, RegisterTask, UnRegisterTask} import io.gearpump.streaming.ProcessorId import io.gearpump.streaming.executor.Executor._ import io.gearpump.streaming.executor.TaskLauncher.TaskArgument @@ -37,11 +41,6 @@ import io.gearpump.streaming.task.{Subscriber, TaskId} import io.gearpump.transport.{Express, HostPort} import io.gearpump.util.Constants._ import io.gearpump.util.{ActorUtil, Constants, LogUtil, TimeOutScheduler} -import 
org.apache.commons.lang.exception.ExceptionUtils -import org.slf4j.Logger - -import scala.concurrent.duration._ -import scala.language.postfixOps /** * Executor is child of AppMaster. @@ -64,18 +63,20 @@ class Executor(executorContext: ExecutorContext, userConf : UserConfig, launcher private val LOG: Logger = LogUtil.getLogger(getClass, executor = executorId, app = appId) - implicit val timeOut = FUTURE_TIMEOUT + private implicit val timeOut = FUTURE_TIMEOUT private val address = ActorUtil.getFullPath(context.system, self.path) private val systemConfig = context.system.settings.config private val serializerPool = getSerializerPool() private val taskDispatcher = systemConfig.getString(Constants.GEARPUMP_TASK_DISPATCHER) private var state = State.ACTIVE - private var transitionStart = 0L // state transition start, in unix time - private var transitionEnd = 0L // state transition end, in unix time + private var transitionStart = 0L + // States transition start, in unix time + private var transitionEnd = 0L + // States transition end, in unix time private val transitWarningThreshold = 5000 // ms, - // start health check Ticks + // Starts health check Ticks self ! 
HealthCheck LOG.info(s"Executor $executorId has been started, start to register itself...") @@ -92,31 +93,34 @@ class Executor(executorContext: ExecutorContext, userConf : UserConfig, launcher val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED) if (metricsEnabled) { - // register jvm metrics + // Registers jvm metrics Metrics(context.system).register(new JvmMetricsSet(s"app$appId.executor$executorId")) - val metricsReportService = context.actorOf(Props(new MetricsReporterService(Metrics(context.system)))) + val metricsReportService = context.actorOf(Props(new MetricsReporterService( + Metrics(context.system)))) appMaster.tell(ReportMetrics, metricsReportService) } private val NOT_INITIALIZED = -1 - def receive : Receive = applicationReady(dagVersion = NOT_INITIALIZED) + def receive: Receive = applicationReady(dagVersion = NOT_INITIALIZED) private def getTaskId(actorRef: ActorRef): Option[TaskId] = { tasks.find(_._2 == actorRef).map(_._1) } override val supervisorStrategy = - OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { + OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) { case _: MsgLostException => val taskId = getTaskId(sender) - val cause = s"We got MessageLossException from task ${getTaskId(sender)}, replaying application..." + val cause = s"We got MessageLossException from task ${getTaskId(sender)}, " + + s"replaying application..." LOG.error(cause) - taskId.foreach(appMaster ! MessageLoss(executorId, _, cause)) + taskId.foreach(appMaster ! 
MessageLoss(executorId, _, cause)) Resume case ex: Throwable => val taskId = getTaskId(sender) - val errorMsg = s"We got ${ex.getClass.getName} from $taskId, we will treat it as MessageLoss, so that the system will replay all lost message" + val errorMsg = s"We got ${ex.getClass.getName} from $taskId, we will treat it as" + + s" MessageLoss, so that the system will replay all lost message" LOG.error(errorMsg, ex) val detailErrorMsg = errorMsg + "\n" + ExceptionUtils.getStackTrace(ex) taskId.foreach(appMaster ! MessageLoss(executorId, _, detailErrorMsg)) @@ -129,22 +133,27 @@ class Executor(executorContext: ExecutorContext, userConf : UserConfig, launcher private def assertVersion(expectVersion: Int, version: Int, clue: Any): Unit = { if (expectVersion != version) { - val errorMessage = s"Version mismatch: we expect dag version $expectVersion, but get $version; clue: $clue" + val errorMessage = s"Version mismatch: we expect dag version $expectVersion, " + + s"but get $version; clue: $clue" LOG.error(errorMessage) throw new DagVersionMismatchException(errorMessage) } } - def dynamicDagPhase1(dagVersion: Int, launched: List[TaskId], changed: List[ChangeTask], registered: List[TaskId]): Receive = { + def dynamicDagPhase1( + dagVersion: Int, launched: List[TaskId], changed: List[ChangeTask], registered: List[TaskId]) + : Receive = { state = State.DYNAMIC_DAG_PHASE1 box({ - case launch@LaunchTasks(taskIds, version, processorDescription, subscribers: List[Subscriber]) => { + case launch@LaunchTasks(taskIds, version, processorDescription, + subscribers: List[Subscriber]) => { assertVersion(dagVersion, version, clue = launch) LOG.info(s"Launching Task $taskIds for app: $appId") val taskArgument = TaskArgument(version, processorDescription, subscribers) taskIds.foreach(taskArgumentStore.add(_, taskArgument)) - val newAdded = launcher.launch(taskIds, taskArgument, context, serializerPool, taskDispatcher) + val newAdded = launcher.launch(taskIds, taskArgument, context, 
serializerPool, + taskDispatcher) newAdded.foreach { newAddedTask => context.watch(newAddedTask._2) } @@ -160,12 +169,14 @@ class Executor(executorContext: ExecutorContext, userConf : UserConfig, launcher val newChangedTasks = taskIds.map { taskId => for (taskArgument <- taskArgumentStore.get(dagVersion, taskId)) { val processorDescription = taskArgument.processorDescription.copy(life = life) - taskArgumentStore.add(taskId, TaskArgument(dagVersion, processorDescription, subscribers)) + taskArgumentStore.add(taskId, TaskArgument(dagVersion, processorDescription, + subscribers)) } ChangeTask(taskId, dagVersion, life, subscribers) } sender ! TasksChanged(taskIds) - context.become(dynamicDagPhase1(dagVersion, launched, changed ++ newChangedTasks, registered)) + context.become(dynamicDagPhase1(dagVersion, launched, changed ++ newChangedTasks, + registered)) case locations@TaskLocationsReady(taskLocations, version) => LOG.info(s"TaskLocations Ready...") @@ -174,7 +185,9 @@ class Executor(executorContext: ExecutorContext, userConf : UserConfig, launcher // Check whether all tasks has been registered. if ((launched.toSet -- registered.toSet).isEmpty) { // Confirm all tasks has been registered. - val result = taskLocations.locations.filter(location => !location._1.equals(express.localHost)).flatMap { kv => + val result = taskLocations.locations.filter { + location => !location._1.equals(express.localHost) + }.flatMap { kv => val (host, taskIdList) = kv taskIdList.map(taskId => (TaskId.toLong(taskId), host)) } @@ -189,14 +202,17 @@ class Executor(executorContext: ExecutorContext, userConf : UserConfig, launcher } context.become(dynamicDagPhase2(dagVersion, launched, changed)) } else { - LOG.error("Inconsistency between AppMaser and Executor! AppMaster thinks DynamicDag transition is ready, " + - "while Executor have not get all tasks registered, that task will not be functional...") - //reject TaskLocations... + LOG.error("Inconsistency between AppMaser and Executor! 
AppMaster thinks DynamicDag " + + "transition is ready, while Executor have not get all tasks registered, " + + "that task will not be functional...") + + // Reject TaskLocations... val missedTasks = (launched.toSet -- registered.toSet).toList - val errorMsg = "We have not received TaskRegistered for following tasks: " + missedTasks.mkString(", ") + val errorMsg = "We have not received TaskRegistered for following tasks: " + + missedTasks.mkString(", ") LOG.error(errorMsg) sender ! TaskLocationsRejected(dagVersion, executorId, errorMsg, null) - // stay with current status... + // Stays with current status... } case confirm: TaskRegistered => @@ -205,10 +221,11 @@ class Executor(executorContext: ExecutorContext, userConf : UserConfig, launcher tasks += confirm.taskId -> actorRef actorRef forward confirm } - context.become(dynamicDagPhase1(dagVersion, launched, changed, registered :+ confirm.taskId)) + context.become(dynamicDagPhase1(dagVersion, launched, changed, + registered :+ confirm.taskId)) case rejected: TaskRejected => - // means this task shoud not exists... + // Means this task should not exist... tasks.get(rejected.taskId).foreach(_ ! PoisonPill) tasks -= rejected.taskId LOG.error(s"Task ${rejected.taskId} is rejected by AppMaster, shutting down it...") @@ -218,7 +235,8 @@ class Executor(executorContext: ExecutorContext, userConf : UserConfig, launcher }) } - def dynamicDagPhase2(dagVersion: Int, launched: List[TaskId], changed: List[ChangeTask]): Receive = { + def dynamicDagPhase2(dagVersion: Int, launched: List[TaskId], changed: List[ChangeTask]) + : Receive = { LOG.info("Transit to dynamic Dag Phase2") state = State.DYNAMIC_DAG_PHASE2 box { @@ -240,29 +258,36 @@ class Executor(executorContext: ExecutorContext, userConf : UserConfig, launcher transitionEnd = System.currentTimeMillis() if (dagVersion != NOT_INITIALIZED) { - LOG.info("Transit to state Application Ready. 
This transition takes " + (transitionEnd - transitionStart) + " milliseconds") + LOG.info("Transit to state Application Ready. This transition takes " + + (transitionEnd - transitionStart) + " milliseconds") } box { case start: StartDynamicDag => LOG.info("received StartDynamicDag") if (start.dagVersion > dagVersion) { transitionStart = System.currentTimeMillis() - LOG.info(s"received $start, Executor transit to dag version: ${start.dagVersion} from current version $dagVersion") - context.become(dynamicDagPhase1(start.dagVersion, List.empty[TaskId], List.empty[ChangeTask], List.empty[TaskId])) + LOG.info(s"received $start, Executor transit to dag version: ${start.dagVersion} from " + + s"current version $dagVersion") + context.become(dynamicDagPhase1(start.dagVersion, List.empty[TaskId], + List.empty[ChangeTask], List.empty[TaskId])) } case launch: LaunchTasks => if (launch.dagVersion > dagVersion) { transitionStart = System.currentTimeMillis() - LOG.info(s"received $launch, Executor transit to dag version: ${launch.dagVersion} from current version $dagVersion") - context.become(dynamicDagPhase1(launch.dagVersion, List.empty[TaskId], List.empty[ChangeTask], List.empty[TaskId])) + LOG.info(s"received $launch, Executor transit to dag " + + s"version: ${launch.dagVersion} from current version $dagVersion") + context.become(dynamicDagPhase1(launch.dagVersion, List.empty[TaskId], + List.empty[ChangeTask], List.empty[TaskId])) self forward launch } case change: ChangeTasks => if (change.dagVersion > dagVersion) { transitionStart = System.currentTimeMillis() - LOG.info(s"received $change, Executor transit to dag version: ${change.dagVersion} from current version $dagVersion") - context.become(dynamicDagPhase1(change.dagVersion, List.empty[TaskId], List.empty[ChangeTask], List.empty[TaskId])) + LOG.info(s"received $change, Executor transit to dag version: ${change.dagVersion} from" + + s" current version $dagVersion") + context.become(dynamicDagPhase1(change.dagVersion, 
List.empty[TaskId], + List.empty[ChangeTask], List.empty[TaskId])) self forward change } @@ -274,8 +299,8 @@ class Executor(executorContext: ExecutorContext, userConf : UserConfig, launcher } tasks -= taskId - case unRegister @ UnRegisterTask(taskId, _) => - // send UnRegisterTask to AppMaster + case unRegister@UnRegisterTask(taskId, _) => + // Sends UnRegisterTask to AppMaster appMaster ! unRegister } } @@ -289,14 +314,15 @@ class Executor(executorContext: ExecutorContext, userConf : UserConfig, launcher val newNeedRestart = needRestart :+ taskId val newRemain = remain - 1 if (newRemain == 0) { - val newRestarted = newNeedRestart.map{ taskId_ => + val newRestarted = newNeedRestart.map { taskId_ => val taskActor = launchTask(taskId_, taskArgumentStore.get(dagVersion, taskId_).get) context.watch(taskActor) taskId_ -> taskActor }.toMap tasks = newRestarted - context.become(dynamicDagPhase1(dagVersion, newNeedRestart, List.empty[ChangeTask], List.empty[TaskId])) + context.become(dynamicDagPhase1(dagVersion, newNeedRestart, List.empty[ChangeTask], + List.empty[TaskId])) } else { context.become(restartingTasks(dagVersion, newRemain, newNeedRestart)) } @@ -308,7 +334,8 @@ class Executor(executorContext: ExecutorContext, userConf : UserConfig, launcher val terminationWatch: Receive = { case Terminated(actor) => if (actor.compareTo(appMaster) == 0) { - LOG.info(s"AppMaster ${appMaster.path.toString} is terminated, shutting down current executor $appId, $executorId") + LOG.info(s"AppMaster ${appMaster.path.toString} is terminated, shutting down current " + + s"executor $appId, $executorId") context.stop(self) } else { self ! 
TaskStopped(actor) @@ -320,7 +347,8 @@ class Executor(executorContext: ExecutorContext, userConf : UserConfig, launcher LOG.info(s"Executor received restart tasks") val tasksToRestart = tasks.keys.count(taskArgumentStore.get(dagVersion, _).nonEmpty) express.remoteAddressMap.send(Map.empty[Long, HostPort]) - context.become(restartingTasks(dagVersion, remain = tasksToRestart, needRestart = List.empty[TaskId])) + context.become(restartingTasks(dagVersion, remain = tasksToRestart, + needRestart = List.empty[TaskId])) tasks.values.foreach { case task: ActorRef => task ! PoisonPill @@ -329,7 +357,7 @@ class Executor(executorContext: ExecutorContext, userConf : UserConfig, launcher def executorService: Receive = terminationWatch orElse onRestartTasks orElse { case taskChanged: TaskChanged => - //skip + // Skip case get: GetExecutorSummary => val logFile = LogUtil.applicationLogDir(systemConfig) val processorTasks = tasks.keySet.groupBy(_.processorId).mapValues(_.toList).view.force @@ -346,7 +374,7 @@ class Executor(executorContext: ExecutorContext, userConf : UserConfig, launcher case query: QueryExecutorConfig => sender ! ExecutorConfig(ClusterConfig.filterOutDefaultConfig(systemConfig)) case HealthCheck => - context.system.scheduler.scheduleOnce(3 second)(HealthCheck) + context.system.scheduler.scheduleOnce(3.second)(HealthCheck) if (state != State.ACTIVE && (transitionEnd - transitionStart) > transitWarningThreshold) { LOG.error(s"Executor status: " + state + s", it takes too long(${transitionEnd - transitionStart}) to do transition") @@ -392,17 +420,18 @@ object Executor { } /** - * when the new DAG is successfully deployed, then we should remove obsolete TaskArgument of old DAG. + * When the new DAG is successfully deployed, then we should remove obsolete + * TaskArgument of old DAG. 
*/ - def removeObsoleteVersion: Unit = { - store = store.map{ kv => + def removeObsoleteVersion(): Unit = { + store = store.map { kv => val (k, list) = kv (k, list.take(1)) } } def removeNewerVersion(currentVersion: Int): Unit = { - store = store.map{ kv => + store = store.map { kv => val (k, list) = kv (k, list.filter(_.dagVersion <= currentVersion)) } @@ -412,18 +441,20 @@ object Executor { case class TaskStopped(task: ActorRef) case class ExecutorSummary( - id: Int, - workerId: WorkerId, - actorPath: String, - logFile: String, - status: String, - taskCount: Int, - tasks: Map[ProcessorId, List[TaskId]], - jvmName: String + id: Int, + workerId: WorkerId, + actorPath: String, + logFile: String, + status: String, + taskCount: Int, + tasks: Map[ProcessorId, List[TaskId]], + jvmName: String ) object ExecutorSummary { - def empty: ExecutorSummary = ExecutorSummary(0, WorkerId.unspecified, "", "", "", 1, null, jvmName = "") + def empty: ExecutorSummary = { + ExecutorSummary(0, WorkerId.unspecified, "", "", "", 1, null, jvmName = "") + } } case class GetExecutorSummary(executorId: Int) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/executor/ExecutorRestartPolicy.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/executor/ExecutorRestartPolicy.scala b/streaming/src/main/scala/io/gearpump/streaming/executor/ExecutorRestartPolicy.scala index e485d56..c40aa5f 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/executor/ExecutorRestartPolicy.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/executor/ExecutorRestartPolicy.scala @@ -15,18 +15,24 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package io.gearpump.streaming.executor -import io.gearpump.streaming.task.TaskId -import io.gearpump.util.RestartPolicy +package io.gearpump.streaming.executor import scala.collection.immutable import scala.concurrent.duration.Duration +import io.gearpump.streaming.task.TaskId +import io.gearpump.util.RestartPolicy + /** - * @param maxNrOfRetries the number of times a executor is allowed to be restarted, negative value means no limit, - * if the limit is exceeded the policy will not allow to restart the executor - * @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window + * + * Controls how many retries to recover failed executors. + * + * @param maxNrOfRetries the number of times an executor is allowed to be restarted, + * negative value means no limit, if the limit is exceeded the policy + * will not allow to restart the executor + * @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf + * means no window */ class ExecutorRestartPolicy(maxNrOfRetries: Int, withinTimeRange: Duration) { private var executorToTaskIds = Map.empty[Int, Set[TaskId]] @@ -45,12 +51,14 @@ class ExecutorRestartPolicy(maxNrOfRetries: Int, withinTimeRange: Duration) { executorToTaskIds.get(executorId).map { taskIds => taskIds.foreach { taskId => taskRestartPolocies.get(taskId).map { policy => - if(!policy.allowRestart) { + if (!policy.allowRestart) { + // scalastyle:off return return false + // scalastyle:on return } } } } true } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/executor/TaskLauncher.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/executor/TaskLauncher.scala b/streaming/src/main/scala/io/gearpump/streaming/executor/TaskLauncher.scala index b742889..d377ea4 100644 ---
a/streaming/src/main/scala/io/gearpump/streaming/executor/TaskLauncher.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/executor/TaskLauncher.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,10 +16,10 @@ * limitations under the License. */ - package io.gearpump.streaming.executor import akka.actor.{Actor, ActorRef, ActorRefFactory, Props} + import io.gearpump.cluster.{ExecutorContext, UserConfig} import io.gearpump.serializer.SerializationFramework import io.gearpump.streaming.ProcessorDescription @@ -28,8 +28,11 @@ import io.gearpump.streaming.task._ import io.gearpump.streaming.util.ActorPathUtil trait ITaskLauncher { + + /** Launch a list of task actors */ def launch(taskIds: List[TaskId], argument: TaskArgument, - context: ActorRefFactory, serializer: SerializationFramework, dispatcher: String): Map[TaskId, ActorRef] + context: ActorRefFactory, serializer: SerializationFramework, dispatcher: String) + : Map[TaskId, ActorRef] } class TaskLauncher( @@ -41,8 +44,10 @@ class TaskLauncher( taskActorClass: Class[_ <: Actor]) extends ITaskLauncher{ - override def launch(taskIds: List[TaskId], argument: TaskArgument, - context: ActorRefFactory, serializer: SerializationFramework, dispatcher: String): Map[TaskId, ActorRef] = { + override def launch( + taskIds: List[TaskId], argument: TaskArgument, + context: ActorRefFactory, serializer: SerializationFramework, dispatcher: String) + : Map[TaskId, ActorRef] = { import argument.{processorDescription, subscribers} val taskConf = userConf.withConfig(processorDescription.taskConf) @@ -57,8 +62,8 @@ class TaskLauncher( var tasks = Map.empty[TaskId, ActorRef] taskIds.foreach { taskId => 
val task = new TaskWrapper(taskId, taskClass, taskContext, taskConf) - val taskActor = context.actorOf(Props(taskActorClass, taskId, taskContext, userConf, task, serializer). - withDispatcher(dispatcher), ActorPathUtil.taskActorName(taskId)) + val taskActor = context.actorOf(Props(taskActorClass, taskId, taskContext, userConf, task, + serializer).withDispatcher(dispatcher), ActorPathUtil.taskActorName(taskId)) tasks += taskId -> taskActor } tasks @@ -67,7 +72,9 @@ class TaskLauncher( object TaskLauncher { - case class TaskArgument(dagVersion: Int, processorDescription: ProcessorDescription, subscribers: List[Subscriber]) + case class TaskArgument( + dagVersion: Int, processorDescription: ProcessorDescription, + subscribers: List[Subscriber]) def apply(executorContext: ExecutorContext, userConf: UserConfig): TaskLauncher = { import executorContext.{appId, appMaster, appName, executorId} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/metrics/ProcessorAggregator.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/metrics/ProcessorAggregator.scala b/streaming/src/main/scala/io/gearpump/streaming/metrics/ProcessorAggregator.scala index f92970c..8be2e72 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/metrics/ProcessorAggregator.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/metrics/ProcessorAggregator.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,6 +21,7 @@ package io.gearpump.streaming.metrics import java.util import com.typesafe.config.Config + import io.gearpump.TimeStamp import io.gearpump.cluster.ClientToMaster.ReadOption import io.gearpump.cluster.MasterToClient.HistoryMetricsItem @@ -32,24 +33,20 @@ import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig /** * - * [[ProcessorAggregator]] does aggregation after grouping by these three attributes: - * 1. processorId - * 2. time section(represented as a index integer) - * 3. metricName(like sendThroughput) - * - * - * It assumes for each [[HistoryMetricsItem]], the name follow the format - * app[appId].processor[processorId].task[taskId].[metricName] + * Does aggregation on metrics after grouping by these three attributes: + * 1. processorId + * 2. time section(represented as a index integer) + * 3. metricName(like sendThroughput) * + * It assumes that for each [[io.gearpump.cluster.MasterToClient.HistoryMetricsItem]], the name + * follow the format app(appId).processor(processorId).task(taskId).(metricName) * * It parses the name to get processorId and metricName. If the parsing fails, then current - * [[HistoryMetricsItem]] will be skipped. - * - * - * This class is optimized for performance. + * [[io.gearpump.cluster.MasterToClient.HistoryMetricsItem]] will be skipped. * + * NOTE: this class is optimized for performance. 
*/ -class ProcessorAggregator(historyMetricConfig: HistoryMetricsConfig) extends MetricsAggregator{ +class ProcessorAggregator(historyMetricConfig: HistoryMetricsConfig) extends MetricsAggregator { def this(config: Config) = { this(HistoryMetricsConfig(config)) @@ -58,9 +55,8 @@ class ProcessorAggregator(historyMetricConfig: HistoryMetricsConfig) extends Met private val aggregatorFactory: AggregatorFactory = new AggregatorFactory() /** - * Accept options: + * Accepts options: * key: "readOption", value: one of "readLatest", "readRecent", "readHistory" - * */ override def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem]): List[HistoryMetricsItem] = { @@ -68,14 +64,15 @@ class ProcessorAggregator(historyMetricConfig: HistoryMetricsConfig) extends Met aggregate(readOption, inputs, System.currentTimeMillis()) } - def aggregate(readOption: ReadOption.ReadOption, inputs: Iterator[HistoryMetricsItem], now: TimeStamp): - List[HistoryMetricsItem] = { + def aggregate( + readOption: ReadOption.ReadOption, inputs: Iterator[HistoryMetricsItem], now: TimeStamp) + : List[HistoryMetricsItem] = { val (start, end, interval) = getTimeRange(readOption, now) val timeSlotsCount = ((end - start - 1) / interval + 1).toInt val map = new MultiLayerMap[Aggregator](timeSlotsCount) val taskIdentity = new TaskIdentity(0, null) - while(inputs.hasNext) { + while (inputs.hasNext) { val item = inputs.next() if (item.value.isInstanceOf[Meter] || item.value.isInstanceOf[Histogram]) { @@ -97,7 +94,7 @@ class ProcessorAggregator(historyMetricConfig: HistoryMetricsConfig) extends Met val result = new Array[HistoryMetricsItem](map.size) val iterator = map.valueIterator var index = 0 - while(iterator.hasNext()) { + while (iterator.hasNext()) { val op = iterator.next() result(index) = op.result index += 1 @@ -106,8 +103,9 @@ class ProcessorAggregator(historyMetricConfig: HistoryMetricsConfig) extends Met result.toList } - // return (start, end, interval) - private def 
getTimeRange(readOption: ReadOption.ReadOption, now: TimeStamp): (TimeStamp, TimeStamp, TimeStamp) = { + // Returns (start, end, interval) + private def getTimeRange(readOption: ReadOption.ReadOption, now: TimeStamp) + : (TimeStamp, TimeStamp, TimeStamp) = { readOption match { case ReadOption.ReadRecent => val end = now @@ -120,7 +118,7 @@ class ProcessorAggregator(historyMetricConfig: HistoryMetricsConfig) extends Met val interval = historyMetricConfig.retainHistoryDataIntervalMs (floor(start, interval), floor(end, interval), interval) case _ => - // all data points are aggregated together. + // All data points are aggregated together. (0L, Long.MaxValue, Long.MaxValue) } } @@ -132,7 +130,7 @@ class ProcessorAggregator(historyMetricConfig: HistoryMetricsConfig) extends Met (value / interval) * interval } - // returns "app0.processor0:sendThroughput" as the group Id. + // Returns "app0.processor0:sendThroughput" as the group Id. private def parseName(name: String, result: TaskIdentity): Boolean = { val taskIndex = name.indexOf(TASK_TAG) if (taskIndex > 0) { @@ -157,20 +155,16 @@ object ProcessorAggregator { private val TASK_TAG = ".task" - private class TaskIdentity(var task: Short, var group: String) - /** * * MultiLayerMap has multiple layers. For each layer, there * is a hashMap. * - * To access a value with [[get]], user need to specify first layer Id, then key. - * + * To access a value with get, user need to specify first layer Id, then key. * * This class is optimized for performance. 
- * */ class MultiLayerMap[Value](layers: Int) { @@ -197,11 +191,11 @@ object ProcessorAggregator { } def size: Int = _size - + def valueIterator: util.Iterator[Value] = { val iterators = new Array[util.Iterator[Value]](layers) var layer = 0 - while(layer < layers) { + while (layer < layers) { iterators(layer) = map(layer).values().iterator() layer += 1 } @@ -213,7 +207,7 @@ object ProcessorAggregator { val map = new Array[java.util.HashMap[String, Value]](layers) var index = 0 val length = map.length - while(index < length) { + while (index < length) { map(index) = new java.util.HashMap[String, Value]() index += 1 } @@ -291,11 +285,11 @@ object ProcessorAggregator { override def result: HistoryMetricsItem = { HistoryMetricsItem(startTime, Meter(name, count, meanRate, - m1, rateUnit)) + m1, rateUnit)) } } - class AggregatorFactory{ + class AggregatorFactory { def create(item: HistoryMetricsItem, name: String): Aggregator = { item.value match { case meter: Meter => new MeterAggregator(name) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/metrics/TaskFilterAggregator.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/metrics/TaskFilterAggregator.scala b/streaming/src/main/scala/io/gearpump/streaming/metrics/TaskFilterAggregator.scala index a9bc167..dbf79ec 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/metrics/TaskFilterAggregator.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/metrics/TaskFilterAggregator.scala @@ -1,28 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.gearpump.streaming.metrics +import scala.collection.mutable.ListBuffer +import scala.util.{Failure, Success, Try} + import com.typesafe.config.Config + import io.gearpump.cluster.ClientToMaster.ReadOption import io.gearpump.cluster.MasterToClient.HistoryMetricsItem import io.gearpump.metrics.MetricsAggregator -import io.gearpump.util.{LogUtil, Constants} - -import scala.collection.mutable.ListBuffer -import scala.util.{Failure, Success, Try} +import io.gearpump.util.{Constants, LogUtil} /** - * It filters the latest metrics data by specifying a + * Filters the latest metrics data by specifying a * processor Id range, and taskId range. 
*/ -class TaskFilterAggregator (maxLimit: Int) extends MetricsAggregator { +class TaskFilterAggregator(maxLimit: Int) extends MetricsAggregator { - import TaskFilterAggregator._ + import io.gearpump.streaming.metrics.TaskFilterAggregator._ def this(config: Config) = { this(config.getInt(Constants.GEARPUMP_METRICS_MAX_LIMIT)) } - override def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem]): List[HistoryMetricsItem] = { + override def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem]) + : List[HistoryMetricsItem] = { + if (options.get(ReadOption.Key) != Some(ReadOption.ReadLatest)) { - // return empty set + // Returns empty set List.empty[HistoryMetricsItem] } else { val parsed = Options.parse(options) @@ -34,7 +55,8 @@ class TaskFilterAggregator (maxLimit: Int) extends MetricsAggregator { } } - def aggregate(options: Options, inputs: Iterator[HistoryMetricsItem]): List[HistoryMetricsItem] = { + def aggregate(options: Options, inputs: Iterator[HistoryMetricsItem]) + : List[HistoryMetricsItem] = { val result = new ListBuffer[HistoryMetricsItem] val effectiveLimit = Math.min(options.limit, maxLimit) @@ -42,7 +64,7 @@ class TaskFilterAggregator (maxLimit: Int) extends MetricsAggregator { val taskIdentity = new TaskIdentity(0, 0) - while(inputs.hasNext && count < effectiveLimit) { + while (inputs.hasNext && count < effectiveLimit) { val item = inputs.next() if (parseName(item.value.name, taskIdentity)) { if (taskIdentity.processor >= options.startProcessor && @@ -57,15 +79,17 @@ class TaskFilterAggregator (maxLimit: Int) extends MetricsAggregator { result.toList } - // Assume the name format is: "app0.processor0.task0:sendThroughput" - // return (processorId, taskId) - // return true if success + // Assume the name format is: "app0.processor0.task0:sendThroughput", returns + // (processorId, taskId) + // + // returns true if success private def parseName(name: String, result: TaskIdentity): Boolean = { val 
processorStart = name.indexOf(PROCESSOR_TAG) if (processorStart != -1) { val taskStart = name.indexOf(TASK_TAG, processorStart + 1) if (taskStart != -1) { - val processorId = name.substring(processorStart, taskStart).substring(PROCESSOR_TAG.length).toInt + val processorId = name.substring(processorStart, taskStart).substring(PROCESSOR_TAG.length) + .toInt result.processor = processorId val taskEnd = name.indexOf(":", taskStart + 1) if (taskEnd != -1) { @@ -84,7 +108,7 @@ class TaskFilterAggregator (maxLimit: Int) extends MetricsAggregator { } } -object TaskFilterAggregator{ +object TaskFilterAggregator { val StartTask = "startTask" val EndTask = "endTask" val StartProcessor = "startProcessor" @@ -96,7 +120,8 @@ object TaskFilterAggregator{ private class TaskIdentity(var processor: Int, var task: Int) - case class Options(limit: Int, startTask: Int, endTask: Int, startProcessor: Int, endProcessor: Int) + case class Options( + limit: Int, startTask: Int, endTask: Int, startProcessor: Int, endProcessor: Int) private val LOG = LogUtil.getLogger(getClass) @@ -107,7 +132,7 @@ object TaskFilterAggregator{ } def parse(options: Map[String, String]): Options = { - //do sanity check + // Do sanity check val optionTry = Try { val startTask = options.get(StartTask).map(_.toInt).getOrElse(0) val endTask = options.get(EndTask).map(_.toInt).getOrElse(Integer.MAX_VALUE) @@ -120,7 +145,8 @@ object TaskFilterAggregator{ optionTry match { case Success(options) => options case Failure(ex) => - LOG.error("Failed to parse the options in TaskFilterAggregator. Error msg: " + ex.getMessage) + LOG.error("Failed to parse the options in TaskFilterAggregator. 
Error msg: " + + ex.getMessage) null } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/package.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/package.scala b/streaming/src/main/scala/io/gearpump/streaming/package.scala index d255522..95d51f0 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/package.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/package.scala @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package io.gearpump package object streaming { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/sink/DataSink.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/sink/DataSink.scala b/streaming/src/main/scala/io/gearpump/streaming/sink/DataSink.scala index 3e062ed..b036619 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/sink/DataSink.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/sink/DataSink.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -22,12 +22,10 @@ import io.gearpump.Message import io.gearpump.streaming.task.TaskContext /** - * interface to implement custom data sink - * where result of a DAG is typically written - * + * Interface to implement custom data sink where result of a DAG is typically written * a DataSink could be a data store like HBase or simply a console * - * an example would be like + * An example would be like: * {{{ * class ConsoleSink extends DataSink[String] { * @@ -41,26 +39,26 @@ import io.gearpump.streaming.task.TaskContext * } * }}} * - * subclass is required to be serializable + * Subclass is required to be serializable */ trait DataSink extends java.io.Serializable { /** - * open connection to data sink + * Opens connection to data sink * invoked at onStart() method of [[io.gearpump.streaming.task.Task]] * @param context is the task context at runtime */ def open(context: TaskContext): Unit /** - * write message into data sink + * Writes message into data sink * invoked at onNext() method of 
[[io.gearpump.streaming.task.Task]] * @param message wraps data to be written out */ def write(message: Message): Unit /** - * close connection to data sink + * Closes connection to data sink * invoked at onClose() method of [[io.gearpump.streaming.task.Task]] */ def close(): Unit http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/sink/DataSinkProcessor.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/sink/DataSinkProcessor.scala b/streaming/src/main/scala/io/gearpump/streaming/sink/DataSinkProcessor.scala index e83b40d..d753cc2 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/sink/DataSinkProcessor.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/sink/DataSinkProcessor.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,11 +19,12 @@ package io.gearpump.streaming.sink import akka.actor.ActorSystem -import io.gearpump.streaming.Processor + import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.Processor /** - * utility that helps user to create a DAG ending in [[DataSink]] + * Utility that helps user to create a DAG ending in [[DataSink]] * user should pass in a [[DataSink]]. 
* * here is an example to build a DAG that does word count and write to KafkaSink @@ -36,10 +37,12 @@ import io.gearpump.cluster.UserConfig * }}} */ object DataSinkProcessor { - def apply(dataSink: DataSink, - parallelism: Int, - description: String = "", - taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem): Processor[DataSinkTask] = { + def apply( + dataSink: DataSink, + parallelism: Int, + description: String = "", + taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem) + : Processor[DataSinkTask] = { Processor[DataSinkTask](parallelism, description = description, taskConf.withValue[DataSink](DataSinkTask.DATA_SINK, dataSink)) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/sink/DataSinkTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/sink/DataSinkTask.scala b/streaming/src/main/scala/io/gearpump/streaming/sink/DataSinkTask.scala index 95a6718..7436617 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/sink/DataSinkTask.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/sink/DataSinkTask.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,19 +18,19 @@ package io.gearpump.streaming.sink -import io.gearpump.streaming.task.{Task, TaskContext, StartTime} import io.gearpump.Message import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.task.{StartTime, Task, TaskContext} object DataSinkTask { val DATA_SINK = "data_sink" } /** - * general task that runs any [[DataSink]] + * General task that runs any [[DataSink]] */ -class DataSinkTask (context: TaskContext, conf: UserConfig) extends Task(context, conf) { - import DataSinkTask._ +class DataSinkTask(context: TaskContext, conf: UserConfig) extends Task(context, conf) { + import io.gearpump.streaming.sink.DataSinkTask._ private val sink = conf.getValue[DataSink](DATA_SINK).get @@ -47,5 +47,4 @@ class DataSinkTask (context: TaskContext, conf: UserConfig) extends Task(context LOG.info("closing data sink...") sink.close() } - }
