http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala deleted file mode 100644 index eab74aa..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
package io.gearpump.streaming.source

import io.gearpump._
import io.gearpump.cluster.UserConfig
import io.gearpump.streaming.task.{StartTime, Task, TaskContext}

object DataSourceTask {
  /** UserConfig key under which the DataSource instance is stored. */
  val DATA_SOURCE = "data_source"
}

/**
 * Default Task container for [[io.gearpump.streaming.source.DataSource]] that
 * reads from the DataSource in batches.
 * See [[io.gearpump.streaming.source.DataSourceProcessor]] for its usage.
 *
 * Lifecycle:
 *  - `DataSource.open()` is called in `onStart` with the TaskContext and application start time
 *  - `DataSource.read()` is attempted `batchSize` times on each `onNext`
 *  - `DataSource.close()` is called in `onStop`
 *
 * The task drives itself by sending a message to `self` after every batch.
 */
class DataSourceTask(context: TaskContext, conf: UserConfig) extends Task(context, conf) {
  import io.gearpump.streaming.source.DataSourceTask._

  private val source = conf.getValue[DataSource](DATA_SOURCE).get
  // Number of read() attempts per onNext; defaults to 1000 when not configured.
  private val batchSize = conf.getInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE).getOrElse(1000)
  private var startTime = 0L

  override def onStart(newStartTime: StartTime): Unit = {
    startTime = newStartTime.startTime
    LOG.info(s"opening data source at $startTime")
    source.open(context, startTime)
    // Kick off the self-driven read loop.
    self ! Message("start", System.currentTimeMillis())
  }

  override def onNext(message: Message): Unit = {
    // read() may return null when nothing is available; such reads are skipped.
    var remaining = batchSize
    while (remaining > 0) {
      Option(source.read()).foreach(context.output)
      remaining -= 1
    }
    // Schedule the next batch.
    self ! Message("continue", System.currentTimeMillis())
  }

  override def onStop(): Unit = {
    LOG.info("closing data source...")
    source.close()
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/source/DefaultTimeStampFilter.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/source/DefaultTimeStampFilter.scala b/streaming/src/main/scala/io/gearpump/streaming/source/DefaultTimeStampFilter.scala deleted file mode 100644 index b7d4e90..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/source/DefaultTimeStampFilter.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.source - -import io.gearpump.streaming.transaction.api.TimeStampFilter -import io.gearpump.{Message, TimeStamp} - -/** - * TimeStampFilter filters out messages which have obsolete (smaller) timestamp. 
package io.gearpump.streaming.source

import io.gearpump.streaming.transaction.api.TimeStampFilter
import io.gearpump.{Message, TimeStamp}

/**
 * Drops messages whose timestamp is older than the given predicate.
 *
 * Returns Some(msg) when msg is non-null and msg.timestamp >= predicate,
 * None otherwise (including for a null message).
 */
class DefaultTimeStampFilter extends TimeStampFilter {
  override def filter(msg: Message, predicate: TimeStamp): Option[Message] = {
    Option(msg).filter(_.timestamp >= predicate)
  }
}
package io.gearpump.streaming.state.api

/**
 * An associative combine operation `plus` together with an identity element `zero`.
 *
 * Extends java.io.Serializable so instances can be shipped to remote tasks.
 */
trait Monoid[T] extends java.io.Serializable {
  /** Identity element: combining with `zero` leaves a value unchanged. */
  def zero: T

  /** Associative combine of two values. */
  def plus(l: T, r: T): T
}

/**
 * A Monoid whose combine can be undone: minus(plus(a, b), b) yields a.
 */
trait Group[T] extends Monoid[T] {
  /** Inverse of plus. */
  def minus(l: T, r: T): T
}
package io.gearpump.streaming.state.api

import io.gearpump.TimeStamp

/**
 * MonoidState aggregates state with a Monoid.
 *
 * On start the value is monoid.zero; each incoming value is folded in with
 * monoid.plus. The aggregate is kept in two halves split at the checkpoint
 * time so a checkpoint can snapshot only the pre-checkpoint part.
 */
abstract class MonoidState[T](monoid: Monoid[T]) extends PersistentState[T] {
  // Aggregate of messages with timestamp < checkpointTime.
  private[state] var left: T = monoid.zero
  // Aggregate of messages with timestamp >= checkpointTime.
  private[state] var right: T = monoid.zero

  // Long.MaxValue until the framework sets a real checkpoint time,
  // so every update initially lands in `left`.
  protected var checkpointTime = Long.MaxValue

  /** Combined value of both halves; None only if the combine yields null. */
  override def get: Option[T] = Option(monoid.plus(left, right))

  override def setNextCheckpointTime(nextCheckpointTime: TimeStamp): Unit = {
    checkpointTime = nextCheckpointTime
  }

  /** Folds t into the half determined by its timestamp relative to checkpointTime. */
  protected def updateState(timestamp: TimeStamp, t: T): Unit = {
    if (timestamp < checkpointTime) left = monoid.plus(left, t)
    else right = monoid.plus(right, t)
  }
}
package io.gearpump.streaming.state.api

import io.gearpump._

/**
 * PersistentState is part of the transaction API.
 *
 * Users get transaction support from the framework by conforming to
 * PersistentState and extending PersistentTask to manage the state.
 */
trait PersistentState[T] {

  /**
   * Restores state from a previous checkpoint snapshot.
   * Usually invoked by the framework.
   */
  def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit

  /**
   * Folds a new message value into the state.
   * Invoked by the user.
   */
  def update(timestamp: TimeStamp, t: T): Unit

  /**
   * Sets the next checkpoint time.
   * Should be invoked by the framework.
   */
  def setNextCheckpointTime(timeStamp: TimeStamp): Unit

  /**
   * Produces a binary snapshot of the state.
   * Usually invoked by the framework.
   */
  def checkpoint(): Array[Byte]

  /** Unwraps the raw value of the state. */
  def get: Option[T]
}
package io.gearpump.streaming.state.api

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration

import io.gearpump.cluster.UserConfig
import io.gearpump.streaming.state.impl.{CheckpointManager, PersistentStateConfig}
import io.gearpump.streaming.task.{ReportCheckpointClock, StartTime, Task, TaskContext}
import io.gearpump.streaming.transaction.api.CheckpointStoreFactory
import io.gearpump.util.LogUtil
import io.gearpump.{Message, TimeStamp}

object PersistentTask {
  // Self-addressed control message that triggers a checkpoint attempt.
  val CHECKPOINT = Message("checkpoint")
  val LOG = LogUtil.getLogger(getClass)
}

/**
 * PersistentTask is part of the transaction API.
 *
 * Users extend this task to get transaction support from the framework.
 * It periodically sends itself a CHECKPOINT message; when the upstream min
 * clock has passed the next checkpoint time, the state is snapshotted to the
 * configured CheckpointStore and the checkpoint clock is reported upstream.
 */
abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig)
  extends Task(taskContext, conf) {
  import taskContext._

  import io.gearpump.streaming.state.api.PersistentTask._

  val checkpointStoreFactory = conf.getValue[CheckpointStoreFactory](
    PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY).get
  val checkpointStore = checkpointStoreFactory.getCheckpointStore(conf, taskContext)
  val checkpointInterval = conf.getLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS).get
  val checkpointManager = new CheckpointManager(checkpointInterval, checkpointStore)
  // Wall-clock delay between consecutive checkpoint attempts.
  private val checkpointAttemptIntervalMs = 1000L

  /**
   * Subclass overrides this to supply a PersistentState. The framework offers:
   *  - NonWindowState: state with no time or other boundary
   *  - WindowState: each state is bounded by a time window
   */
  def persistentState: PersistentState[T]

  /** Subclass overrides this to specify how a new message updates the state. */
  def processMessage(state: PersistentState[T], message: Message): Unit

  /** Persistent state checkpointed automatically to storage such as HDFS. */
  val state = persistentState

  final override def onStart(startTime: StartTime): Unit = {
    val timestamp = startTime.startTime
    // Restore from the latest snapshot at or before the start time, if any.
    checkpointManager
      .recover(timestamp)
      .foreach(state.recover(timestamp, _))

    reportCheckpointClock(timestamp)
    scheduleCheckpoint(checkpointAttemptIntervalMs)
  }

  final override def onNext(message: Message): Unit = {
    message match {
      case CHECKPOINT =>
        val upstreamMinClock = taskContext.upstreamMinClock
        if (checkpointManager.shouldCheckpoint(upstreamMinClock)) {
          checkpointManager.getCheckpointTime.foreach { checkpointTime =>
            val snapshot = state.checkpoint()
            checkpointManager.checkpoint(checkpointTime, snapshot)
              .foreach(state.setNextCheckpointTime)
            // Forward the snapshot downstream stamped with the checkpoint time.
            taskContext.output(Message(snapshot, checkpointTime))
            reportCheckpointClock(checkpointTime)
          }
        }
        // Re-arm the checkpoint timer whether or not a checkpoint was taken.
        scheduleCheckpoint(checkpointAttemptIntervalMs)
      case _ =>
        checkpointManager.update(message.timestamp)
          .foreach(state.setNextCheckpointTime)
        processMessage(state, message)
    }
  }

  final override def onStop(): Unit = {
    checkpointManager.close()
  }

  private def scheduleCheckpoint(interval: Long): Unit = {
    scheduleOnce(new FiniteDuration(interval, TimeUnit.MILLISECONDS))(self ! CHECKPOINT)
  }

  private def reportCheckpointClock(timestamp: TimeStamp): Unit = {
    appMaster ! ReportCheckpointClock(taskContext.taskId, timestamp)
  }
}
package io.gearpump.streaming.state.api

