http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/source/DataSource.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/source/DataSource.scala b/streaming/src/main/scala/io/gearpump/streaming/source/DataSource.scala index bee81fb..9cb7ca0 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/source/DataSource.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/source/DataSource.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,13 +19,13 @@ package io.gearpump.streaming.source import io.gearpump.streaming.task.TaskContext -import io.gearpump.{TimeStamp, Message} +import io.gearpump.{Message, TimeStamp} /** - * interface to implement custom source where data is read into the system. + * Interface to implement custom source where data is read into the system. * a DataSource could be a message queue like kafka or simply data generation source. 
* - * an example would be like + * An example would be like * {{{ * GenStringSource extends DataSource { * @@ -44,7 +44,7 @@ import io.gearpump.{TimeStamp, Message} trait DataSource extends java.io.Serializable { /** - * open connection to data source + * Opens connection to data source * invoked in onStart() method of [[io.gearpump.streaming.source.DataSourceTask]] * @param context is the task context at runtime * @param startTime is the start time of system @@ -52,7 +52,7 @@ trait DataSource extends java.io.Serializable { def open(context: TaskContext, startTime: Option[TimeStamp]): Unit /** - * read a number of messages from data source. + * Reads a number of messages from data source. * invoked in each onNext() method of [[io.gearpump.streaming.source.DataSourceTask]] * @param batchSize max number of messages to read * @return a list of messages wrapped in [[io.gearpump.Message]] @@ -60,7 +60,7 @@ trait DataSource extends java.io.Serializable { def read(batchSize: Int): List[Message] /** - * close connection to data source. + * Closes connection to data source. * invoked in onStop() method of [[io.gearpump.streaming.source.DataSourceTask]] */ def close(): Unit
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceConfig.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceConfig.scala b/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceConfig.scala index e7c0599..6ca939f 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceConfig.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceConfig.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -22,5 +22,4 @@ object DataSourceConfig { val SOURCE_READ_BATCH_SIZE = "gearpump.source.read.batch.size" val SOURCE_TIMESTAMP_FILTER = "gearpump.source.timestamp.filter.class" - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceProcessor.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceProcessor.scala b/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceProcessor.scala index 61eabcc..384b86a 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceProcessor.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceProcessor.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,14 +19,15 @@ package io.gearpump.streaming.source import akka.actor.ActorSystem -import io.gearpump.streaming.Processor + import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.Processor /** - * utility that helps user to create a DAG starting with [[DataSourceTask]] + * Utility that helps user to create a DAG starting with [[DataSourceTask]] * user should pass in a [[DataSource]] * - * here is an example to build a DAG that reads from Kafka source followed by word count + * Here is an example to build a DAG that reads from Kafka source followed by word count * {{{ * val source = new KafkaSource() * val sourceProcessor = DataSourceProcessor(source, 1) @@ -36,10 +37,12 @@ import io.gearpump.cluster.UserConfig * }}} */ object DataSourceProcessor { - def apply(dataSource: DataSource, - parallelism: Int, - description: String = "", - taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem): Processor[DataSourceTask] = { + def apply( + dataSource: DataSource, + parallelism: Int, + description: String = "", + taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem) + : Processor[DataSourceTask] = { Processor[DataSourceTask](parallelism, description = description, taskConf.withValue[DataSource](DataSourceTask.DATA_SOURCE, dataSource)) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala index 4c65100..d9b2110 100644 --- 
a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,26 +18,27 @@ package io.gearpump.streaming.source -import io.gearpump.streaming.task.{Task, StartTime, TaskContext} import io.gearpump._ import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.task.{StartTime, Task, TaskContext} object DataSourceTask { val DATA_SOURCE = "data_source" } /** - * general task that runs any [[DataSource]] - * see [[DataSourceProcessor]] for its usage + * Task container for [[io.gearpump.streaming.source.DataSource]]. + * See [[io.gearpump.streaming.source.DataSourceProcessor]] for its usage * - * DataSourceTask calls - * - `DataSource.open` in `onStart` and pass in [[io.gearpump.streaming.task.TaskContext]] and application start time - * - `DataSource.read` in each `onNext`, which reads a batch of messages whose size are defined by - * `gearpump.source.read.batch.size`. - * - `DataSource.close` in `onStop` + * DataSourceTask calls: + * - `DataSource.open()` in `onStart` and pass in [[io.gearpump.streaming.task.TaskContext]] + * and application start time + * - `DataSource.read()` in each `onNext`, which reads a batch of messages whose size are + * defined by `gearpump.source.read.batch.size`. 
+ * - `DataSource.close()` in `onStop` */ class DataSourceTask(context: TaskContext, conf: UserConfig) extends Task(context, conf) { - import DataSourceTask._ + import io.gearpump.streaming.source.DataSourceTask._ private val source = conf.getValue[DataSource](DATA_SOURCE).get private val batchSize = conf.getInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE).getOrElse(1000) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/source/DefaultTimeStampFilter.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/source/DefaultTimeStampFilter.scala b/streaming/src/main/scala/io/gearpump/streaming/source/DefaultTimeStampFilter.scala index a9ee674..b7d4e90 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/source/DefaultTimeStampFilter.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/source/DefaultTimeStampFilter.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,10 +19,10 @@ package io.gearpump.streaming.source import io.gearpump.streaming.transaction.api.TimeStampFilter -import io.gearpump.{TimeStamp, Message} +import io.gearpump.{Message, TimeStamp} /** - * default TimeStampFilter that filters out messages with smaller timestamps + * TimeStampFilter filters out messages which have obsolete (smaller) timestamp. 
*/ class DefaultTimeStampFilter extends TimeStampFilter { override def filter(msg: Message, predicate: TimeStamp): Option[Message] = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/api/Monoid.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/api/Monoid.scala b/streaming/src/main/scala/io/gearpump/streaming/state/api/Monoid.scala index a25e20e..dfe3e93 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/state/api/Monoid.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/state/api/Monoid.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,10 +19,10 @@ package io.gearpump.streaming.state.api trait Monoid[T] extends java.io.Serializable { - def plus(l: T, r: T): T - def zero: T + def plus(l: T, r: T): T + def zero: T } trait Group[T] extends Monoid[T] { - def minus(l: T, r: T): T - } \ No newline at end of file + def minus(l: T, r: T): T +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/api/MonoidState.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/api/MonoidState.scala b/streaming/src/main/scala/io/gearpump/streaming/state/api/MonoidState.scala index dadbba6..238eab4 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/state/api/MonoidState.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/state/api/MonoidState.scala @@ -7,7 +7,7 @@ * 
"License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -28,9 +28,9 @@ import io.gearpump.TimeStamp * the incoming value using monoid.plus to get a new state value */ abstract class MonoidState[T](monoid: Monoid[T]) extends PersistentState[T] { - // left state updated by messages before checkpoint time + // Left state updated by messages before checkpoint time private[state] var left: T = monoid.zero - // right state updated by message after checkpoint time + // Right state updated by message after checkpoint time private[state] var right: T = monoid.zero protected var checkpointTime = Long.MaxValue http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentState.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentState.scala b/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentState.scala index 6c595da..f1b923a 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentState.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentState.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -30,33 +30,32 @@ import io.gearpump._ trait PersistentState[T] { /** - * recover state to a previous checkpoint + * Recovers state to a previous checkpoint * usually invoked by the framework */ def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit /** - * update state on a new message + * Updates state on a new message * this is invoked by user */ def update(timestamp: TimeStamp, t: T): Unit /** - * set next checkpoint time + * Sets next checkpoint time * should be invoked by the framework */ def setNextCheckpointTime(timeStamp: TimeStamp): Unit /** - * get a binary snapshot of state + * Gets a binary snapshot of state * usually invoked by the framework */ def checkpoint(): Array[Byte] /** - * unwrap the raw value of state + * Unwraps the raw value of state */ def get: Option[T] } - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentTask.scala b/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentTask.scala index fbf507f..a8a5d5d 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentTask.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentTask.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,16 +19,15 @@ package io.gearpump.streaming.state.api import java.util.concurrent.TimeUnit +import scala.concurrent.duration.FiniteDuration import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.state.impl.{PersistentStateConfig, CheckpointManager} +import io.gearpump.streaming.state.impl.{CheckpointManager, PersistentStateConfig} import io.gearpump.streaming.task.{ReportCheckpointClock, StartTime, Task, TaskContext} import io.gearpump.streaming.transaction.api.CheckpointStoreFactory import io.gearpump.util.LogUtil import io.gearpump.{Message, TimeStamp} -import scala.concurrent.duration.FiniteDuration - object PersistentTask { val CHECKPOINT = Message("checkpoint") val LOG = LogUtil.getLogger(getClass) @@ -42,35 +41,32 @@ object PersistentTask { */ abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { - import io.gearpump.streaming.state.api.PersistentTask._ import taskContext._ - val checkpointStoreFactory = conf.getValue[CheckpointStoreFactory](PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY).get + import io.gearpump.streaming.state.api.PersistentTask._ + + val checkpointStoreFactory = conf.getValue[CheckpointStoreFactory]( + PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY).get val checkpointStore = checkpointStoreFactory.getCheckpointStore(conf, taskContext) val checkpointInterval = conf.getLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS).get val checkpointManager = new CheckpointManager(checkpointInterval, checkpointStore) - // system time interval to attempt checkpoint + // System time interval to attempt checkpoint private val checkpointAttemptInterval = 1000L /** - * subclass should override 
this method to pass in - * a PersistentState - * - * the framework has already offered two states - * - * - NonWindowState - * state with no time or other boundary - * - WindowState - * each state is bounded by a time window + * Subclass should override this method to pass in a PersistentState. the framework has already + * offered two states: + * - NonWindowState: state with no time or other boundary + * - WindowState: each state is bounded by a time window */ def persistentState: PersistentState[T] /** - * subclass should override this method to specify how a - * new message should update state + * Subclass should override this method to specify how a new message should update state */ def processMessage(state: PersistentState[T], message: Message): Unit + /** Persistent state that will be stored (by checkpointing) automatically to storage like HDFS */ val state = persistentState final override def onStart(startTime: StartTime): Unit = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/api/Serializer.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/api/Serializer.scala b/streaming/src/main/scala/io/gearpump/streaming/state/api/Serializer.scala index 4b8e2f5..f87b224 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/state/api/Serializer.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/state/api/Serializer.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,6 +21,6 @@ package io.gearpump.streaming.state.api import scala.util.Try trait Serializer[T] extends java.io.Serializable { - def serialize(t: T): Array[Byte] - def deserialize(bytes: Array[Byte]): Try[T] - } + def serialize(t: T): Array[Byte] + def deserialize(bytes: Array[Byte]): Try[T] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/impl/CheckpointManager.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/impl/CheckpointManager.scala b/streaming/src/main/scala/io/gearpump/streaming/state/impl/CheckpointManager.scala index 5fcbbcb..76c91c7 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/state/impl/CheckpointManager.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/state/impl/CheckpointManager.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,6 +21,7 @@ package io.gearpump.streaming.state.impl import io.gearpump.TimeStamp import io.gearpump.streaming.transaction.api.CheckpointStore +/** Manage physical checkpoints to persistent storage like HDFS */ class CheckpointManager(checkpointInterval: Long, checkpointStore: CheckpointStore) { @@ -34,7 +35,7 @@ class CheckpointManager(checkpointInterval: Long, def checkpoint(timestamp: TimeStamp, checkpoint: Array[Byte]): Option[TimeStamp] = { checkpointStore.persist(timestamp, checkpoint) checkpointTime = checkpointTime.collect { case time if maxMessageTime > time => - time + (1 + (maxMessageTime - time) / checkpointInterval) * checkpointInterval + time + (1 + (maxMessageTime - time) / checkpointInterval) * checkpointInterval } checkpointTime http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala b/streaming/src/main/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala index 164b932..21623e3 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,7 +21,7 @@ package io.gearpump.streaming.state.impl import io.gearpump.TimeStamp import io.gearpump.cluster.UserConfig import io.gearpump.streaming.task.TaskContext -import io.gearpump.streaming.transaction.api.{CheckpointStoreFactory, CheckpointStore} +import io.gearpump.streaming.transaction.api.{CheckpointStore, CheckpointStoreFactory} /** * an in memory store provided for test @@ -41,7 +41,6 @@ class InMemoryCheckpointStore extends CheckpointStore { override def close(): Unit = { checkpoints = Map.empty[TimeStamp, Array[Byte]] } - } class InMemoryCheckpointStoreFactory extends CheckpointStoreFactory { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/impl/NonWindowState.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/impl/NonWindowState.scala b/streaming/src/main/scala/io/gearpump/streaming/state/impl/NonWindowState.scala index aefd7e1..dcd3918 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/state/impl/NonWindowState.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/state/impl/NonWindowState.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,20 +18,22 @@ package io.gearpump.streaming.state.impl +import org.slf4j.Logger + import io.gearpump.TimeStamp import io.gearpump.streaming.state.api.{Monoid, MonoidState, Serializer} -import io.gearpump.util.LogUtil import io.gearpump.streaming.state.impl.NonWindowState._ -import org.slf4j.Logger +import io.gearpump.util.LogUtil object NonWindowState { val LOG: Logger = LogUtil.getLogger(classOf[NonWindowState[_]]) } /** - * a MonoidState storing non-window state + * a MonoidState storing non-window state */ -class NonWindowState[T](monoid: Monoid[T], serializer: Serializer[T]) extends MonoidState[T](monoid) { +class NonWindowState[T](monoid: Monoid[T], serializer: Serializer[T]) + extends MonoidState[T](monoid) { override def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit = { serializer.deserialize(bytes).foreach(left = _) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/impl/PersistentStateConfig.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/impl/PersistentStateConfig.scala b/streaming/src/main/scala/io/gearpump/streaming/state/impl/PersistentStateConfig.scala index ecbb092..d7488d7 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/state/impl/PersistentStateConfig.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/state/impl/PersistentStateConfig.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/impl/Window.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/impl/Window.scala b/streaming/src/main/scala/io/gearpump/streaming/state/impl/Window.scala index 13f0eef..63cdf06 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/state/impl/Window.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/state/impl/Window.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,7 +20,7 @@ package io.gearpump.streaming.state.impl import io.gearpump.TimeStamp /** - * used in window applications + * Used in window applications * it keeps the current window and slide ahead when the window expires */ class Window(val windowSize: Long, val windowStep: Long) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowConfig.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowConfig.scala b/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowConfig.scala index 41395f6..d7d3776 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowConfig.scala +++ 
b/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowConfig.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowState.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowState.scala b/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowState.scala index 007c9a4..30382c0 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowState.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowState.scala @@ -2,12 +2,12 @@ * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding cstateyright ownership. The ASF licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a cstatey of the License at + * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,14 +18,15 @@ package io.gearpump.streaming.state.impl +import scala.collection.immutable.TreeMap + +import org.slf4j.Logger + import io.gearpump.TimeStamp -import io.gearpump.streaming.state.api.{Group, Serializer, MonoidState} -import io.gearpump.streaming.task.TaskContext +import io.gearpump.streaming.state.api.{Group, MonoidState, Serializer} import io.gearpump.streaming.state.impl.WindowState._ +import io.gearpump.streaming.task.TaskContext import io.gearpump.util.LogUtil -import org.slf4j.Logger - -import scala.collection.immutable.TreeMap /** * an interval is a dynamic time range that is divided by window boundary and checkpoint time @@ -51,10 +52,9 @@ object WindowState { * possible to undo the update by messages that have left the window */ class WindowState[T](group: Group[T], - serializer: Serializer[TreeMap[Interval, T]], - taskContext: TaskContext, - window: Window) - extends MonoidState[T](group) { + serializer: Serializer[TreeMap[Interval, T]], + taskContext: TaskContext, + window: Window) extends MonoidState[T](group) { /** * each interval has a state updated by message with timestamp in * [interval.startTime, interval.endTime) @@ -67,11 +67,11 @@ class WindowState[T](group: Group[T], window.slideTo(timestamp) serializer.deserialize(bytes) .foreach { states => - intervalStates = states - left = states.foldLeft(left) { case (accum, iter) => - group.plus(accum, iter._2) + intervalStates = states + left = states.foldLeft(left) { case (accum, iter) => + group.plus(accum, iter._2) + } } - } } override def update(timestamp: TimeStamp, t: T): Unit = { @@ -115,14 +115,17 @@ class WindowState[T](group: Group[T], } /** - * each message will update state in corresponding 
Interval[StartTime, endTime), + * Each message will update state in corresponding Interval[StartTime, endTime), + * * which is decided by the message's timestamp t where - * startTime = Math.max(lowerBound1, lowerBound2, checkpointTime) - * endTime = Math.min(upperBound1, upperBound2, checkpointTime) - * lowerBound1 = step * Nmax1 <= t - * lowerBound2 = step * Nmax2 + size <= t - * upperBound1 = step * Nmin1 > t - * upperBound2 = step * Nmin2 + size > t + * {{{ + * startTime = Math.max(lowerBound1, lowerBound2, checkpointTime) + * endTime = Math.min(upperBound1, upperBound2, checkpointTime) + * lowerBound1 = step * Nmax1 <= t + * lowerBound2 = step * Nmax2 + size <= t + * upperBound1 = step * Nmin1 > t + * upperBound2 = step * Nmin2 + size > t + * }}} */ private[impl] def getInterval(timestamp: TimeStamp, checkpointTime: TimeStamp): Interval = { val windowSize = window.windowSize @@ -144,7 +147,8 @@ class WindowState[T](group: Group[T], } } - private[impl] def updateIntervalStates(timestamp: TimeStamp, t: T, checkpointTime: TimeStamp): Unit = { + private[impl] def updateIntervalStates(timestamp: TimeStamp, t: T, checkpointTime: TimeStamp) + : Unit = { val interval = getInterval(timestamp, checkpointTime) intervalStates.get(interval) match { case Some(st) => @@ -154,8 +158,8 @@ class WindowState[T](group: Group[T], } } - private[impl] def getIntervalStates(startTime: TimeStamp, endTime: TimeStamp): TreeMap[Interval, T] = { + private[impl] def getIntervalStates(startTime: TimeStamp, endTime: TimeStamp) + : TreeMap[Interval, T] = { intervalStates.dropWhile(_._1.endTime <= startTime).takeWhile(_._1.endTime <= endTime) } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/storage/AppDataStore.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/storage/AppDataStore.scala 
b/streaming/src/main/scala/io/gearpump/streaming/storage/AppDataStore.scala index dcdc2ff..962f48f 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/storage/AppDataStore.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/storage/AppDataStore.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,12 +21,10 @@ import scala.concurrent._ /** * Generic storage to store KV Data. - * */ trait AppDataStore { def put(key: String, value: Any): Future[Any] - def get(key: String) : Future[Any] + def get(key: String): Future[Any] } - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMaster.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMaster.scala b/streaming/src/main/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMaster.scala index 13e3dae..f1a19a8 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMaster.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMaster.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,12 +17,13 @@ */ package io.gearpump.streaming.storage +import scala.concurrent.Future + import akka.actor.ActorRef import akka.pattern.ask -import io.gearpump.cluster.AppMasterToMaster.{GetAppDataResult, GetAppData, SaveAppData} -import io.gearpump.util.Constants -import scala.concurrent.Future +import io.gearpump.cluster.AppMasterToMaster.{GetAppData, GetAppDataResult, SaveAppData} +import io.gearpump.util.Constants /** * In memory application storage located on master nodes @@ -36,8 +37,8 @@ class InMemoryAppStoreOnMaster(appId: Int, master: ActorRef) extends AppDataStor } override def get(key: String): Future[Any] = { - master.ask(GetAppData(appId, key)).asInstanceOf[Future[GetAppDataResult]].map{result => - if(result.key.equals(key)) { + master.ask(GetAppData(appId, key)).asInstanceOf[Future[GetAppDataResult]].map { result => + if (result.key.equals(key)) { result.value } else { null http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala b/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala index 2dbdd14..a3cb6e1 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,19 +19,16 @@ package io.gearpump.streaming.task import akka.actor.{ActorRef, ExtendedActorSystem} -import io.gearpump.serializer.SerializationFramework + +import io.gearpump.Message import io.gearpump.transport.netty.TaskMessage import io.gearpump.transport.{Express, HostPort} -import io.gearpump.Message - -import scala.collection.mutable import io.gearpump.util.AkkaHelper /** * ExpressTransport wire the networking function from default akka - * networking to customized implementation [[Express]]. - * - * See [[Express]] for more information. + * networking to customized implementation [[io.gearpump.transport.Express]]. * + * See [[io.gearpump.transport.Express]] for more information. */ trait ExpressTransport { this: TaskActor => @@ -39,15 +36,15 @@ trait ExpressTransport { final val express = Express(context.system) implicit val system = context.system.asInstanceOf[ExtendedActorSystem] - final def local = express.localHost + final def local: HostPort = express.localHost lazy val sourceId = TaskId.toLong(taskId) lazy val sessionRef: ActorRef = { AkkaHelper.actorFor(system, s"/session#$sessionId") } - def transport(msg : AnyRef, remotes : TaskId *): Unit = { - var serializedMessage : AnyRef = null + def transport(msg: AnyRef, remotes: TaskId*): Unit = { + var serializedMessage: AnyRef = null remotes.foreach { remote => val transportId = TaskId.toLong(remote) @@ -69,7 +66,8 @@ trait ExpressTransport { if (remoteAddress.isDefined) { express.transport(taskMessage, remoteAddress.get) } else { - LOG.error(s"Can not find target task $remote, maybe the application is undergoing recovery") + LOG.error( + s"Can not find target task $remote, maybe the application is undergoing recovery") } } } 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/SerializedMessage.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/SerializedMessage.scala b/streaming/src/main/scala/io/gearpump/streaming/task/SerializedMessage.scala index ff2ca89..9f9bf1b 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/task/SerializedMessage.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/task/SerializedMessage.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/SerializerResolver.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/SerializerResolver.scala b/streaming/src/main/scala/io/gearpump/streaming/task/SerializerResolver.scala index edac7f0..72bc7db 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/task/SerializerResolver.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/task/SerializerResolver.scala @@ -15,19 +15,19 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package io.gearpump.streaming.task -import io.gearpump.esotericsoftware.kryo.util.ObjectMap -import io.gearpump.esotericsoftware.kryo.util.IntMap +import io.gearpump.esotericsoftware.kryo.util.{IntMap, ObjectMap} import io.gearpump.streaming.task.SerializerResolver.Registration private[task] class SerializerResolver { private var classId = 0 - val idToRegistration = new IntMap[Registration]() - val classToRegistration = new ObjectMap[Class[_], Registration]() + private val idToRegistration = new IntMap[Registration]() + private val classToRegistration = new ObjectMap[Class[_], Registration]() - def register(clazz: Class[_], serializer: TaskMessageSerializer[_]): Unit = { - val registration = Registration(classId, clazz, serializer) + def register[T](clazz: Class[T], serializer: TaskMessageSerializer[T]): Unit = { + val registration = new Registration(classId, clazz, serializer) idToRegistration.put(classId, registration) classToRegistration.put(clazz, registration) classId += 1 @@ -42,6 +42,6 @@ private[task] class SerializerResolver { } } -object SerializerResolver{ - case class Registration(id: Int, clazz: Class[_], serializer: TaskMessageSerializer[_]) +object SerializerResolver { + class Registration(val id: Int, val clazz: Class[_], val serializer: TaskMessageSerializer[_]) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/StartTime.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/StartTime.scala b/streaming/src/main/scala/io/gearpump/streaming/task/StartTime.scala index 75e3521..6bc8b15 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/task/StartTime.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/task/StartTime.scala @@ -1,5 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.gearpump.streaming.task import io.gearpump.TimeStamp -case class StartTime(startTime : TimeStamp = 0) +/** Start time of streaming application. All message older than start time will be dropped */ +case class StartTime(startTime: TimeStamp = 0) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/StreamingTransportSerializer.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/StreamingTransportSerializer.scala b/streaming/src/main/scala/io/gearpump/streaming/task/StreamingTransportSerializer.scala index 52b8021..17d0b1b 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/task/StreamingTransportSerializer.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/task/StreamingTransportSerializer.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,12 +19,13 @@ package io.gearpump.streaming.task import java.io.{DataInput, DataOutput} -import io.gearpump.streaming.{LatencyProbeSerializer, InitialAckRequestSerializer, AckRequestSerializer, AckSerializer} +import org.slf4j.Logger + +import io.gearpump.streaming.{AckRequestSerializer, AckSerializer, InitialAckRequestSerializer, LatencyProbeSerializer} import io.gearpump.transport.netty.ITransportMessageSerializer import io.gearpump.util.LogUtil -import org.slf4j.Logger -class StreamingTransportSerializer extends ITransportMessageSerializer{ +class StreamingTransportSerializer extends ITransportMessageSerializer { private val log: Logger = LogUtil.getLogger(getClass) private val serializers = new SerializerResolver @@ -36,7 +37,7 @@ class StreamingTransportSerializer extends ITransportMessageSerializer{ override def serialize(dataOutput: DataOutput, obj: Object): Unit = { val registration = serializers.getRegistration(obj.getClass) - if(registration != null) { + if (registration != null) { dataOutput.writeInt(registration.id) registration.serializer.asInstanceOf[TaskMessageSerializer[AnyRef]].write(dataOutput, obj) } else { @@ -47,7 +48,7 @@ class StreamingTransportSerializer extends ITransportMessageSerializer{ override def deserialize(dataInput: DataInput, length: Int): Object = { val classID = dataInput.readInt() val registration = serializers.getRegistration(classID) - if(registration != null) { + if (registration != null) { registration.serializer.asInstanceOf[TaskMessageSerializer[AnyRef]].read(dataInput) } else { log.error(s"Can not find serializer for class id $classID") @@ -57,7 +58,7 @@ class StreamingTransportSerializer extends ITransportMessageSerializer{ override def 
getLength(obj: Object): Int = { val registration = serializers.getRegistration(obj.getClass) - if(registration != null) { + if (registration != null) { registration.serializer.asInstanceOf[TaskMessageSerializer[AnyRef]].getLength(obj) + 4 } else { log.error(s"Can not find serializer for class type ${obj.getClass}") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/Subscriber.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/Subscriber.scala b/streaming/src/main/scala/io/gearpump/streaming/task/Subscriber.scala index 72d46f7..d20074b 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/task/Subscriber.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/task/Subscriber.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,7 +19,7 @@ package io.gearpump.streaming.task import io.gearpump.partitioner.PartitionerDescription -import io.gearpump.streaming.{LifeTime, DAG} +import io.gearpump.streaming.{DAG, LifeTime} /** * Each processor can have multiple downstream subscribers. 
@@ -30,8 +30,8 @@ import io.gearpump.streaming.{LifeTime, DAG} * @param processorId subscriber processor Id * @param partitionerDescription subscriber partitioner */ - -case class Subscriber(processorId: Int, partitionerDescription: PartitionerDescription, parallelism: Int, lifeTime: LifeTime) +case class Subscriber(processorId: Int, partitionerDescription: PartitionerDescription, + parallelism: Int, lifeTime: LifeTime) object Subscriber { @@ -50,7 +50,8 @@ object Subscriber { edges.foldLeft(List.empty[Subscriber]) { (list, nodeEdgeNode) => val (_, partitioner, downstreamProcessorId) = nodeEdgeNode val downstreamProcessor = dag.processors(downstreamProcessorId) - list :+ Subscriber(downstreamProcessorId, partitioner, downstreamProcessor.parallelism, downstreamProcessor.life) + list :+ Subscriber(downstreamProcessorId, partitioner, + downstreamProcessor.parallelism, downstreamProcessor.life) } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala index 0b1fa29..155edf4 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,20 +18,22 @@ package io.gearpump.streaming.task +import org.slf4j.Logger + import io.gearpump.google.common.primitives.Shorts -import io.gearpump.partitioner.{Partitioner, MulticastPartitioner, UnicastPartitioner} +import io.gearpump.partitioner.{MulticastPartitioner, Partitioner, UnicastPartitioner} import io.gearpump.streaming.AppMasterToExecutor.MsgLostException import io.gearpump.streaming.LifeTime import io.gearpump.streaming.task.Subscription._ import io.gearpump.util.LogUtil import io.gearpump.{Message, TimeStamp} -import org.slf4j.Logger /** - * This manage the output and message clock for single downstream processor + * Manges the output and message clock for single downstream processor * * @param subscriber downstream processor - * @param maxPendingMessageCount trigger flow control. Should be bigger than maxPendingMessageCountPerAckRequest + * @param maxPendingMessageCount trigger flow control. 
Should be bigger than + * maxPendingMessageCountPerAckRequest * @param ackOnceEveryMessageCount send on AckRequest to the target */ class Subscription( @@ -44,9 +46,10 @@ class Subscription( ackOnceEveryMessageCount: Int = ONE_ACKREQUEST_EVERY_MESSAGE_COUNT) { assert(maxPendingMessageCount >= ackOnceEveryMessageCount) - assert(maxPendingMessageCount < Short.MaxValue / 2) + assert(maxPendingMessageCount < Short.MaxValue / 2) - val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId, task = taskId) + private val LOG: Logger = LogUtil.getLogger(getClass, app = appId, + executor = executorId, task = taskId) import subscriber.{parallelism, partitionerDescription, processorId} @@ -62,8 +65,8 @@ class Subscription( private var life = subscriber.lifeTime - val partitioner = partitionerDescription.partitionerFactory.partitioner - val sendFn = partitioner match { + private val partitioner = partitionerDescription.partitionerFactory.partitioner + private val sendFn = partitioner match { case up: UnicastPartitioner => (msg: Message) => { val partition = up.getPartition(msg, parallelism, taskId.index) @@ -74,14 +77,13 @@ class Subscription( val partitions = mp.getPartitions(msg, parallelism, taskId.index) partitions.map(partition => sendMessage(msg, partition)).sum } - } def changeLife(life: LifeTime): Unit = { this.life = life } - def start: Unit = { + def start(): Unit = { val ackRequest = InitialAckRequest(taskId, sessionId) transport.transport(ackRequest, allTasks: _*) } @@ -91,14 +93,16 @@ class Subscription( } /** - * Return how many message is actually sent by this subscription + * Returns how many message is actually sent by this subscription + * * @param msg the message to send * @param partition the target partition to send message to * @return 1 if success */ def sendMessage(msg: Message, partition: Int): Int = { - // only send message whose timestamp matches the lifeTime + var count = 0 + // Only sends message whose timestamp matches the 
lifeTime if (partition != Partitioner.UNKNOWN_PARTITION_ID && life.contains(msg.timestamp)) { val targetTask = TaskId(processorId, partition) @@ -117,24 +121,25 @@ class Subscription( (messageCount(partition) + ackOnceEveryMessageCount) / maxPendingMessageCount) { sendLatencyProbe(partition) } - - return 1 + count = 1 + count } else { if (needFlush) { - flush + flush() } - - return 0 + count = 0 + count } } private var lastFlushTime: Long = 0L private val FLUSH_INTERVAL = 5 * 1000 // ms private def needFlush: Boolean = { - System.currentTimeMillis() - lastFlushTime > FLUSH_INTERVAL && Shorts.max(pendingMessageCount: _*) > 0 + System.currentTimeMillis() - lastFlushTime > FLUSH_INTERVAL && + Shorts.max(pendingMessageCount: _*) > 0 } - private def flush: Unit = { + private def flush(): Unit = { lastFlushTime = System.currentTimeMillis() allTasks.foreach { targetTaskId => sendAckRequest(targetTaskId.index) @@ -142,13 +147,14 @@ class Subscription( } private def allTasks: scala.collection.Seq[TaskId] = { - (0 until parallelism).map {taskIndex => + (0 until parallelism).map { taskIndex => TaskId(processorId, taskIndex) } } - /** Handle acknowledge message. - * Throw MessageLossException if required. + /** + * Handles acknowledge message. Throw MessageLossException if required. + * * @param ack acknowledge message received */ def receiveAck(ack: Ack): Unit = { @@ -159,7 +165,7 @@ class Subscription( if (ack.actualReceivedNum == ack.seq) { if ((ack.seq - candidateMinClockSince(index)).toShort >= 0) { if (ack.seq == messageCount(index)) { - // all messages have been acked. + // All messages have been acked. minClockValue(index) = Long.MaxValue } else { minClockValue(index) = candidateMinClock(index) @@ -171,7 +177,8 @@ class Subscription( pendingMessageCount(ack.taskId.index) = (messageCount(ack.taskId.index) - ack.seq).toShort updateMaxPendingCount() } else { - LOG.error(s"Failed! 
received ack: $ack, received: ${ack.actualReceivedNum}, sent: ${ack.seq}, try to replay...") + LOG.error(s"Failed! received ack: $ack, received: ${ack.actualReceivedNum}, " + + s"sent: ${ack.seq}, try to replay...") throw new MsgLostException } } @@ -181,13 +188,14 @@ class Subscription( minClockValue.min } - def allowSendingMoreMessages() : Boolean = { + def allowSendingMoreMessages(): Boolean = { maxPendingCount < maxPendingMessageCount } def sendAckRequestOnStallingTime(stallingTime: TimeStamp): Unit = { minClockValue.indices.foreach { i => - if (minClockValue(i) == stallingTime && pendingMessageCount(i) > 0 && allowSendingMoreMessages) { + if (minClockValue(i) == stallingTime && pendingMessageCount(i) > 0 + && allowSendingMoreMessages) { sendAckRequest(i) sendLatencyProbe(i) } @@ -195,7 +203,7 @@ class Subscription( } private def sendAckRequest(partition: Int): Unit = { - // we increment more count for each AckRequest + // Increments more count for each AckRequest // to throttle the number of unacked AckRequest incrementMessageCount(partition, ackOnceEveryMessageCount) val targetTask = TaskId(processorId, partition) @@ -218,11 +226,10 @@ class Subscription( val targetTask = TaskId(processorId, partition) transport.transport(probeLatency, targetTask) } - } object Subscription { - //make sure it is smaller than MAX_PENDING_MESSAGE_COUNT + // Makes sure it is smaller than MAX_PENDING_MESSAGE_COUNT final val ONE_ACKREQUEST_EVERY_MESSAGE_COUNT = 100 final val MAX_PENDING_MESSAGE_COUNT = 1000 } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/Task.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/Task.scala b/streaming/src/main/scala/io/gearpump/streaming/task/Task.scala index a176ca7..212a659 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/task/Task.scala +++ 
b/streaming/src/main/scala/io/gearpump/streaming/task/Task.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,14 +18,15 @@ package io.gearpump.streaming.task +import scala.concurrent.duration.FiniteDuration + import akka.actor.Actor.Receive import akka.actor.{ActorRef, ActorSystem, Cancellable, Props} +import org.slf4j.Logger + import io.gearpump.cluster.UserConfig import io.gearpump.util.LogUtil import io.gearpump.{Message, TimeStamp} -import org.slf4j.Logger - -import scala.concurrent.duration.FiniteDuration /** * This provides context information for a task. @@ -36,7 +37,7 @@ trait TaskContext { def executorId: Int - def appId : Int + def appId: Int def appName: String @@ -44,7 +45,7 @@ trait TaskContext { * The actorRef of AppMaster * @return application master's actor reference */ - def appMaster : ActorRef + def appMaster: ActorRef /** * The task parallelism @@ -61,64 +62,62 @@ trait TaskContext { */ def parallelism: Int - /** * Please don't use this if possible. * @return self actor ref */ - //TODO: We should remove the self from TaskContext + // TODO: We should remove the self from TaskContext def self: ActorRef /** * Please don't use this if possible * @return the actor system */ - //TODO: we should remove this in future + // TODO: we should remove this in future def system: ActorSystem /** - * This can be used to output messages to downstream tasks. - * The data shuffling rule can be decided by Partitioner. + * This can be used to output messages to downstream tasks. The data shuffling rule + * can be decided by Partitioner. 
+ * @param msg message to output */ - def output(msg : Message) : Unit - + def output(msg: Message): Unit def actorOf(props: Props): ActorRef def actorOf(props: Props, name: String): ActorRef - def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: ⇒ Unit): Cancellable + def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: => Unit): Cancellable /** * akka.actor.ActorRefProvider.scheduleOnce + * * @param initialDelay the initial delay * @param f the function to execute after initial delay * @return the executable */ - def scheduleOnce(initialDelay: FiniteDuration)(f: ⇒ Unit): Cancellable + def scheduleOnce(initialDelay: FiniteDuration)(f: => Unit): Cancellable - /** + /** * For managed message(type of Message), the sender only serve as a unique Id, * It's address is not something meaningful, you should not use this directly * * For unmanaged message, the sender represent the sender ActorRef * @return sender */ - def sender: ActorRef - + def sender: ActorRef /** - * retrieve upstream min clock from TaskActor + * Retrieves upstream min clock from TaskActor + * * @return the min clock */ def upstreamMinClock: TimeStamp - /** - * logger is environment dependant, it should be provided by + * Logger is environment dependant, it should be provided by * containing environment. - * @return */ def logger: Logger } @@ -130,28 +129,35 @@ trait TaskInterface { /** * Method called with the task is initialized. - * @param startTime startTime that can be used to decide from when a source producer task should replay the data source, or from when a processor task should recover its checkpoint data in to in-memory state. + * @param startTime startTime that can be used to decide from when a source producer task should + * replay the data source, or from when a processor task should recover its + * checkpoint data in to in-memory state.
*/ - def onStart(startTime : StartTime) : Unit + def onStart(startTime: StartTime): Unit - /** Method called for each message received. - * @param msg message send by upstream tasks + /** + * Method called for each message received. + * + * @param msg Message send by upstream tasks */ - def onNext(msg : Message) : Unit + def onNext(msg: Message): Unit - /** Method called when task is under clean up. + /** + * Method called when task is under clean up. + * * This can be used to cleanup resource when the application finished. */ - def onStop() : Unit + def onStop(): Unit /** - * handler for unmanaged message - * @return the handler - */ + * Handlers unmanaged messages + * + * @return the handler + */ def receiveUnManagedMessage: Receive = null } -abstract class Task(taskContext : TaskContext, userConf : UserConfig) extends TaskInterface{ +abstract class Task(taskContext: TaskContext, userConf: UserConfig) extends TaskInterface { import taskContext.{appId, executorId, taskId} @@ -170,11 +176,11 @@ abstract class Task(taskContext : TaskContext, userConf : UserConfig) extends Ta */ protected def sender: ActorRef = taskContext.sender - def onStart(startTime : StartTime) : Unit = {} + def onStart(startTime: StartTime): Unit = {} - def onNext(msg : Message) : Unit = {} + def onNext(msg: Message): Unit = {} - def onStop() : Unit = {} + def onStop(): Unit = {} override def receiveUnManagedMessage: Receive = { case msg => http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala index d54fce5..b9b8829 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala @@ -7,7 +7,7 @@ * 
"License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -22,6 +22,8 @@ import java.util import java.util.concurrent.TimeUnit import akka.actor._ +import org.slf4j.Logger + import io.gearpump.cluster.UserConfig import io.gearpump.gs.collections.impl.map.mutable.primitive.IntShortHashMap import io.gearpump.metrics.Metrics @@ -31,107 +33,107 @@ import io.gearpump.streaming.ExecutorToAppMaster._ import io.gearpump.streaming.{Constants, ProcessorId} import io.gearpump.util.{LogUtil, TimeOutScheduler} import io.gearpump.{Message, TimeStamp} -import org.slf4j.Logger /** * - * All tasks of Gearpump runs inside a Actor. - * TaskActor is the Actor container for a task. + * All tasks of Gearpump runs inside a Actor. TaskActor is the Actor container for a task. 
*/ class TaskActor( val taskId: TaskId, - val taskContextData : TaskContextData, - userConf : UserConfig, + val taskContextData: TaskContextData, + userConf: UserConfig, val task: TaskWrapper, inputSerializerPool: SerializationFramework) - extends Actor with ExpressTransport with TimeOutScheduler{ + extends Actor with ExpressTransport with TimeOutScheduler { var upstreamMinClock: TimeStamp = 0L private var _minClock: TimeStamp = 0L def serializerPool: SerializationFramework = inputSerializerPool - import Constants._ - import io.gearpump.streaming.task.TaskActor._ import taskContextData._ + + import io.gearpump.streaming.Constants._ + import io.gearpump.streaming.task.TaskActor._ val config = context.system.settings.config val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId, task = taskId) - //metrics + // Metrics private val metricName = s"app${appId}.processor${taskId.processorId}.task${taskId.index}" - private val receiveLatency = Metrics(context.system).histogram(s"$metricName:receiveLatency", sampleRate = 1) + private val receiveLatency = Metrics(context.system).histogram( + s"$metricName:receiveLatency", sampleRate = 1) private val processTime = Metrics(context.system).histogram(s"$metricName:processTime") private val sendThroughput = Metrics(context.system).meter(s"$metricName:sendThroughput") private val receiveThroughput = Metrics(context.system).meter(s"$metricName:receiveThroughput") private val maxPendingMessageCount = config.getInt(GEARPUMP_STREAMING_MAX_PENDING_MESSAGE_COUNT) - private val ackOnceEveryMessageCount = config.getInt(GEARPUMP_STREAMING_ACK_ONCE_EVERY_MESSAGE_COUNT) + private val ackOnceEveryMessageCount = config.getInt( + GEARPUMP_STREAMING_ACK_ONCE_EVERY_MESSAGE_COUNT) private val executor = context.parent private var life = taskContextData.life - //latency probe - import context.dispatcher - + // Latency probe import scala.concurrent.duration._ + + import context.dispatcher final val LATENCY_PROBE_INTERVAL = 
FiniteDuration(1, TimeUnit.SECONDS) - // clock report interval + // Clock report interval final val CLOCK_REPORT_INTERVAL = FiniteDuration(1, TimeUnit.SECONDS) - // flush interval + // Flush interval final val FLUSH_INTERVAL = FiniteDuration(100, TimeUnit.MILLISECONDS) private val queue = new util.LinkedList[AnyRef]() private var subscriptions = List.empty[(Int, Subscription)] - // securityChecker will be responsible of dropping messages from + // SecurityChecker will be responsible of dropping messages from // unknown sources - private val securityChecker = new SecurityChecker(taskId, self) + private val securityChecker = new SecurityChecker(taskId, self) private[task] var sessionId = NONE_SESSION - //report to appMaster with my address + // Reports to appMaster with my address express.registerLocalActor(TaskId.toLong(taskId), self) - final def receive : Receive = null + final def receive: Receive = null task.setTaskActor(this) - def onStart(startTime : StartTime) : Unit = { + def onStart(startTime: StartTime): Unit = { task.onStart(startTime) } - def onNext(msg : Message) : Unit = task.onNext(msg) + def onNext(msg: Message): Unit = task.onNext(msg) def onUnManagedMessage(msg: Any): Unit = task.receiveUnManagedMessage.apply(msg) - def onStop() : Unit = task.onStop() + def onStop(): Unit = task.onStop() /** * output to a downstream by specifying a arrayIndex - * @param arrayIndex, this is not same as ProcessorId - * @param msg + * @param arrayIndex this is not same as ProcessorId */ - def output(arrayIndex: Int, msg: Message) : Unit = { + def output(arrayIndex: Int, msg: Message): Unit = { var count = 0 - count += this.subscriptions(arrayIndex)._2.sendMessage(msg) + count += this.subscriptions(arrayIndex)._2.sendMessage(msg) sendThroughput.mark(count) } - def output(msg : Message) : Unit = { + def output(msg: Message): Unit = { var count = 0 - this.subscriptions.foreach{ subscription => + this.subscriptions.foreach { subscription => count += 
subscription._2.sendMessage(msg) } sendThroughput.mark(count) } - final override def postStop() : Unit = { + final override def postStop(): Unit = { onStop() } - final override def preStart() : Unit = { + final override def preStart(): Unit = { val register = RegisterTask(taskId, executorId, local) LOG.info(s"$register") executor ! register @@ -154,7 +156,7 @@ class TaskActor( msg match { case SendAck(ack, targetTask) => transport(ack, targetTask) - case m : Message => + case m: Message => count += 1 onNext(m) case other => @@ -172,18 +174,18 @@ class TaskActor( } } - private def onStartClock: Unit = { + private def onStartClock(): Unit = { LOG.info(s"received start, clock: $upstreamMinClock, sessionId: $sessionId") subscriptions = subscribers.map { subscriber => - (subscriber.processorId , + (subscriber.processorId, new Subscription(appId, executorId, taskId, subscriber, sessionId, this, maxPendingMessageCount, ackOnceEveryMessageCount)) }.sortBy(_._1) - subscriptions.foreach(_._2.start) + subscriptions.foreach(_._2.start()) import scala.collection.JavaConverters._ - stashQueue.asScala.foreach{item => + stashQueue.asScala.foreach { item => handleMessages(item.sender).apply(item.msg) } stashQueue.clear() @@ -198,7 +200,7 @@ class TaskActor( } def waitForTaskRegistered: Receive = { - case start@ TaskRegistered(_, sessionId, startClock) => + case start@TaskRegistered(_, sessionId, startClock) => this.sessionId = sessionId this.upstreamMinClock = startClock context.become(waitForStartClock) @@ -206,9 +208,9 @@ class TaskActor( private val stashQueue = new util.LinkedList[MessageAndSender]() - def waitForStartClock : Receive = { + def waitForStartClock: Receive = { case start: StartTask => - onStartClock + onStartClock() case other: AnyRef => stashQueue.add(MessageAndSender(other, sender())) } @@ -221,8 +223,9 @@ class TaskActor( doHandleMessage() } case ackRequest: AckRequest => - //enqueue to handle the ackRequest and send back ack later - val ackResponse = 
securityChecker.generateAckResponse(ackRequest, sender, ackOnceEveryMessageCount) + // Enqueue to handle the ackRequest and send back ack later + val ackResponse = securityChecker.generateAckResponse(ackRequest, sender, + ackOnceEveryMessageCount) if (null != ackResponse) { queue.add(SendAck(ackResponse, ackRequest.taskId)) doHandleMessage() @@ -231,16 +234,17 @@ class TaskActor( subscriptions.find(_._1 == ack.taskId.processorId).foreach(_._2.receiveAck(ack)) doHandleMessage() case inputMessage: SerializedMessage => - val message = Message(serializerPool.get().deserialize(inputMessage.bytes), inputMessage.timeStamp) + val message = Message(serializerPool.get().deserialize(inputMessage.bytes), + inputMessage.timeStamp) receiveMessage(message, sender) case inputMessage: Message => receiveMessage(inputMessage, sender) - case upstream@ UpstreamMinClock(upstreamClock) => + case upstream@UpstreamMinClock(upstreamClock) => this.upstreamMinClock = upstreamClock val subMinClock = subscriptions.foldLeft(Long.MaxValue) { (min, sub) => val subMin = sub._2.minClock - // a subscription is holding back the _minClock; + // A subscription is holding back the _minClock; // we send AckRequest to its tasks to push _minClock forward if (subMin == _minClock) { sub._2.sendAckRequestOnStallingTime(_minClock) @@ -255,7 +259,7 @@ class TaskActor( appMaster ! update } - // check whether current task is dead. + // Checks whether current task is dead. if (_minClock > life.death) { // There will be no more message received... 
val unRegister = UnRegisterTask(taskId, executorId) @@ -273,11 +277,11 @@ class TaskActor( case Some(subscription) => subscription.changeLife(subscriber.lifeTime cross this.life) case None => - val subscription = new Subscription(appId, executorId, taskId, subscriber, sessionId, this, - maxPendingMessageCount, ackOnceEveryMessageCount) - subscription.start - subscriptions :+= (subscriber.processorId, subscription) - // sort, keep the order + val subscription = new Subscription(appId, executorId, taskId, subscriber, + sessionId, this, maxPendingMessageCount, ackOnceEveryMessageCount) + subscription.start() + subscriptions :+=(subscriber.processorId, subscription) + // Sorting, keep the order subscriptions = subscriptions.sortBy(_._1) } } @@ -293,12 +297,12 @@ class TaskActor( } /** - * @return min clock of this task + * Returns min clock of this task */ def minClock: TimeStamp = _minClock /** - * @return min clock of upstream task + * Returns min clock of upstream task */ def getUpstreamMinClock: TimeStamp = upstreamMinClock @@ -309,8 +313,8 @@ class TaskActor( queue.add(msg) doHandleMessage() case None => - //Todo: Indicate the error and avoid the LOG flood - //LOG.error(s"Task $taskId drop message $msg") + // TODO: Indicate the error and avoid the LOG flood + // LOG.error(s"Task $taskId drop message $msg") } } @@ -320,28 +324,28 @@ class TaskActor( } object TaskActor { - val CLOCK_SYNC_TIMEOUT_INTERVAL = 3 * 1000 //3 seconds + // 3 seconds + val CLOCK_SYNC_TIMEOUT_INTERVAL = 3 * 1000 // If the message comes from an unknown source, securityChecker will drop it - class SecurityChecker(task_id: TaskId, self : ActorRef) { + class SecurityChecker(task_id: TaskId, self: ActorRef) { private val LOG: Logger = LogUtil.getLogger(getClass, task = task_id) - // Use mutable HashMap for performance optimization + // Uses mutable HashMap for performance optimization private val receivedMsgCount = new IntShortHashMap() - // Tricky performance optimization to save memory. 
// We store the session Id in the uid of ActorPath // ActorPath.hashCode is same as uid. private def getSessionId(actor: ActorRef): Int = { - //TODO: As method uid is protected in [akka] package. We + // TODO: As method uid is protected in [akka] package. We // are using hashCode instead of uid. actor.hashCode() } def handleInitialAckRequest(ackRequest: InitialAckRequest): Ack = { - LOG.debug(s"Handle InitialAckRequest for session $ackRequest" ) + LOG.debug(s"Handle InitialAckRequest for session $ackRequest") val sessionId = ackRequest.sessionId if (sessionId == NONE_SESSION) { LOG.error(s"SessionId is not initialized, ackRequest: $ackRequest") @@ -355,7 +359,7 @@ object TaskActor { def generateAckResponse(ackRequest: AckRequest, sender: ActorRef, incrementCount: Int): Ack = { val sessionId = ackRequest.sessionId if (receivedMsgCount.containsKey(sessionId)) { - // we increment more count for each AckRequest + // Increments more count for each AckRequest // to throttle the number of unacked AckRequest receivedMsgCount.put(sessionId, (receivedMsgCount.get(sessionId) + incrementCount).toShort) Ack(task_id, ackRequest.seq, receivedMsgCount.get(sessionId), ackRequest.sessionId) @@ -366,8 +370,8 @@ object TaskActor { } // If the message comes from an unknown source, then drop it - def checkMessage(message : Message, sender: ActorRef): Option[Message] = { - if(sender.equals(self)){ + def checkMessage(message: Message, sender: ActorRef): Option[Message] = { + if (sender.equals(self)) { Some(message) } else { val sessionId = getSessionId(sender) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/TaskContextData.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskContextData.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskContextData.scala index ce32606..28605cf 100644 --- 
a/streaming/src/main/scala/io/gearpump/streaming/task/TaskContextData.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/task/TaskContextData.scala @@ -19,13 +19,14 @@ package io.gearpump.streaming.task import akka.actor.ActorRef + import io.gearpump.streaming.LifeTime case class TaskContextData( - executorId : Int, - appId : Int, + executorId: Int, + appId: Int, appName: String, - appMaster : ActorRef, + appMaster: ActorRef, parallelism: Int, life: LifeTime, subscribers: List[Subscriber]) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/TaskControlMessage.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskControlMessage.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskControlMessage.scala index 59babff..d89387a 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskControlMessage.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/task/TaskControlMessage.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -32,9 +32,11 @@ case class InitialAckRequest(taskId: TaskId, sessionId: Int) */ case class AckRequest(taskId: TaskId, seq: Short, sessionId: Int) -/* - Here the seq field represents the expected number of received messages - and the actualReceivedNum field means the actual received number since start +/** + * Ack back to sender task actor. + * + * @param seq The seq field represents the expected number of received messages and the + * actualReceivedNum field means the actual received number since start. 
*/ case class Ack(taskId: TaskId, seq: Short, actualReceivedNum: Short, sessionId: Int) @@ -60,6 +62,7 @@ case object GetStartClock case class StartClock(clock: TimeStamp) +/** Probe the latency between two upstream to downstream tasks. */ case class LatencyProbe(timestamp: Long) case class SendMessageLoss() http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/TaskId.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskId.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskId.scala index f599540..66c3c52 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskId.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/task/TaskId.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,9 +20,9 @@ package io.gearpump.streaming.task import io.gearpump.streaming._ -case class TaskId(processorId : ProcessorId, index : TaskIndex) +case class TaskId(processorId: ProcessorId, index: TaskIndex) object TaskId { - def toLong(id : TaskId) = (id.processorId.toLong << 32) + id.index - def fromLong(id : Long) = TaskId(((id >> 32) & 0xFFFFFFFF).toInt, (id & 0xFFFFFFFF).toInt) + def toLong(id: TaskId): Long = (id.processorId.toLong << 32) + id.index + def fromLong(id: Long): TaskId = TaskId(((id >> 32) & 0xFFFFFFFF).toInt, (id & 0xFFFFFFFF).toInt) } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/TaskMessageSerializer.scala 
---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskMessageSerializer.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskMessageSerializer.scala index 370eef7..500f8b3 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskMessageSerializer.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/task/TaskMessageSerializer.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,8 +19,6 @@ package io.gearpump.streaming.task import java.io.{DataInput, DataOutput} -import org.jboss.netty.buffer.ChannelBuffer - trait TaskMessageSerializer[T] { def write(dataOutput: DataOutput, obj: T) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala index c6564ff..040fc2e 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala @@ -21,11 +21,12 @@ package io.gearpump.streaming.task object TaskUtil { /** - * Resolve a classname to a Task class. + * Resolves a classname to a Task class. 
+ * * @param className the class name to resolve * @return resolved class */ - def loadClass(className: String): Class[_<:Task] = { + def loadClass(className: String): Class[_ <: Task] = { val loader = Thread.currentThread().getContextClassLoader() loader.loadClass(className).asSubclass(classOf[Task]) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/TaskWrapper.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskWrapper.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskWrapper.scala index 3be396f..e7e883c 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskWrapper.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/task/TaskWrapper.scala @@ -18,22 +18,26 @@ package io.gearpump.streaming.task +import scala.concurrent.duration.FiniteDuration + import akka.actor.Actor._ import akka.actor.{ActorRef, ActorSystem, Cancellable, Props} -import io.gearpump.{TimeStamp, Message} -import io.gearpump.cluster.UserConfig -import io.gearpump.util.LogUtil import org.slf4j.Logger -import scala.concurrent.duration.FiniteDuration +import io.gearpump.cluster.UserConfig +import io.gearpump.util.LogUtil +import io.gearpump.{Message, TimeStamp} /** * This provides TaskContext for user defined tasks + * * @param taskClass task class * @param context context class * @param userConf user config */ -class TaskWrapper(val taskId: TaskId, taskClass: Class[_ <: Task], context: TaskContextData, userConf: UserConfig) extends TaskContext with TaskInterface { +class TaskWrapper( + val taskId: TaskId, taskClass: Class[_ <: Task], context: TaskContextData, + userConf: UserConfig) extends TaskContext with TaskInterface { private val LOG = LogUtil.getLogger(taskClass, task = taskId) @@ -56,18 +60,19 @@ class TaskWrapper(val taskId: TaskId, taskClass: Class[_ <: Task], context: Task override def 
output(msg: Message): Unit = actor.output(msg) /** - * @see [[TaskActor.output]] + * See [[io.gearpump.streaming.task.TaskActor]] output(arrayIndex: Int, msg: Message): Unit + * * @param index, not same as ProcessorId - * @param msg */ def output(index: Int, msg: Message): Unit = actor.output(index, msg) /** * Use with caution, output unmanaged message to target tasks + * * @param msg message to output * @param tasks the tasks to output to */ - def outputUnManaged(msg: AnyRef, tasks: TaskId *): Unit = { + def outputUnManaged(msg: AnyRef, tasks: TaskId*): Unit = { actor.transport(msg, tasks: _*) } @@ -105,12 +110,12 @@ class TaskWrapper(val taskId: TaskId, taskClass: Class[_ <: Task], context: Task actor.getUpstreamMinClock } - def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: ⇒ Unit): Cancellable = { + def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: => Unit): Cancellable = { val dispatcher = actor.context.system.dispatcher actor.context.system.scheduler.schedule(initialDelay, interval)(f)(dispatcher) } - def scheduleOnce(initialDelay: FiniteDuration)(f: ⇒ Unit): Cancellable = { + def scheduleOnce(initialDelay: FiniteDuration)(f: => Unit): Cancellable = { val dispatcher = actor.context.system.dispatcher actor.context.system.scheduler.scheduleOnce(initialDelay)(f)(dispatcher) } @@ -121,9 +126,8 @@ class TaskWrapper(val taskId: TaskId, taskClass: Class[_ <: Task], context: Task } /** - * logger is environment dependant, it should be provided by + * Logger is environment dependant, it should be provided by * containing environment. 
- * @return */ override def logger: Logger = LOG } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/transaction/api/CheckpointStore.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/CheckpointStore.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/CheckpointStore.scala index 7dd08fb..f3894ea 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/CheckpointStore.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/CheckpointStore.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,9 +18,9 @@ package io.gearpump.streaming.transaction.api -import io.gearpump.streaming.task.TaskContext import io.gearpump.TimeStamp import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.task.TaskContext /** * CheckpointStore persistently stores mapping of timestamp to checkpoint