import scala.util.Try

/**
 * Binary (de)serialization contract for checkpointed state.
 *
 * Extends java.io.Serializable so instances can be shipped with tasks.
 */
trait Serializer[T] extends java.io.Serializable {
  /** Encodes a value to bytes. */
  def serialize(t: T): Array[Byte]

  /** Decodes bytes back to a value; a Failure signals malformed input. */
  def deserialize(bytes: Array[Byte]): Try[T]
}
package io.gearpump.streaming.state.impl

import io.gearpump.TimeStamp
import io.gearpump.streaming.transaction.api.CheckpointStore

/**
 * Manages physical checkpoints to persistent storage such as HDFS.
 *
 * Tracks the maximum message timestamp seen and derives the next checkpoint
 * boundary as a multiple of checkpointInterval.
 */
class CheckpointManager(checkpointInterval: Long,
    checkpointStore: CheckpointStore) {

  private var maxMessageTime: Long = 0L
  // Next event-time boundary at which to checkpoint; None until the first
  // message arrives (or when no newer message has been seen at checkpoint).
  private var checkpointTime: Option[Long] = None

  def recover(timestamp: TimeStamp): Option[Array[Byte]] = {
    checkpointStore.recover(timestamp)
  }

  /**
   * Persists the snapshot and advances checkpointTime to the next interval
   * boundary past maxMessageTime; cleared to None when no message newer than
   * the current boundary has arrived.
   */
  def checkpoint(timestamp: TimeStamp, checkpoint: Array[Byte]): Option[TimeStamp] = {
    checkpointStore.persist(timestamp, checkpoint)
    checkpointTime = checkpointTime match {
      case Some(time) if maxMessageTime > time =>
        Some(time + (1 + (maxMessageTime - time) / checkpointInterval) * checkpointInterval)
      case _ => None
    }
    checkpointTime
  }

  /** Records a message timestamp; the first message fixes the first boundary. */
  def update(messageTime: TimeStamp): Option[TimeStamp] = {
    maxMessageTime = Math.max(maxMessageTime, messageTime)
    if (checkpointTime.isEmpty) {
      checkpointTime = Some((1 + messageTime / checkpointInterval) * checkpointInterval)
    }
    checkpointTime
  }

  /** True when the upstream clock has reached the pending checkpoint boundary. */
  def shouldCheckpoint(upstreamMinClock: TimeStamp): Boolean = {
    checkpointTime.exists(upstreamMinClock >= _)
  }

  def getCheckpointTime: Option[TimeStamp] = checkpointTime

  def close(): Unit = {
    checkpointStore.close()
  }

  private[impl] def getMaxMessageTime: TimeStamp = maxMessageTime
}
package io.gearpump.streaming.state.impl

import io.gearpump.TimeStamp
import io.gearpump.cluster.UserConfig
import io.gearpump.streaming.task.TaskContext
import io.gearpump.streaming.transaction.api.{CheckpointStore, CheckpointStoreFactory}

/**
 * In-memory checkpoint store, provided for tests only.
 * All snapshots are lost on close; should not be used in real deployments.
 */
class InMemoryCheckpointStore extends CheckpointStore {
  private var checkpoints = Map.empty[TimeStamp, Array[Byte]]

  override def persist(timestamp: TimeStamp, checkpoint: Array[Byte]): Unit = {
    checkpoints = checkpoints + (timestamp -> checkpoint)
  }

  override def recover(timestamp: TimeStamp): Option[Array[Byte]] = {
    checkpoints.get(timestamp)
  }

  override def close(): Unit = {
    // Discard everything; this store has no durable backing.
    checkpoints = Map.empty[TimeStamp, Array[Byte]]
  }
}

/** Factory creating a fresh InMemoryCheckpointStore per task. */
class InMemoryCheckpointStoreFactory extends CheckpointStoreFactory {
  override def getCheckpointStore(conf: UserConfig, taskContext: TaskContext): CheckpointStore = {
    new InMemoryCheckpointStore
  }
}
---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/impl/NonWindowState.scala b/streaming/src/main/scala/io/gearpump/streaming/state/impl/NonWindowState.scala deleted file mode 100644 index dcd3918..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/state/impl/NonWindowState.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
package io.gearpump.streaming.state.impl

import org.slf4j.Logger

import io.gearpump.TimeStamp
import io.gearpump.streaming.state.api.{Monoid, MonoidState, Serializer}
import io.gearpump.streaming.state.impl.NonWindowState._
import io.gearpump.util.LogUtil

object NonWindowState {
  val LOG: Logger = LogUtil.getLogger(classOf[NonWindowState[_]])
}

/**
 * A MonoidState holding a single aggregate with no time or other boundary.
 */
class NonWindowState[T](monoid: Monoid[T], serializer: Serializer[T])
  extends MonoidState[T](monoid) {

  override def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit = {
    // On deserialization failure the state silently stays at monoid.zero.
    serializer.deserialize(bytes).foreach(left = _)
  }

  override def update(timestamp: TimeStamp, t: T): Unit = {
    updateState(timestamp, t)
  }

  override def checkpoint(): Array[Byte] = {
    // Snapshot covers only the pre-checkpoint half (`left`).
    val snapshot = serializer.serialize(left)
    LOG.debug(s"checkpoint time: $checkpointTime; checkpoint value: ($checkpointTime, $left)")
    // Fold the post-checkpoint half into left and reset for the next interval.
    left = monoid.plus(left, right)
    right = monoid.zero
    snapshot
  }
}
package io.gearpump.streaming.state.impl

/**
 * Configuration keys used by persistent state and PersistentTask.
 */
object PersistentStateConfig {

  val STATE_CHECKPOINT_ENABLE = "state.checkpoint.enable"
  // Event-time distance between checkpoints; read by PersistentTask.
  val STATE_CHECKPOINT_INTERVAL_MS = "state.checkpoint.interval.ms"
  // CheckpointStoreFactory implementation to use; read by PersistentTask.
  val STATE_CHECKPOINT_STORE_FACTORY = "state.checkpoint.store.factory"
  val STATE_WINDOW_SIZE = "state.window.size"
  val STATE_WINDOW_STEP = "state.window.step"
}
package io.gearpump.streaming.state.impl

import io.gearpump.TimeStamp

/**
 * Used in window applications.
 * Keeps the current window's start position and slides ahead when the
 * reported clock passes the end of the window.
 */
class Window(val windowSize: Long, val windowStep: Long) {

  def this(windowConfig: WindowConfig) = {
    this(windowConfig.windowSize, windowConfig.windowStep)
  }

  // Latest clock reported via update().
  private var currentClock: TimeStamp = 0L
  // Inclusive start of the current window.
  private var windowStart = 0L

  def update(clock: TimeStamp): Unit = {
    currentClock = clock
  }

  /** Advances the window start by one step. */
  def slideOneStep(): Unit = {
    windowStart += windowStep
  }

  /** Aligns the window start to the step boundary at or before timestamp. */
  def slideTo(timestamp: TimeStamp): Unit = {
    windowStart = timestamp / windowStep * windowStep
  }

  /** True when the clock has reached or passed the end of the current window. */
  def shouldSlide: Boolean = {
    currentClock >= windowStart + windowSize
  }

  /** Half-open range [start, start + size) of the current window. */
  def range: (TimeStamp, TimeStamp) = {
    (windowStart, windowStart + windowSize)
  }
}
package io.gearpump.streaming.state.impl

object WindowConfig {
  // Name under which a WindowConfig is registered — presumably a UserConfig
  // key; confirm against callers.
  val NAME = "window_config"
}

/**
 * Window specification: windowSize is the window's length and windowStep is
 * the distance the window advances on each slide (see Window).
 */
case class WindowConfig(windowSize: Long, windowStep: Long)
package io.gearpump.streaming.state.impl

import scala.collection.immutable.TreeMap

import org.slf4j.Logger

import io.gearpump.TimeStamp
import io.gearpump.streaming.state.api.{Group, MonoidState, Serializer}
import io.gearpump.streaming.state.impl.WindowState._
import io.gearpump.streaming.task.TaskContext
import io.gearpump.util.LogUtil

/**
 * An interval is a dynamic time range cut by window boundaries and checkpoint
 * time. Ordering is by startTime only.
 */
case class Interval(startTime: TimeStamp, endTime: TimeStamp) extends Ordered[Interval] {
  override def compare(that: Interval): Int = {
    if (startTime < that.startTime) -1
    else if (startTime > that.startTime) 1
    else 0
  }
}

object WindowState {
  val LOG: Logger = LogUtil.getLogger(classOf[WindowState[_]])
}

/**
 * A list of states, each bounded by a time window; the state of each window
 * does not affect the others.
 *
 * WindowState requires a Group (a Monoid with `minus`), which makes it
 * possible to undo updates from messages that have left the window.
 */
class WindowState[T](group: Group[T],
    serializer: Serializer[TreeMap[Interval, T]],
    taskContext: TaskContext,
    window: Window) extends MonoidState[T](group) {

  // Each interval holds the aggregate of messages whose timestamp lies in
  // [interval.startTime, interval.endTime).
  private var intervalStates = TreeMap.empty[Interval, T]

  private var lastCheckpointTime = 0L

  override def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit = {
    window.slideTo(timestamp)
    serializer.deserialize(bytes).foreach { states =>
      intervalStates = states
      // Rebuild the pre-checkpoint aggregate from the recovered interval states.
      left = states.values.foldLeft(left)(group.plus)
    }
  }

  override def update(timestamp: TimeStamp, t: T): Unit = {
    val (startTime, endTime) = window.range
    // Only messages inside the current window contribute to the aggregate.
    if (timestamp >= startTime && timestamp < endTime) {
      updateState(timestamp, t)
    }

    updateIntervalStates(timestamp, t, checkpointTime)

    window.update(taskContext.upstreamMinClock)

    if (window.shouldSlide) {
      window.slideOneStep()

      val (newStartTime, newEndTime) = window.range
      // Undo contributions of intervals that slid out of the window.
      getIntervalStates(startTime, newStartTime).foreach { case (_, st) =>
        left = group.minus(left, st)
      }
      if (checkpointTime > endTime) {
        // Newly covered intervals before the checkpoint boundary go to `left`...
        getIntervalStates(endTime, checkpointTime).foreach { case (_, st) =>
          left = group.plus(left, st)
        }
      } else {
        // ...otherwise they belong to the post-checkpoint half.
        getIntervalStates(endTime, newEndTime).foreach { case (_, st) =>
          right = group.plus(right, st)
        }
      }
    }
  }

  override def checkpoint(): Array[Byte] = {
    left = group.plus(left, right)
    right = group.zero

    // Snapshot the interval states up to the checkpoint boundary.
    val states = getIntervalStates(window.range._1, checkpointTime)
    lastCheckpointTime = checkpointTime
    LOG.debug(s"checkpoint time: $checkpointTime; checkpoint value: ($checkpointTime, $states)")
    serializer.serialize(states)
  }

  /**
   * Each message updates state in the interval [startTime, endTime) decided
   * by its timestamp t, where
   * {{{
   * startTime = Math.max(lowerBound1, lowerBound2, checkpointTime)
   * endTime = Math.min(upperBound1, upperBound2, checkpointTime)
   * lowerBound1 = step * Nmax1 <= t
   * lowerBound2 = step * Nmax2 + size <= t
   * upperBound1 = step * Nmin1 > t
   * upperBound2 = step * Nmin2 + size > t
   * }}}
   */
  private[impl] def getInterval(timestamp: TimeStamp, checkpointTime: TimeStamp): Interval = {
    val size = window.windowSize
    val step = window.windowStep
    // Tightest range around timestamp cut by window-start and window-end boundaries.
    val lowerBound1 = timestamp / step * step
    val lowerBound2 =
      if (timestamp < size) 0L
      else (timestamp - size) / step * step + size
    val upperBound1 = (timestamp / step + 1) * step
    val upperBound2 =
      if (timestamp < size) size
      else ((timestamp - size) / step + 1) * step + size
    val lower = Math.max(lowerBound1, lowerBound2)
    val upper = Math.min(upperBound1, upperBound2)
    // Clamp further at checkpoint boundaries so no interval spans a checkpoint.
    if (checkpointTime > timestamp) {
      Interval(Math.max(lower, lastCheckpointTime), Math.min(upper, checkpointTime))
    } else {
      Interval(Math.max(lower, checkpointTime), upper)
    }
  }

  private[impl] def updateIntervalStates(timestamp: TimeStamp, t: T, checkpointTime: TimeStamp)
    : Unit = {
    val interval = getInterval(timestamp, checkpointTime)
    val combined = intervalStates.get(interval) match {
      case Some(st) => group.plus(st, t)
      case None => group.plus(group.zero, t)
    }
    intervalStates += interval -> combined
  }

  /** Interval states whose endTime lies in (startTime, endTime]. */
  private[impl] def getIntervalStates(startTime: TimeStamp, endTime: TimeStamp)
    : TreeMap[Interval, T] = {
    intervalStates.dropWhile(_._1.endTime <= startTime).takeWhile(_._1.endTime <= endTime)
  }
}
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.storage - -import scala.concurrent._ - -/** - * Generic storage to store KV Data. - */ -trait AppDataStore { - def put(key: String, value: Any): Future[Any] - - def get(key: String): Future[Any] -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMaster.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMaster.scala b/streaming/src/main/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMaster.scala deleted file mode 100644 index f1a19a8..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMaster.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.storage - -import scala.concurrent.Future - -import akka.actor.ActorRef -import akka.pattern.ask - -import io.gearpump.cluster.AppMasterToMaster.{GetAppData, GetAppDataResult, SaveAppData} -import io.gearpump.util.Constants - -/** - * In memory application storage located on master nodes - */ -class InMemoryAppStoreOnMaster(appId: Int, master: ActorRef) extends AppDataStore { - implicit val timeout = Constants.FUTURE_TIMEOUT - import scala.concurrent.ExecutionContext.Implicits.global - - override def put(key: String, value: Any): Future[Any] = { - master.ask(SaveAppData(appId, key, value)) - } - - override def get(key: String): Future[Any] = { - master.ask(GetAppData(appId, key)).asInstanceOf[Future[GetAppDataResult]].map { result => - if (result.key.equals(key)) { - result.value - } else { - null - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala b/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala deleted file mode 100644 index a3cb6e1..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.task - -import akka.actor.{ActorRef, ExtendedActorSystem} - -import io.gearpump.Message -import io.gearpump.transport.netty.TaskMessage -import io.gearpump.transport.{Express, HostPort} -import io.gearpump.util.AkkaHelper -/** - * ExpressTransport wire the networking function from default akka - * networking to customized implementation [[io.gearpump.transport.Express]]. - * - * See [[io.gearpump.transport.Express]] for more information. 
- */ -trait ExpressTransport { - this: TaskActor => - - final val express = Express(context.system) - implicit val system = context.system.asInstanceOf[ExtendedActorSystem] - - final def local: HostPort = express.localHost - lazy val sourceId = TaskId.toLong(taskId) - - lazy val sessionRef: ActorRef = { - AkkaHelper.actorFor(system, s"/session#$sessionId") - } - - def transport(msg: AnyRef, remotes: TaskId*): Unit = { - var serializedMessage: AnyRef = null - - remotes.foreach { remote => - val transportId = TaskId.toLong(remote) - val localActor = express.lookupLocalActor(transportId) - if (localActor.isDefined) { - localActor.get.tell(msg, sessionRef) - } else { - if (null == serializedMessage) { - msg match { - case message: Message => - val bytes = serializerPool.get().serialize(message.msg) - serializedMessage = SerializedMessage(message.timestamp, bytes) - case _ => serializedMessage = msg - } - } - val taskMessage = new TaskMessage(sessionId, transportId, sourceId, serializedMessage) - - val remoteAddress = express.lookupRemoteAddress(transportId) - if (remoteAddress.isDefined) { - express.transport(taskMessage, remoteAddress.get) - } else { - LOG.error( - s"Can not find target task $remote, maybe the application is undergoing recovery") - } - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/SerializedMessage.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/SerializedMessage.scala b/streaming/src/main/scala/io/gearpump/streaming/task/SerializedMessage.scala deleted file mode 100644 index 9f9bf1b..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/task/SerializedMessage.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.task - -import java.io.{DataInput, DataOutput} - -import io.gearpump.TimeStamp - -case class SerializedMessage(timeStamp: TimeStamp, bytes: Array[Byte]) - -class SerializedMessageSerializer extends TaskMessageSerializer[SerializedMessage] { - override def getLength(obj: SerializedMessage): Int = 12 + obj.bytes.length - - override def write(dataOutput: DataOutput, obj: SerializedMessage): Unit = { - dataOutput.writeLong(obj.timeStamp) - dataOutput.writeInt(obj.bytes.length) - dataOutput.write(obj.bytes) - } - - override def read(dataInput: DataInput): SerializedMessage = { - val timestamp = dataInput.readLong() - val length = dataInput.readInt() - val bytes = new Array[Byte](length) - dataInput.readFully(bytes) - SerializedMessage(timestamp, bytes) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/SerializerResolver.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/SerializerResolver.scala b/streaming/src/main/scala/io/gearpump/streaming/task/SerializerResolver.scala deleted file mode 100644 index 72bc7db..0000000 --- 
a/streaming/src/main/scala/io/gearpump/streaming/task/SerializerResolver.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.task - -import io.gearpump.esotericsoftware.kryo.util.{IntMap, ObjectMap} -import io.gearpump.streaming.task.SerializerResolver.Registration - -private[task] class SerializerResolver { - private var classId = 0 - private val idToRegistration = new IntMap[Registration]() - private val classToRegistration = new ObjectMap[Class[_], Registration]() - - def register[T](clazz: Class[T], serializer: TaskMessageSerializer[T]): Unit = { - val registration = new Registration(classId, clazz, serializer) - idToRegistration.put(classId, registration) - classToRegistration.put(clazz, registration) - classId += 1 - } - - def getRegistration(clazz: Class[_]): Registration = { - classToRegistration.get(clazz) - } - - def getRegistration(clazzId: Int): Registration = { - idToRegistration.get(clazzId) - } -} - -object SerializerResolver { - class Registration(val id: Int, val clazz: Class[_], val serializer: TaskMessageSerializer[_]) -} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/StartTime.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/StartTime.scala b/streaming/src/main/scala/io/gearpump/streaming/task/StartTime.scala deleted file mode 100644 index 6bc8b15..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/task/StartTime.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.task - -import io.gearpump.TimeStamp - -/** Start time of streaming application. 
All message older than start time will be dropped */ -case class StartTime(startTime: TimeStamp = 0) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/StreamingTransportSerializer.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/StreamingTransportSerializer.scala b/streaming/src/main/scala/io/gearpump/streaming/task/StreamingTransportSerializer.scala deleted file mode 100644 index 17d0b1b..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/task/StreamingTransportSerializer.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.gearpump.streaming.task - -import java.io.{DataInput, DataOutput} - -import org.slf4j.Logger - -import io.gearpump.streaming.{AckRequestSerializer, AckSerializer, InitialAckRequestSerializer, LatencyProbeSerializer} -import io.gearpump.transport.netty.ITransportMessageSerializer -import io.gearpump.util.LogUtil - -class StreamingTransportSerializer extends ITransportMessageSerializer { - private val log: Logger = LogUtil.getLogger(getClass) - private val serializers = new SerializerResolver - - serializers.register(classOf[Ack], new AckSerializer) - serializers.register(classOf[AckRequest], new AckRequestSerializer) - serializers.register(classOf[InitialAckRequest], new InitialAckRequestSerializer) - serializers.register(classOf[LatencyProbe], new LatencyProbeSerializer) - serializers.register(classOf[SerializedMessage], new SerializedMessageSerializer) - - override def serialize(dataOutput: DataOutput, obj: Object): Unit = { - val registration = serializers.getRegistration(obj.getClass) - if (registration != null) { - dataOutput.writeInt(registration.id) - registration.serializer.asInstanceOf[TaskMessageSerializer[AnyRef]].write(dataOutput, obj) - } else { - log.error(s"Can not find serializer for class type ${obj.getClass}") - } - } - - override def deserialize(dataInput: DataInput, length: Int): Object = { - val classID = dataInput.readInt() - val registration = serializers.getRegistration(classID) - if (registration != null) { - registration.serializer.asInstanceOf[TaskMessageSerializer[AnyRef]].read(dataInput) - } else { - log.error(s"Can not find serializer for class id $classID") - null - } - } - - override def getLength(obj: Object): Int = { - val registration = serializers.getRegistration(obj.getClass) - if (registration != null) { - registration.serializer.asInstanceOf[TaskMessageSerializer[AnyRef]].getLength(obj) + 4 - } else { - log.error(s"Can not find serializer for class type ${obj.getClass}") - 0 - } - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/Subscriber.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/Subscriber.scala b/streaming/src/main/scala/io/gearpump/streaming/task/Subscriber.scala deleted file mode 100644 index d20074b..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/task/Subscriber.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.task - -import io.gearpump.partitioner.PartitionerDescription -import io.gearpump.streaming.{DAG, LifeTime} - -/** - * Each processor can have multiple downstream subscribers. - * - * For example: When processor A subscribe to processor B, then the output of B will be - * pushed to processor A. - * - * @param processorId subscriber processor Id - * @param partitionerDescription subscriber partitioner - */ -case class Subscriber(processorId: Int, partitionerDescription: PartitionerDescription, - parallelism: Int, lifeTime: LifeTime) - -object Subscriber { - - /** - * - * List subscriptions of a processor. 
- * The topology information is retrieved from dag - * - * @param processorId the processor to list - * @param dag the DAG - * @return the subscribers of this processor - */ - def of(processorId: Int, dag: DAG): List[Subscriber] = { - val edges = dag.graph.outgoingEdgesOf(processorId) - - edges.foldLeft(List.empty[Subscriber]) { (list, nodeEdgeNode) => - val (_, partitioner, downstreamProcessorId) = nodeEdgeNode - val downstreamProcessor = dag.processors(downstreamProcessorId) - list :+ Subscriber(downstreamProcessorId, partitioner, - downstreamProcessor.parallelism, downstreamProcessor.life) - } - } -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala deleted file mode 100644 index 155edf4..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala +++ /dev/null @@ -1,235 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.task - -import org.slf4j.Logger - -import io.gearpump.google.common.primitives.Shorts -import io.gearpump.partitioner.{MulticastPartitioner, Partitioner, UnicastPartitioner} -import io.gearpump.streaming.AppMasterToExecutor.MsgLostException -import io.gearpump.streaming.LifeTime -import io.gearpump.streaming.task.Subscription._ -import io.gearpump.util.LogUtil -import io.gearpump.{Message, TimeStamp} - -/** - * Manges the output and message clock for single downstream processor - * - * @param subscriber downstream processor - * @param maxPendingMessageCount trigger flow control. Should be bigger than - * maxPendingMessageCountPerAckRequest - * @param ackOnceEveryMessageCount send on AckRequest to the target - */ -class Subscription( - appId: Int, - executorId: Int, - taskId: TaskId, - subscriber: Subscriber, sessionId: Int, - transport: ExpressTransport, - maxPendingMessageCount: Int = MAX_PENDING_MESSAGE_COUNT, - ackOnceEveryMessageCount: Int = ONE_ACKREQUEST_EVERY_MESSAGE_COUNT) { - - assert(maxPendingMessageCount >= ackOnceEveryMessageCount) - assert(maxPendingMessageCount < Short.MaxValue / 2) - - private val LOG: Logger = LogUtil.getLogger(getClass, app = appId, - executor = executorId, task = taskId) - - import subscriber.{parallelism, partitionerDescription, processorId} - - // Don't worry if this store negative number. 
We will wrap the Short - private val messageCount: Array[Short] = new Array[Short](parallelism) - private val pendingMessageCount: Array[Short] = new Array[Short](parallelism) - private val candidateMinClockSince: Array[Short] = new Array[Short](parallelism) - - private val minClockValue: Array[TimeStamp] = Array.fill(parallelism)(Long.MaxValue) - private val candidateMinClock: Array[TimeStamp] = Array.fill(parallelism)(Long.MaxValue) - - private var maxPendingCount: Short = 0 - - private var life = subscriber.lifeTime - - private val partitioner = partitionerDescription.partitionerFactory.partitioner - private val sendFn = partitioner match { - case up: UnicastPartitioner => - (msg: Message) => { - val partition = up.getPartition(msg, parallelism, taskId.index) - sendMessage(msg, partition) - } - case mp: MulticastPartitioner => - (msg: Message) => { - val partitions = mp.getPartitions(msg, parallelism, taskId.index) - partitions.map(partition => sendMessage(msg, partition)).sum - } - } - - def changeLife(life: LifeTime): Unit = { - this.life = life - } - - def start(): Unit = { - val ackRequest = InitialAckRequest(taskId, sessionId) - transport.transport(ackRequest, allTasks: _*) - } - - def sendMessage(msg: Message): Int = { - sendFn(msg) - } - - /** - * Returns how many message is actually sent by this subscription - * - * @param msg the message to send - * @param partition the target partition to send message to - * @return 1 if success - */ - def sendMessage(msg: Message, partition: Int): Int = { - - var count = 0 - // Only sends message whose timestamp matches the lifeTime - if (partition != Partitioner.UNKNOWN_PARTITION_ID && life.contains(msg.timestamp)) { - - val targetTask = TaskId(processorId, partition) - transport.transport(msg, targetTask) - - this.minClockValue(partition) = Math.min(this.minClockValue(partition), msg.timestamp) - this.candidateMinClock(partition) = Math.min(this.candidateMinClock(partition), msg.timestamp) - - 
incrementMessageCount(partition, 1) - - if (messageCount(partition) % ackOnceEveryMessageCount == 0) { - sendAckRequest(partition) - } - - if (messageCount(partition) / maxPendingMessageCount != - (messageCount(partition) + ackOnceEveryMessageCount) / maxPendingMessageCount) { - sendLatencyProbe(partition) - } - count = 1 - count - } else { - if (needFlush) { - flush() - } - count = 0 - count - } - } - - private var lastFlushTime: Long = 0L - private val FLUSH_INTERVAL = 5 * 1000 // ms - private def needFlush: Boolean = { - System.currentTimeMillis() - lastFlushTime > FLUSH_INTERVAL && - Shorts.max(pendingMessageCount: _*) > 0 - } - - private def flush(): Unit = { - lastFlushTime = System.currentTimeMillis() - allTasks.foreach { targetTaskId => - sendAckRequest(targetTaskId.index) - } - } - - private def allTasks: scala.collection.Seq[TaskId] = { - (0 until parallelism).map { taskIndex => - TaskId(processorId, taskIndex) - } - } - - /** - * Handles acknowledge message. Throw MessageLossException if required. - * - * @param ack acknowledge message received - */ - def receiveAck(ack: Ack): Unit = { - - val index = ack.taskId.index - - if (ack.sessionId == sessionId) { - if (ack.actualReceivedNum == ack.seq) { - if ((ack.seq - candidateMinClockSince(index)).toShort >= 0) { - if (ack.seq == messageCount(index)) { - // All messages have been acked. - minClockValue(index) = Long.MaxValue - } else { - minClockValue(index) = candidateMinClock(index) - } - candidateMinClock(index) = Long.MaxValue - candidateMinClockSince(index) = messageCount(index) - } - - pendingMessageCount(ack.taskId.index) = (messageCount(ack.taskId.index) - ack.seq).toShort - updateMaxPendingCount() - } else { - LOG.error(s"Failed! 
received ack: $ack, received: ${ack.actualReceivedNum}, " + - s"sent: ${ack.seq}, try to replay...") - throw new MsgLostException - } - } - } - - def minClock: TimeStamp = { - minClockValue.min - } - - def allowSendingMoreMessages(): Boolean = { - maxPendingCount < maxPendingMessageCount - } - - def sendAckRequestOnStallingTime(stallingTime: TimeStamp): Unit = { - minClockValue.indices.foreach { i => - if (minClockValue(i) == stallingTime && pendingMessageCount(i) > 0 - && allowSendingMoreMessages) { - sendAckRequest(i) - sendLatencyProbe(i) - } - } - } - - private def sendAckRequest(partition: Int): Unit = { - // Increments more count for each AckRequest - // to throttle the number of unacked AckRequest - incrementMessageCount(partition, ackOnceEveryMessageCount) - val targetTask = TaskId(processorId, partition) - val ackRequest = AckRequest(taskId, messageCount(partition), sessionId) - transport.transport(ackRequest, targetTask) - } - - private def incrementMessageCount(partition: Int, count: Int): Unit = { - messageCount(partition) = (messageCount(partition) + count).toShort - pendingMessageCount(partition) = (pendingMessageCount(partition) + count).toShort - updateMaxPendingCount() - } - - private def updateMaxPendingCount(): Unit = { - maxPendingCount = Shorts.max(pendingMessageCount: _*) - } - - private def sendLatencyProbe(partition: Int): Unit = { - val probeLatency = LatencyProbe(System.currentTimeMillis()) - val targetTask = TaskId(processorId, partition) - transport.transport(probeLatency, targetTask) - } -} - -object Subscription { - // Makes sure it is smaller than MAX_PENDING_MESSAGE_COUNT - final val ONE_ACKREQUEST_EVERY_MESSAGE_COUNT = 100 - final val MAX_PENDING_MESSAGE_COUNT = 1000 -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/Task.scala ---------------------------------------------------------------------- diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/task/Task.scala b/streaming/src/main/scala/io/gearpump/streaming/task/Task.scala deleted file mode 100644 index 212a659..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/task/Task.scala +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.task - -import scala.concurrent.duration.FiniteDuration - -import akka.actor.Actor.Receive -import akka.actor.{ActorRef, ActorSystem, Cancellable, Props} -import org.slf4j.Logger - -import io.gearpump.cluster.UserConfig -import io.gearpump.util.LogUtil -import io.gearpump.{Message, TimeStamp} - -/** - * This provides context information for a task. - */ -trait TaskContext { - - def taskId: TaskId - - def executorId: Int - - def appId: Int - - def appName: String - - /** - * The actorRef of AppMaster - * @return application master's actor reference - */ - def appMaster: ActorRef - - /** - * The task parallelism - * - * For example, we can create 3 source tasks, and 3 sink tasks, - * the task parallelism is 3 for each. - * - * This can be useful when reading from partitioned data source. 
- * For example, for kafka, there may be 10 partitions, if we have - * parallelism of 2 for this task, then each task will be responsible - * to read data from 5 partitions. - * - * @return the parallelism level - */ - def parallelism: Int - - /** - * Please don't use this if possible. - * @return self actor ref - */ - // TODO: We should remove the self from TaskContext - def self: ActorRef - - /** - * Please don't use this if possible - * @return the actor system - */ - // TODO: we should remove this in future - def system: ActorSystem - - /** - * This can be used to output messages to downstream tasks. The data shuffling rule - * can be decided by Partitioner. - * - * @param msg message to output - */ - def output(msg: Message): Unit - - def actorOf(props: Props): ActorRef - - def actorOf(props: Props, name: String): ActorRef - - def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: => Unit): Cancellable - - /** - * akka.actor.ActorRefProvider.scheduleOnce - * - * @param initialDelay the initial delay - * @param f the function to execute after initial delay - * @return the executable - */ - def scheduleOnce(initialDelay: FiniteDuration)(f: => Unit): Cancellable - - /** - * For managed message(type of Message), the sender only serve as a unique Id, - * It's address is not something meaningful, you should not use this directly - * - * For unmanaged message, the sender represent the sender ActorRef - * @return sender - */ - def sender: ActorRef - - /** - * Retrieves upstream min clock from TaskActor - * - * @return the min clock - */ - def upstreamMinClock: TimeStamp - - /** - * Logger is environment dependant, it should be provided by - * containing environment. - */ - def logger: Logger -} - -/** - * Streaming Task interface - */ -trait TaskInterface { - - /** - * Method called with the task is initialized. 
- * @param startTime startTime that can be used to decide from when a source producer task should - * replay the data source, or from when a processor task should recover its - * checkpoint data in to in-memory state. - */ - def onStart(startTime: StartTime): Unit - - /** - * Method called for each message received. - * - * @param msg Message send by upstream tasks - */ - def onNext(msg: Message): Unit - - /** - * Method called when task is under clean up. - * - * This can be used to cleanup resource when the application finished. - */ - def onStop(): Unit - - /** - * Handlers unmanaged messages - * - * @return the handler - */ - def receiveUnManagedMessage: Receive = null -} - -abstract class Task(taskContext: TaskContext, userConf: UserConfig) extends TaskInterface { - - import taskContext.{appId, executorId, taskId} - - val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId, task = taskId) - - protected implicit val system = taskContext.system - - implicit val self = taskContext.self - - /** - * For managed message(type of Message), the sender mean nothing, - * you should not use this directory - * - * For unmanaged message, the sender represent the sender actor - * @return the sender - */ - protected def sender: ActorRef = taskContext.sender - - def onStart(startTime: StartTime): Unit = {} - - def onNext(msg: Message): Unit = {} - - def onStop(): Unit = {} - - override def receiveUnManagedMessage: Receive = { - case msg => - LOG.error("Failed! 
Received unknown message " + "taskId: " + taskId + ", " + msg.toString) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala deleted file mode 100644 index b9b8829..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala +++ /dev/null @@ -1,397 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.task - -import java.util -import java.util.concurrent.TimeUnit - -import akka.actor._ -import org.slf4j.Logger - -import io.gearpump.cluster.UserConfig -import io.gearpump.gs.collections.impl.map.mutable.primitive.IntShortHashMap -import io.gearpump.metrics.Metrics -import io.gearpump.serializer.SerializationFramework -import io.gearpump.streaming.AppMasterToExecutor._ -import io.gearpump.streaming.ExecutorToAppMaster._ -import io.gearpump.streaming.{Constants, ProcessorId} -import io.gearpump.util.{LogUtil, TimeOutScheduler} -import io.gearpump.{Message, TimeStamp} - -/** - * - * All tasks of Gearpump runs inside a Actor. TaskActor is the Actor container for a task. - */ -class TaskActor( - val taskId: TaskId, - val taskContextData: TaskContextData, - userConf: UserConfig, - val task: TaskWrapper, - inputSerializerPool: SerializationFramework) - extends Actor with ExpressTransport with TimeOutScheduler { - var upstreamMinClock: TimeStamp = 0L - private var _minClock: TimeStamp = 0L - - def serializerPool: SerializationFramework = inputSerializerPool - - import taskContextData._ - - import io.gearpump.streaming.Constants._ - import io.gearpump.streaming.task.TaskActor._ - val config = context.system.settings.config - - val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId, task = taskId) - - // Metrics - private val metricName = s"app${appId}.processor${taskId.processorId}.task${taskId.index}" - private val receiveLatency = Metrics(context.system).histogram( - s"$metricName:receiveLatency", sampleRate = 1) - private val processTime = Metrics(context.system).histogram(s"$metricName:processTime") - private val sendThroughput = Metrics(context.system).meter(s"$metricName:sendThroughput") - private val receiveThroughput = Metrics(context.system).meter(s"$metricName:receiveThroughput") - - private val maxPendingMessageCount = config.getInt(GEARPUMP_STREAMING_MAX_PENDING_MESSAGE_COUNT) - private val 
ackOnceEveryMessageCount = config.getInt( - GEARPUMP_STREAMING_ACK_ONCE_EVERY_MESSAGE_COUNT) - - private val executor = context.parent - private var life = taskContextData.life - - // Latency probe - import scala.concurrent.duration._ - - import context.dispatcher - final val LATENCY_PROBE_INTERVAL = FiniteDuration(1, TimeUnit.SECONDS) - - // Clock report interval - final val CLOCK_REPORT_INTERVAL = FiniteDuration(1, TimeUnit.SECONDS) - - // Flush interval - final val FLUSH_INTERVAL = FiniteDuration(100, TimeUnit.MILLISECONDS) - - private val queue = new util.LinkedList[AnyRef]() - - private var subscriptions = List.empty[(Int, Subscription)] - - // SecurityChecker will be responsible of dropping messages from - // unknown sources - private val securityChecker = new SecurityChecker(taskId, self) - private[task] var sessionId = NONE_SESSION - - // Reports to appMaster with my address - express.registerLocalActor(TaskId.toLong(taskId), self) - - final def receive: Receive = null - - task.setTaskActor(this) - - def onStart(startTime: StartTime): Unit = { - task.onStart(startTime) - } - - def onNext(msg: Message): Unit = task.onNext(msg) - - def onUnManagedMessage(msg: Any): Unit = task.receiveUnManagedMessage.apply(msg) - - def onStop(): Unit = task.onStop() - - /** - * output to a downstream by specifying a arrayIndex - * @param arrayIndex this is not same as ProcessorId - */ - def output(arrayIndex: Int, msg: Message): Unit = { - var count = 0 - count += this.subscriptions(arrayIndex)._2.sendMessage(msg) - sendThroughput.mark(count) - } - - def output(msg: Message): Unit = { - var count = 0 - this.subscriptions.foreach { subscription => - count += subscription._2.sendMessage(msg) - } - sendThroughput.mark(count) - } - - final override def postStop(): Unit = { - onStop() - } - - final override def preStart(): Unit = { - val register = RegisterTask(taskId, executorId, local) - LOG.info(s"$register") - executor ! 
register - context.become(waitForTaskRegistered) - } - - private def allowSendingMoreMessages(): Boolean = { - subscriptions.forall(_._2.allowSendingMoreMessages()) - } - - private def doHandleMessage(): Unit = { - var done = false - - var count = 0 - val start = System.currentTimeMillis() - - while (allowSendingMoreMessages() && !done) { - val msg = queue.poll() - if (msg != null) { - msg match { - case SendAck(ack, targetTask) => - transport(ack, targetTask) - case m: Message => - count += 1 - onNext(m) - case other => - // un-managed message - onUnManagedMessage(other) - } - } else { - done = true - } - } - - receiveThroughput.mark(count) - if (count > 0) { - processTime.update((System.currentTimeMillis() - start) / count) - } - } - - private def onStartClock(): Unit = { - LOG.info(s"received start, clock: $upstreamMinClock, sessionId: $sessionId") - subscriptions = subscribers.map { subscriber => - (subscriber.processorId, - new Subscription(appId, executorId, taskId, subscriber, sessionId, this, - maxPendingMessageCount, ackOnceEveryMessageCount)) - }.sortBy(_._1) - - subscriptions.foreach(_._2.start()) - - import scala.collection.JavaConverters._ - stashQueue.asScala.foreach { item => - handleMessages(item.sender).apply(item.msg) - } - stashQueue.clear() - - // Put this as the last step so that the subscription is already initialized. - // Message sending in current Task before onStart will not be delivered to - // target - onStart(new StartTime(upstreamMinClock)) - - appMaster ! 
GetUpstreamMinClock(taskId) - context.become(handleMessages(sender)) - } - - def waitForTaskRegistered: Receive = { - case start@TaskRegistered(_, sessionId, startClock) => - this.sessionId = sessionId - this.upstreamMinClock = startClock - context.become(waitForStartClock) - } - - private val stashQueue = new util.LinkedList[MessageAndSender]() - - def waitForStartClock: Receive = { - case start: StartTask => - onStartClock() - case other: AnyRef => - stashQueue.add(MessageAndSender(other, sender())) - } - - def handleMessages(sender: => ActorRef): Receive = { - case ackRequest: InitialAckRequest => - val ackResponse = securityChecker.handleInitialAckRequest(ackRequest) - if (null != ackResponse) { - queue.add(SendAck(ackResponse, ackRequest.taskId)) - doHandleMessage() - } - case ackRequest: AckRequest => - // Enqueue to handle the ackRequest and send back ack later - val ackResponse = securityChecker.generateAckResponse(ackRequest, sender, - ackOnceEveryMessageCount) - if (null != ackResponse) { - queue.add(SendAck(ackResponse, ackRequest.taskId)) - doHandleMessage() - } - case ack: Ack => - subscriptions.find(_._1 == ack.taskId.processorId).foreach(_._2.receiveAck(ack)) - doHandleMessage() - case inputMessage: SerializedMessage => - val message = Message(serializerPool.get().deserialize(inputMessage.bytes), - inputMessage.timeStamp) - receiveMessage(message, sender) - case inputMessage: Message => - receiveMessage(inputMessage, sender) - case upstream@UpstreamMinClock(upstreamClock) => - this.upstreamMinClock = upstreamClock - - val subMinClock = subscriptions.foldLeft(Long.MaxValue) { (min, sub) => - val subMin = sub._2.minClock - // A subscription is holding back the _minClock; - // we send AckRequest to its tasks to push _minClock forward - if (subMin == _minClock) { - sub._2.sendAckRequestOnStallingTime(_minClock) - } - Math.min(min, subMin) - } - - _minClock = Math.max(life.birth, Math.min(upstreamMinClock, subMinClock)) - - val update = 
UpdateClock(taskId, _minClock) - context.system.scheduler.scheduleOnce(CLOCK_REPORT_INTERVAL) { - appMaster ! update - } - - // Checks whether current task is dead. - if (_minClock > life.death) { - // There will be no more message received... - val unRegister = UnRegisterTask(taskId, executorId) - executor ! unRegister - - LOG.info(s"Sending $unRegister, current minclock: ${_minClock}, life: $life") - } - - case ChangeTask(_, dagVersion, life, subscribers) => - this.life = life - subscribers.foreach { subscriber => - val processorId = subscriber.processorId - val subscription = getSubscription(processorId) - subscription match { - case Some(subscription) => - subscription.changeLife(subscriber.lifeTime cross this.life) - case None => - val subscription = new Subscription(appId, executorId, taskId, subscriber, - sessionId, this, maxPendingMessageCount, ackOnceEveryMessageCount) - subscription.start() - subscriptions :+=(subscriber.processorId, subscription) - // Sorting, keep the order - subscriptions = subscriptions.sortBy(_._1) - } - } - sender ! 
TaskChanged(taskId, dagVersion) - case LatencyProbe(timeStamp) => - receiveLatency.update(System.currentTimeMillis() - timeStamp) - case send: SendMessageLoss => - LOG.info("received SendMessageLoss") - throw new MsgLostException - case other: AnyRef => - queue.add(other) - doHandleMessage() - } - - /** - * Returns min clock of this task - */ - def minClock: TimeStamp = _minClock - - /** - * Returns min clock of upstream task - */ - def getUpstreamMinClock: TimeStamp = upstreamMinClock - - private def receiveMessage(msg: Message, sender: ActorRef): Unit = { - val messageAfterCheck = securityChecker.checkMessage(msg, sender) - messageAfterCheck match { - case Some(msg) => - queue.add(msg) - doHandleMessage() - case None => - // TODO: Indicate the error and avoid the LOG flood - // LOG.error(s"Task $taskId drop message $msg") - } - } - - private def getSubscription(processorId: ProcessorId): Option[Subscription] = { - subscriptions.find(_._1 == processorId).map(_._2) - } -} - -object TaskActor { - // 3 seconds - val CLOCK_SYNC_TIMEOUT_INTERVAL = 3 * 1000 - - // If the message comes from an unknown source, securityChecker will drop it - class SecurityChecker(task_id: TaskId, self: ActorRef) { - - private val LOG: Logger = LogUtil.getLogger(getClass, task = task_id) - - // Uses mutable HashMap for performance optimization - private val receivedMsgCount = new IntShortHashMap() - - // Tricky performance optimization to save memory. - // We store the session Id in the uid of ActorPath - // ActorPath.hashCode is same as uid. - private def getSessionId(actor: ActorRef): Int = { - // TODO: As method uid is protected in [akka] package. We - // are using hashCode instead of uid. 
- actor.hashCode() - } - - def handleInitialAckRequest(ackRequest: InitialAckRequest): Ack = { - LOG.debug(s"Handle InitialAckRequest for session $ackRequest") - val sessionId = ackRequest.sessionId - if (sessionId == NONE_SESSION) { - LOG.error(s"SessionId is not initialized, ackRequest: $ackRequest") - null - } else { - receivedMsgCount.put(sessionId, 0) - Ack(task_id, 0, 0, sessionId) - } - } - - def generateAckResponse(ackRequest: AckRequest, sender: ActorRef, incrementCount: Int): Ack = { - val sessionId = ackRequest.sessionId - if (receivedMsgCount.containsKey(sessionId)) { - // Increments more count for each AckRequest - // to throttle the number of unacked AckRequest - receivedMsgCount.put(sessionId, (receivedMsgCount.get(sessionId) + incrementCount).toShort) - Ack(task_id, ackRequest.seq, receivedMsgCount.get(sessionId), ackRequest.sessionId) - } else { - LOG.error(s"get unknown AckRequest $ackRequest from ${sender.toString()}") - null - } - } - - // If the message comes from an unknown source, then drop it - def checkMessage(message: Message, sender: ActorRef): Option[Message] = { - if (sender.equals(self)) { - Some(message) - } else { - val sessionId = getSessionId(sender) - if (receivedMsgCount.containsKey(sessionId)) { - receivedMsgCount.put(sessionId, (receivedMsgCount.get(sessionId) + 1).toShort) - Some(message) - } else { - // This is an illegal message, - LOG.debug(s"received message before receive the first AckRequest, session $sessionId") - None - } - } - } - } - - case class SendAck(ack: Ack, targetTask: TaskId) - - case object FLUSH - - val NONE_SESSION = -1 - - case class MessageAndSender(msg: AnyRef, sender: ActorRef) -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/TaskContextData.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskContextData.scala 
b/streaming/src/main/scala/io/gearpump/streaming/task/TaskContextData.scala deleted file mode 100644 index 28605cf..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskContextData.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.task - -import akka.actor.ActorRef - -import io.gearpump.streaming.LifeTime - -case class TaskContextData( - executorId: Int, - appId: Int, - appName: String, - appMaster: ActorRef, - parallelism: Int, - life: LifeTime, - subscribers: List[Subscriber])
