package org.apache.gearpump.streaming.state.api

import scala.util.Try

/**
 * Converts state of type T to and from bytes so it can be checkpointed
 * to a persistent store.
 *
 * Extends java.io.Serializable so a Serializer instance can itself be
 * shipped across the cluster along with the task configuration.
 */
trait Serializer[T] extends java.io.Serializable {
  /** Serializes a value to a byte array. */
  def serialize(t: T): Array[Byte]

  /**
   * Deserializes bytes back into a value.
   * Returns a Failure instead of throwing when the bytes are corrupt
   * or were written by an incompatible serializer.
   */
  def deserialize(bytes: Array[Byte]): Try[T]
}
package org.apache.gearpump.streaming.state.impl

import org.apache.gearpump.TimeStamp
import org.apache.gearpump.streaming.transaction.api.CheckpointStore

/** Manage physical checkpoints to persistent storage like HDFS */
class CheckpointManager(checkpointInterval: Long,
    checkpointStore: CheckpointStore) {

  // Largest message timestamp observed so far; drives when the next
  // checkpoint is due.
  private var maxMessageTime: Long = 0L
  // Next planned checkpoint time. None before the first message, and None
  // again after a checkpoint when no message newer than it has been seen.
  private var checkpointTime: Option[Long] = None

  /** Loads the checkpoint persisted at exactly `timestamp`, if any. */
  def recover(timestamp: TimeStamp): Option[Array[Byte]] = {
    checkpointStore.recover(timestamp)
  }

  /**
   * Persists `checkpoint` at `timestamp` and advances the planned checkpoint
   * time to the next interval boundary strictly after maxMessageTime.
   *
   * Note the `collect`: if maxMessageTime has not passed the old checkpoint
   * time, the result is None — the planned time is cleared until update()
   * sees a new message and re-arms it.
   *
   * @return the new planned checkpoint time, or None (see above)
   */
  def checkpoint(timestamp: TimeStamp, checkpoint: Array[Byte]): Option[TimeStamp] = {
    checkpointStore.persist(timestamp, checkpoint)
    checkpointTime = checkpointTime.collect { case time if maxMessageTime > time =>
      // smallest multiple-of-interval offset from `time` that exceeds maxMessageTime
      time + (1 + (maxMessageTime - time) / checkpointInterval) * checkpointInterval
    }

    checkpointTime
  }

  /**
   * Records a message timestamp. If no checkpoint is currently planned,
   * schedules one at the first interval boundary after `messageTime`.
   *
   * @return the currently planned checkpoint time
   */
  def update(messageTime: TimeStamp): Option[TimeStamp] = {
    maxMessageTime = Math.max(maxMessageTime, messageTime)
    if (checkpointTime.isEmpty) {
      checkpointTime = Some((1 + messageTime / checkpointInterval) * checkpointInterval)
    }

    checkpointTime
  }

  /** True when the upstream min clock has reached the planned checkpoint time. */
  def shouldCheckpoint(upstreamMinClock: TimeStamp): Boolean = {
    checkpointTime.exists(time => upstreamMinClock >= time)
  }

  /** The currently planned checkpoint time, if any. */
  def getCheckpointTime: Option[TimeStamp] = checkpointTime

  /** Closes the underlying checkpoint store. */
  def close(): Unit = {
    checkpointStore.close()
  }

  // exposed for tests in this package
  private[impl] def getMaxMessageTime: TimeStamp = maxMessageTime
}
package org.apache.gearpump.streaming.state.impl

import org.apache.gearpump.TimeStamp
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.TaskContext
import org.apache.gearpump.streaming.transaction.api.{CheckpointStore, CheckpointStoreFactory}

/**
 * A checkpoint store backed by an in-memory immutable Map.
 * Provided for tests only; offers no durability and should not be
 * used in real applications.
 */
class InMemoryCheckpointStore extends CheckpointStore {
  // timestamp -> serialized checkpoint bytes
  private var store = Map.empty[TimeStamp, Array[Byte]]

  /** Records `checkpoint` under `timestamp`, replacing any previous entry. */
  override def persist(timestamp: TimeStamp, checkpoint: Array[Byte]): Unit = {
    store = store + (timestamp -> checkpoint)
  }

  /** Returns the checkpoint stored at exactly `timestamp`, if present. */
  override def recover(timestamp: TimeStamp): Option[Array[Byte]] = {
    store.get(timestamp)
  }

  /** Discards every stored checkpoint. */
  override def close(): Unit = {
    store = Map.empty[TimeStamp, Array[Byte]]
  }
}

/** Creates a fresh in-memory store per call; for tests only. */
class InMemoryCheckpointStoreFactory extends CheckpointStoreFactory {
  override def getCheckpointStore(conf: UserConfig, taskContext: TaskContext): CheckpointStore = {
    new InMemoryCheckpointStore
  }
}
package org.apache.gearpump.streaming.state.impl

import org.slf4j.Logger

import org.apache.gearpump.TimeStamp
import org.apache.gearpump.streaming.state.api.{Monoid, MonoidState, Serializer}
import org.apache.gearpump.streaming.state.impl.NonWindowState._
import org.apache.gearpump.util.LogUtil

object NonWindowState {
  val LOG: Logger = LogUtil.getLogger(classOf[NonWindowState[_]])
}

/**
 * a MonoidState storing non-window state
 *
 * NOTE(review): `left`, `right`, `checkpointTime` and `updateState` are
 * inherited from MonoidState (not visible here); presumably `left` holds the
 * state up to the checkpoint time and `right` the updates after it — confirm
 * against MonoidState.
 */
class NonWindowState[T](monoid: Monoid[T], serializer: Serializer[T])
  extends MonoidState[T](monoid) {

  // Restores `left` from checkpointed bytes. A deserialization Failure is
  // silently ignored (Try.foreach is a no-op on Failure), leaving `left` as-is.
  override def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit = {
    serializer.deserialize(bytes).foreach(left = _)
  }

  /** Folds a new value into the state via the inherited updateState. */
  override def update(timestamp: TimeStamp, t: T): Unit = {
    updateState(timestamp, t)
  }

  // Serializes `left` first, then merges `right` into `left` and resets
  // `right` to the monoid identity — order matters: the returned bytes must
  // not include post-checkpoint updates.
  override def checkpoint(): Array[Byte] = {
    val serialized = serializer.serialize(left)
    LOG.debug(s"checkpoint time: $checkpointTime; checkpoint value: ($checkpointTime, $left)")
    left = monoid.plus(left, right)
    right = monoid.zero
    serialized
  }
}
package org.apache.gearpump.streaming.state.impl

/** Configuration keys for persistent-state handling. */
object PersistentStateConfig {

  // whether state checkpointing is enabled
  val STATE_CHECKPOINT_ENABLE = "state.checkpoint.enable"
  // interval between checkpoints, in milliseconds
  val STATE_CHECKPOINT_INTERVAL_MS = "state.checkpoint.interval.ms"
  // factory used to create the checkpoint store
  val STATE_CHECKPOINT_STORE_FACTORY = "state.checkpoint.store.factory"
  // window size and step for windowed state (see WindowConfig)
  val STATE_WINDOW_SIZE = "state.window.size"
  val STATE_WINDOW_STEP = "state.window.step"
}
package org.apache.gearpump.streaming.state.impl

import org.apache.gearpump.TimeStamp

/**
 * Used in window applications.
 * Tracks the current window [start, start + size) and slides it forward by
 * `windowStep` once the observed clock passes the window's end.
 */
class Window(val windowSize: Long, val windowStep: Long) {

  /** Convenience constructor from a WindowConfig. */
  def this(windowConfig: WindowConfig) = {
    this(windowConfig.windowSize, windowConfig.windowStep)
  }

  // most recently observed clock value
  private var currentClock: TimeStamp = 0L
  // inclusive start of the current window
  private var windowStart = 0L

  /** Records the latest clock value; decides when shouldSlide turns true. */
  def update(clock: TimeStamp): Unit = {
    currentClock = clock
  }

  /** Advances the window start by one step. */
  def slideOneStep(): Unit = {
    windowStart = windowStart + windowStep
  }

  /** Jumps the window start to the step boundary at or before `timestamp`. */
  def slideTo(timestamp: TimeStamp): Unit = {
    windowStart = (timestamp / windowStep) * windowStep
  }

  /** True once the clock has reached or passed the end of the current window. */
  def shouldSlide: Boolean = {
    currentClock >= windowStart + windowSize
  }

  /** The current window as (startTime, endTime), end exclusive. */
  def range: (TimeStamp, TimeStamp) = {
    (windowStart, windowStart + windowSize)
  }
}
package org.apache.gearpump.streaming.state.impl

object WindowConfig {
  // key under which a WindowConfig is carried (presumably in UserConfig —
  // confirm against callers)
  val NAME = "window_config"
}

/**
 * Window parameters for windowed state.
 *
 * @param windowSize length of a window in time units
 * @param windowStep amount the window slides each step; a step smaller than
 *                   the size yields overlapping windows
 */
case class WindowConfig(windowSize: Long, windowStep: Long)
package org.apache.gearpump.streaming.state.impl

import scala.collection.immutable.TreeMap

import org.slf4j.Logger

import org.apache.gearpump.TimeStamp
import org.apache.gearpump.streaming.state.api.{Group, MonoidState, Serializer}
import org.apache.gearpump.streaming.state.impl.WindowState._
import org.apache.gearpump.streaming.task.TaskContext
import org.apache.gearpump.util.LogUtil

/**
 * an interval is a dynamic time range that is divided by window boundary and checkpoint time
 *
 * NOTE(review): compare() orders by startTime only, while case-class equality
 * also considers endTime, so compare == 0 does not imply equals. This is safe
 * only if intervals stored in the same TreeMap never share a startTime —
 * confirm that getInterval guarantees this invariant.
 */
case class Interval(startTime: TimeStamp, endTime: TimeStamp) extends Ordered[Interval] {
  override def compare(that: Interval): Int = {
    if (startTime < that.startTime) -1
    else if (startTime > that.startTime) 1
    else 0
  }
}

object WindowState {
  val LOG: Logger = LogUtil.getLogger(classOf[WindowState[_]])
}

/**
 * this is a list of states, each of which is bounded by a time window
 * state of each window doesn't affect each other
 *
 * WindowState requires an Algebird Group to be passed in
 * Group augments Monoid with a minus function which makes it
 * possible to undo the update by messages that have left the window
 *
 * NOTE(review): `left`, `right`, `checkpointTime` and `updateState` are
 * inherited from MonoidState (not visible here).
 */
class WindowState[T](group: Group[T],
    serializer: Serializer[TreeMap[Interval, T]],
    taskContext: TaskContext,
    window: Window) extends MonoidState[T](group) {
  /**
   * each interval has a state updated by message with timestamp in
   * [interval.startTime, interval.endTime)
   */
  private var intervalStates = TreeMap.empty[Interval, T]

  // time of the last successful checkpoint; lower bound for new intervals
  private var lastCheckpointTime = 0L

  // Re-aligns the window to `timestamp` and rebuilds both the interval map
  // and the aggregate `left` from the checkpointed interval states.
  // A deserialization Failure is silently ignored.
  override def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit = {
    window.slideTo(timestamp)
    serializer.deserialize(bytes)
      .foreach { states =>
        intervalStates = states
        left = states.foldLeft(left) { case (accum, iter) =>
          group.plus(accum, iter._2)
        }
      }
  }

  // Order of operations matters throughout this method: the pre-slide window
  // range is captured first, then the window may slide, and the aggregate is
  // patched incrementally using group.minus/plus on the affected intervals.
  override def update(timestamp: TimeStamp, t: T): Unit = {
    val (startTime, endTime) = window.range
    // only messages inside the current window contribute to the live aggregate
    if (timestamp >= startTime && timestamp < endTime) {
      updateState(timestamp, t)
    }

    updateIntervalStates(timestamp, t, checkpointTime)

    val upstreamMinClock = taskContext.upstreamMinClock
    window.update(upstreamMinClock)

    if (window.shouldSlide) {
      window.slideOneStep()

      val (newStartTime, newEndTime) = window.range
      // undo contributions of intervals that slid out of the window
      getIntervalStates(startTime, newStartTime).foreach { case (_, st) =>
        left = group.minus(left, st)
      }
      if (checkpointTime > endTime) {
        // already-checkpointed intervals entering the window go into `left`
        getIntervalStates(endTime, checkpointTime).foreach { case (_, st) =>
          left = group.plus(left, st)
        }
      } else {
        // not-yet-checkpointed intervals entering the window go into `right`
        getIntervalStates(endTime, newEndTime).foreach { case (_, st) =>
          right = group.plus(right, st)
        }
      }
    }
  }

  // Merges `right` into `left`, then serializes the interval states covering
  // [window start, checkpointTime) as the checkpoint payload.
  override def checkpoint(): Array[Byte] = {
    left = group.plus(left, right)
    right = group.zero

    val states = getIntervalStates(window.range._1, checkpointTime)
    lastCheckpointTime = checkpointTime
    LOG.debug(s"checkpoint time: $checkpointTime; checkpoint value: ($checkpointTime, $states)")
    serializer.serialize(states)
  }

  /**
   * Each message will update state in corresponding Interval[startTime, endTime),
   *
   * which is decided by the message's timestamp t where
   * {{{
   * startTime = Math.max(lowerBound1, lowerBound2, checkpointTime)
   * endTime = Math.min(upperBound1, upperBound2, checkpointTime)
   * lowerBound1 = step * Nmax1 <= t
   * lowerBound2 = step * Nmax2 + size <= t
   * upperBound1 = step * Nmin1 > t
   * upperBound2 = step * Nmin2 + size > t
   * }}}
   */
  private[impl] def getInterval(timestamp: TimeStamp, checkpointTime: TimeStamp): Interval = {
    val windowSize = window.windowSize
    val windowStep = window.windowStep
    // tightest window-start boundary at or below timestamp
    val lowerBound1 = timestamp / windowStep * windowStep
    // tightest window-end boundary at or below timestamp
    val lowerBound2 =
      if (timestamp < windowSize) 0L
      else (timestamp - windowSize) / windowStep * windowStep + windowSize
    // tightest window-start boundary above timestamp
    val upperBound1 = (timestamp / windowStep + 1) * windowStep
    // tightest window-end boundary above timestamp
    val upperBound2 =
      if (timestamp < windowSize) windowSize
      else ((timestamp - windowSize) / windowStep + 1) * windowStep + windowSize
    val lowerBound = Math.max(lowerBound1, lowerBound2)
    val upperBound = Math.min(upperBound1, upperBound2)
    // additionally clip the interval at the checkpoint boundary so no interval
    // straddles a checkpoint
    if (checkpointTime > timestamp) {
      Interval(Math.max(lowerBound, lastCheckpointTime), Math.min(upperBound, checkpointTime))
    } else {
      Interval(Math.max(lowerBound, checkpointTime), upperBound)
    }
  }

  // Adds `t` to the state of the interval containing `timestamp`,
  // creating the interval from the group identity if absent.
  private[impl] def updateIntervalStates(timestamp: TimeStamp, t: T, checkpointTime: TimeStamp)
    : Unit = {
    val interval = getInterval(timestamp, checkpointTime)
    intervalStates.get(interval) match {
      case Some(st) =>
        intervalStates += interval -> group.plus(st, t)
      case None =>
        intervalStates += interval -> group.plus(group.zero, t)
    }
  }

  // Returns the interval states whose endTime lies in (startTime, endTime];
  // relies on TreeMap's sorted iteration order for dropWhile/takeWhile.
  private[impl] def getIntervalStates(startTime: TimeStamp, endTime: TimeStamp)
    : TreeMap[Interval, T] = {
    intervalStates.dropWhile(_._1.endTime <= startTime).takeWhile(_._1.endTime <= endTime)
  }
}
package org.apache.gearpump.streaming.storage

import scala.concurrent._

/**
 * Generic storage to store KV Data.
 */
trait AppDataStore {
  /**
   * Stores `value` under `key`.
   * The returned Future's completion semantics depend on the implementation.
   */
  def put(key: String, value: Any): Future[Any]

  /** Retrieves the value previously stored under `key`. */
  def get(key: String): Future[Any]
}
package org.apache.gearpump.streaming.storage

import scala.concurrent.Future

import akka.actor.ActorRef
import akka.pattern.ask

import org.apache.gearpump.cluster.AppMasterToMaster.{GetAppData, GetAppDataResult, SaveAppData}
import org.apache.gearpump.util.Constants

/**
 * In memory application storage located on master nodes.
 * Both operations are implemented as asks to the master actor.
 *
 * NOTE(review): uses the global ExecutionContext; consider taking an
 * implicit ExecutionContext from the caller instead.
 */
class InMemoryAppStoreOnMaster(appId: Int, master: ActorRef) extends AppDataStore {
  implicit val timeout = Constants.FUTURE_TIMEOUT
  import scala.concurrent.ExecutionContext.Implicits.global

  /** Sends SaveAppData to the master; completes with the master's reply. */
  override def put(key: String, value: Any): Future[Any] = {
    master.ask(SaveAppData(appId, key, value))
  }

  /**
   * Sends GetAppData to the master and unwraps the value from the reply.
   *
   * NOTE(review): yields null when the reply's key does not match the
   * requested key; callers must handle null. An Option or failed Future
   * would be more idiomatic, but changing it would break the contract.
   */
  override def get(key: String): Future[Any] = {
    master.ask(GetAppData(appId, key)).asInstanceOf[Future[GetAppDataResult]].map { result =>
      if (result.key.equals(key)) {
        result.value
      } else {
        null
      }
    }
  }
}
package org.apache.gearpump.streaming.task

import akka.actor.{ActorRef, ExtendedActorSystem}

import org.apache.gearpump.Message
import org.apache.gearpump.transport.netty.TaskMessage
import org.apache.gearpump.transport.{Express, HostPort}
import org.apache.gearpump.util.AkkaHelper

/**
 * ExpressTransport wire the networking function from default akka
 * networking to customized implementation [[org.apache.gearpump.transport.Express]].
 *
 * See [[org.apache.gearpump.transport.Express]] for more information.
 */
trait ExpressTransport {
  // mixed into TaskActor; sessionId, taskId, serializerPool and LOG come from there
  this: TaskActor =>

  final val express = Express(context.system)
  implicit val system = context.system.asInstanceOf[ExtendedActorSystem]

  /** Host/port this task's express transport listens on. */
  final def local: HostPort = express.localHost
  // numeric id used as the source field of outgoing TaskMessages
  lazy val sourceId = TaskId.toLong(taskId)

  // actor used as the sender for locally delivered messages
  lazy val sessionRef: ActorRef = {
    AkkaHelper.actorFor(system, s"/session#$sessionId")
  }

  /**
   * Sends `msg` to each remote task, preferring in-process delivery.
   * The message payload is serialized lazily, at most once, and reused for
   * every remote that is not local — hence the mutable `serializedMessage`.
   */
  def transport(msg: AnyRef, remotes: TaskId*): Unit = {
    var serializedMessage: AnyRef = null

    remotes.foreach { remote =>
      val transportId = TaskId.toLong(remote)
      val localActor = express.lookupLocalActor(transportId)
      if (localActor.isDefined) {
        // same-JVM target: deliver directly, no serialization needed
        localActor.get.tell(msg, sessionRef)
      } else {
        if (null == serializedMessage) {
          msg match {
            case message: Message =>
              val bytes = serializerPool.get().serialize(message.msg)
              serializedMessage = SerializedMessage(message.timestamp, bytes)
            case _ => serializedMessage = msg
          }
        }
        val taskMessage = new TaskMessage(sessionId, transportId, sourceId, serializedMessage)

        val remoteAddress = express.lookupRemoteAddress(transportId)
        if (remoteAddress.isDefined) {
          express.transport(taskMessage, remoteAddress.get)
        } else {
          // target unknown — likely mid-recovery; drop the message and log
          LOG.error(
            s"Can not find target task $remote, maybe the application is undergoing recovery")
        }
      }
    }
  }
}
package org.apache.gearpump.streaming.task

import java.io.{DataInput, DataOutput}

import org.apache.gearpump.TimeStamp

/** A message whose payload has already been serialized to bytes. */
case class SerializedMessage(timeStamp: TimeStamp, bytes: Array[Byte])

/**
 * Wire format: 8-byte timestamp, 4-byte payload length, then the payload.
 * write() and read() must stay in lockstep — changing one side breaks
 * compatibility with peers running the other.
 */
class SerializedMessageSerializer extends TaskMessageSerializer[SerializedMessage] {
  // 12 = 8 (long timestamp) + 4 (int length prefix)
  override def getLength(obj: SerializedMessage): Int = 12 + obj.bytes.length

  override def write(dataOutput: DataOutput, obj: SerializedMessage): Unit = {
    dataOutput.writeLong(obj.timeStamp)
    dataOutput.writeInt(obj.bytes.length)
    dataOutput.write(obj.bytes)
  }

  override def read(dataInput: DataInput): SerializedMessage = {
    val timestamp = dataInput.readLong()
    val length = dataInput.readInt()
    val bytes = new Array[Byte](length)
    dataInput.readFully(bytes)
    SerializedMessage(timestamp, bytes)
  }
}
package org.apache.gearpump.streaming.task

import io.gearpump.esotericsoftware.kryo.util.{IntMap, ObjectMap}
import org.apache.gearpump.streaming.task.SerializerResolver.Registration

/**
 * Maps classes to task-message serializers and to compact integer ids
 * usable on the wire.
 *
 * Ids are assigned in registration order, so peers must register the same
 * classes in the same order to interoperate — presumably guaranteed by the
 * caller; confirm at the registration site.
 * NOTE(review): not thread-safe (`classId` is an unsynchronized var).
 */
private[task] class SerializerResolver {
  // next id to assign; incremented on every register()
  private var classId = 0
  private val idToRegistration = new IntMap[Registration]()
  private val classToRegistration = new ObjectMap[Class[_], Registration]()

  /** Registers `serializer` for `clazz` and assigns it the next integer id. */
  def register[T](clazz: Class[T], serializer: TaskMessageSerializer[T]): Unit = {
    val registration = new Registration(classId, clazz, serializer)
    idToRegistration.put(classId, registration)
    classToRegistration.put(clazz, registration)
    classId += 1
  }

  /** Looks up the registration for a class; null if the class was never registered. */
  def getRegistration(clazz: Class[_]): Registration = {
    classToRegistration.get(clazz)
  }

  /** Looks up the registration for an id; null if the id was never assigned. */
  def getRegistration(clazzId: Int): Registration = {
    idToRegistration.get(clazzId)
  }
}

object SerializerResolver {
  /** Immutable triple binding an id, a class, and its serializer. */
  class Registration(val id: Int, val clazz: Class[_], val serializer: TaskMessageSerializer[_])
}
package org.apache.gearpump.streaming.task

import org.apache.gearpump.TimeStamp

/**
 * Start time of a streaming application.
 *
 * All messages older than the start time will be dropped.
 *
 * @param startTime the application start timestamp; defaults to 0
 */
case class StartTime(startTime: TimeStamp = 0)
package org.apache.gearpump.streaming.task

import java.io.{DataInput, DataOutput}

import org.slf4j.Logger

import org.apache.gearpump.streaming.{AckRequestSerializer, AckSerializer, InitialAckRequestSerializer, LatencyProbeSerializer}
import org.apache.gearpump.transport.netty.ITransportMessageSerializer
import org.apache.gearpump.util.LogUtil

/**
 * Transport-layer serializer for the streaming control and data messages.
 *
 * Each message class is registered with a [[SerializerResolver]], which assigns
 * incrementing integer ids in registration order; that id is written as a 4-byte
 * prefix before each payload. NOTE(review): because ids derive from registration
 * order, the registration sequence below must stay identical on any peer that
 * decodes these messages — confirm before reordering.
 */
class StreamingTransportSerializer extends ITransportMessageSerializer {
  private val log: Logger = LogUtil.getLogger(getClass)
  private val serializers = new SerializerResolver

  // Registration order determines each class's wire id (0, 1, 2, ...).
  serializers.register(classOf[Ack], new AckSerializer)
  serializers.register(classOf[AckRequest], new AckRequestSerializer)
  serializers.register(classOf[InitialAckRequest], new InitialAckRequestSerializer)
  serializers.register(classOf[LatencyProbe], new LatencyProbeSerializer)
  serializers.register(classOf[SerializedMessage], new SerializedMessageSerializer)

  /**
   * Writes the registered class id followed by the message payload.
   * If the class is unregistered, logs an error and writes nothing.
   */
  override def serialize(dataOutput: DataOutput, obj: Object): Unit = {
    val registration = serializers.getRegistration(obj.getClass)
    if (registration != null) {
      dataOutput.writeInt(registration.id)
      registration.serializer.asInstanceOf[TaskMessageSerializer[AnyRef]].write(dataOutput, obj)
    } else {
      log.error(s"Can not find serializer for class type ${obj.getClass}")
    }
  }

  /**
   * Reads the 4-byte class-id prefix and dispatches to the registered serializer.
   * Returns null (after logging) when the id is unknown.
   */
  override def deserialize(dataInput: DataInput, length: Int): Object = {
    val classID = dataInput.readInt()
    val registration = serializers.getRegistration(classID)
    if (registration != null) {
      registration.serializer.asInstanceOf[TaskMessageSerializer[AnyRef]].read(dataInput)
    } else {
      log.error(s"Can not find serializer for class id $classID")
      null
    }
  }

  /**
   * Total wire length of `obj`: its serialized payload length plus the
   * 4-byte class-id prefix; 0 (after logging) when the class is unregistered.
   */
  override def getLength(obj: Object): Int = {
    val registration = serializers.getRegistration(obj.getClass)
    if (registration != null) {
      registration.serializer.asInstanceOf[TaskMessageSerializer[AnyRef]].getLength(obj) + 4
    } else {
      log.error(s"Can not find serializer for class type ${obj.getClass}")
      0
    }
  }
}
package org.apache.gearpump.streaming.task

import org.apache.gearpump.partitioner.PartitionerDescription
import org.apache.gearpump.streaming.{DAG, LifeTime}

/**
 * Describes one downstream subscriber of a processor.
 *
 * When processor A subscribes to processor B, the output of B is pushed to
 * processor A.
 *
 * @param processorId subscriber processor Id
 * @param partitionerDescription subscriber partitioner
 * @param parallelism number of tasks of the subscribing processor
 * @param lifeTime life time of the subscribing processor
 */
case class Subscriber(processorId: Int, partitionerDescription: PartitionerDescription,
    parallelism: Int, lifeTime: LifeTime)

object Subscriber {

  /**
   * Lists the subscribers of a processor.
   *
   * The topology is read from the DAG: each outgoing edge of `processorId`
   * yields one [[Subscriber]] describing the downstream processor.
   *
   * @param processorId the processor to list
   * @param dag the DAG
   * @return the subscribers of this processor
   */
  def of(processorId: Int, dag: DAG): List[Subscriber] = {
    dag.graph.outgoingEdgesOf(processorId).toList.map { case (_, partitioner, downstreamId) =>
      val downstream = dag.processors(downstreamId)
      Subscriber(downstreamId, partitioner, downstream.parallelism, downstream.life)
    }
  }
}
package org.apache.gearpump.streaming.task

import org.slf4j.Logger

import io.gearpump.google.common.primitives.Shorts
import org.apache.gearpump.partitioner.{MulticastPartitioner, Partitioner, UnicastPartitioner}
import org.apache.gearpump.streaming.AppMasterToExecutor.MsgLostException
import org.apache.gearpump.streaming.LifeTime
import org.apache.gearpump.streaming.task.Subscription._
import org.apache.gearpump.util.LogUtil
import org.apache.gearpump.{Message, TimeStamp}

/**
 * Manages the output and message clock for a single downstream processor.
 *
 * Tracks, per downstream partition, how many messages were sent, how many are
 * still unacknowledged (for flow control), and the minimum timestamp of
 * in-flight messages (for the watermark/min-clock protocol).
 *
 * @param subscriber downstream processor
 * @param maxPendingMessageCount trigger flow control. Should be bigger than
 *                               maxPendingMessageCountPerAckRequest
 * @param ackOnceEveryMessageCount send one AckRequest to the target for every
 *                                 this many messages sent
 */
class Subscription(
    appId: Int,
    executorId: Int,
    taskId: TaskId,
    subscriber: Subscriber, sessionId: Int,
    transport: ExpressTransport,
    maxPendingMessageCount: Int = MAX_PENDING_MESSAGE_COUNT,
    ackOnceEveryMessageCount: Int = ONE_ACKREQUEST_EVERY_MESSAGE_COUNT) {

  assert(maxPendingMessageCount >= ackOnceEveryMessageCount)
  // Counts are kept in Shorts with wrap-around arithmetic (see receiveAck);
  // the bound below keeps wrap-aware comparisons unambiguous.
  assert(maxPendingMessageCount < Short.MaxValue / 2)

  private val LOG: Logger = LogUtil.getLogger(getClass, app = appId,
    executor = executorId, task = taskId)

  import subscriber.{parallelism, partitionerDescription, processorId}

  // Don't worry if these store negative numbers: Short arithmetic is allowed
  // to wrap, and comparisons are done with wrap-aware subtraction.
  // messageCount(i): total messages sent to partition i (mod 2^16).
  private val messageCount: Array[Short] = new Array[Short](parallelism)
  // pendingMessageCount(i): sent-but-not-yet-acked count for partition i.
  private val pendingMessageCount: Array[Short] = new Array[Short](parallelism)
  // candidateMinClockSince(i): the messageCount value at which candidateMinClock(i)
  // started accumulating; used to decide whether an ack covers the candidate window.
  private val candidateMinClockSince: Array[Short] = new Array[Short](parallelism)

  // minClockValue(i): min timestamp among unacked messages to partition i
  // (Long.MaxValue when nothing is outstanding).
  private val minClockValue: Array[TimeStamp] = Array.fill(parallelism)(Long.MaxValue)
  // candidateMinClock(i): min timestamp observed since candidateMinClockSince(i);
  // promoted into minClockValue when an ack covers the window.
  private val candidateMinClock: Array[TimeStamp] = Array.fill(parallelism)(Long.MaxValue)

  // Max over pendingMessageCount; cached so allowSendingMoreMessages is O(1).
  private var maxPendingCount: Short = 0

  private var life = subscriber.lifeTime

  private val partitioner = partitionerDescription.partitionerFactory.partitioner
  // Resolve the partitioning strategy once: unicast sends to one partition,
  // multicast may fan out to several.
  private val sendFn = partitioner match {
    case up: UnicastPartitioner =>
      (msg: Message) => {
        val partition = up.getPartition(msg, parallelism, taskId.index)
        sendMessage(msg, partition)
      }
    case mp: MulticastPartitioner =>
      (msg: Message) => {
        val partitions = mp.getPartitions(msg, parallelism, taskId.index)
        partitions.map(partition => sendMessage(msg, partition)).sum
      }
  }

  /** Replaces the life time controlling which message timestamps are forwarded. */
  def changeLife(life: LifeTime): Unit = {
    this.life = life
  }

  /** Opens the session by sending an InitialAckRequest to every downstream task. */
  def start(): Unit = {
    val ackRequest = InitialAckRequest(taskId, sessionId)
    transport.transport(ackRequest, allTasks: _*)
  }

  /** Sends `msg` according to the partitioner; returns how many copies were sent. */
  def sendMessage(msg: Message): Int = {
    sendFn(msg)
  }

  /**
   * Returns how many messages are actually sent by this subscription
   *
   * @param msg the message to send
   * @param partition the target partition to send message to
   * @return 1 if success
   */
  def sendMessage(msg: Message, partition: Int): Int = {

    var count = 0
    // Only sends message whose timestamp matches the lifeTime
    if (partition != Partitioner.UNKNOWN_PARTITION_ID && life.contains(msg.timestamp)) {

      val targetTask = TaskId(processorId, partition)
      transport.transport(msg, targetTask)

      this.minClockValue(partition) = Math.min(this.minClockValue(partition), msg.timestamp)
      this.candidateMinClock(partition) = Math.min(this.candidateMinClock(partition), msg.timestamp)

      incrementMessageCount(partition, 1)

      // Piggy-back an AckRequest every ackOnceEveryMessageCount messages.
      if (messageCount(partition) % ackOnceEveryMessageCount == 0) {
        sendAckRequest(partition)
      }

      // Send a latency probe roughly once per maxPendingMessageCount messages
      // (detects when the running count crosses a multiple of the window).
      if (messageCount(partition) / maxPendingMessageCount !=
        (messageCount(partition) + ackOnceEveryMessageCount) / maxPendingMessageCount) {
        sendLatencyProbe(partition)
      }
      count = 1
      count
    } else {
      // Message dropped (unknown partition or outside life time); opportunistically
      // flush pending ack requests so the clock can still advance.
      if (needFlush) {
        flush()
      }
      count = 0
      count
    }
  }

  private var lastFlushTime: Long = 0L
  private val FLUSH_INTERVAL = 5 * 1000 // ms
  // A flush is due when the interval elapsed and some partition still has
  // unacked messages.
  private def needFlush: Boolean = {
    System.currentTimeMillis() - lastFlushTime > FLUSH_INTERVAL &&
      Shorts.max(pendingMessageCount: _*) > 0
  }

  // Sends an AckRequest to every downstream task to pull in outstanding acks.
  private def flush(): Unit = {
    lastFlushTime = System.currentTimeMillis()
    allTasks.foreach { targetTaskId =>
      sendAckRequest(targetTaskId.index)
    }
  }

  // All task ids of the downstream processor, one per partition.
  private def allTasks: scala.collection.Seq[TaskId] = {
    (0 until parallelism).map { taskIndex =>
      TaskId(processorId, taskIndex)
    }
  }

  /**
   * Handles acknowledge message. Throw MessageLossException if required.
   *
   * @param ack acknowledge message received
   */
  def receiveAck(ack: Ack): Unit = {

    val index = ack.taskId.index

    // Ignore acks from a previous session.
    if (ack.sessionId == sessionId) {
      if (ack.actualReceivedNum == ack.seq) {
        // Wrap-aware check: has this ack advanced past the start of the
        // current candidate-min-clock window?
        if ((ack.seq - candidateMinClockSince(index)).toShort >= 0) {
          if (ack.seq == messageCount(index)) {
            // All messages have been acked.
            minClockValue(index) = Long.MaxValue
          } else {
            minClockValue(index) = candidateMinClock(index)
          }
          // Start a fresh candidate window from the current send count.
          candidateMinClock(index) = Long.MaxValue
          candidateMinClockSince(index) = messageCount(index)
        }

        pendingMessageCount(ack.taskId.index) = (messageCount(ack.taskId.index) - ack.seq).toShort
        updateMaxPendingCount()
      } else {
        // Receiver saw fewer messages than we sent: data loss, trigger replay.
        LOG.error(s"Failed! received ack: $ack, received: ${ack.actualReceivedNum}, " +
          s"sent: ${ack.seq}, try to replay...")
        throw new MsgLostException
      }
    }
  }

  /** Min clock over all partitions (Long.MaxValue when nothing is in flight). */
  def minClock: TimeStamp = {
    minClockValue.min
  }

  /** Flow-control gate: false when any partition has too many unacked messages. */
  def allowSendingMoreMessages(): Boolean = {
    maxPendingCount < maxPendingMessageCount
  }

  /**
   * Nudges partitions whose min clock equals the stalling time by sending them
   * an AckRequest (plus a latency probe), so the clock can move forward.
   */
  def sendAckRequestOnStallingTime(stallingTime: TimeStamp): Unit = {
    minClockValue.indices.foreach { i =>
      if (minClockValue(i) == stallingTime && pendingMessageCount(i) > 0
        && allowSendingMoreMessages) {
        sendAckRequest(i)
        sendLatencyProbe(i)
      }
    }
  }

  private def sendAckRequest(partition: Int): Unit = {
    // Increments more count for each AckRequest
    // to throttle the number of unacked AckRequest
    incrementMessageCount(partition, ackOnceEveryMessageCount)
    val targetTask = TaskId(processorId, partition)
    val ackRequest = AckRequest(taskId, messageCount(partition), sessionId)
    transport.transport(ackRequest, targetTask)
  }

  // Bumps both the total and the pending count for a partition (wraps at Short).
  private def incrementMessageCount(partition: Int, count: Int): Unit = {
    messageCount(partition) = (messageCount(partition) + count).toShort
    pendingMessageCount(partition) = (pendingMessageCount(partition) + count).toShort
    updateMaxPendingCount()
  }

  private def updateMaxPendingCount(): Unit = {
    maxPendingCount = Shorts.max(pendingMessageCount: _*)
  }

  private def sendLatencyProbe(partition: Int): Unit = {
    val probeLatency = LatencyProbe(System.currentTimeMillis())
    val targetTask = TaskId(processorId, partition)
    transport.transport(probeLatency, targetTask)
  }
}

object Subscription {
  // Makes sure it is smaller than MAX_PENDING_MESSAGE_COUNT
  final val ONE_ACKREQUEST_EVERY_MESSAGE_COUNT = 100
  final val MAX_PENDING_MESSAGE_COUNT = 1000
}
package org.apache.gearpump.streaming.task

import scala.concurrent.duration.FiniteDuration

import akka.actor.Actor.Receive
import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
import org.slf4j.Logger

import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.util.LogUtil
import org.apache.gearpump.{Message, TimeStamp}

/**
 * This provides context information for a task.
 */
trait TaskContext {

  def taskId: TaskId

  def executorId: Int

  def appId: Int

  def appName: String

  /**
   * The actorRef of AppMaster
   * @return application master's actor reference
   */
  def appMaster: ActorRef

  /**
   * The task parallelism
   *
   * For example, we can create 3 source tasks, and 3 sink tasks,
   * the task parallelism is 3 for each.
   *
   * This can be useful when reading from partitioned data source.
   * For example, for kafka, there may be 10 partitions, if we have
   * parallelism of 2 for this task, then each task will be responsible
   * to read data from 5 partitions.
   *
   * @return the parallelism level
   */
  def parallelism: Int

  /**
   * Please don't use this if possible.
   * @return self actor ref
   */
  // TODO: We should remove the self from TaskContext
  def self: ActorRef

  /**
   * Please don't use this if possible
   * @return the actor system
   */
  // TODO: we should remove this in future
  def system: ActorSystem

  /**
   * This can be used to output messages to downstream tasks. The data shuffling rule
   * can be decided by Partitioner.
   *
   * @param msg message to output
   */
  def output(msg: Message): Unit

  def actorOf(props: Props): ActorRef

  def actorOf(props: Props, name: String): ActorRef

  /** Schedules `f` to run repeatedly, first after initialDelay, then every interval. */
  def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: => Unit): Cancellable

  /**
   * akka.actor.ActorRefProvider.scheduleOnce
   *
   * @param initialDelay the initial delay
   * @param f the function to execute after initial delay
   * @return the executable
   */
  def scheduleOnce(initialDelay: FiniteDuration)(f: => Unit): Cancellable

  /**
   * For managed message(type of Message), the sender only serves as a unique Id,
   * its address is not something meaningful, you should not use this directly.
   *
   * For unmanaged message, the sender represents the sender ActorRef
   * @return sender
   */
  def sender: ActorRef

  /**
   * Retrieves upstream min clock from TaskActor
   *
   * @return the min clock
   */
  def upstreamMinClock: TimeStamp

  /**
   * Logger is environment dependent, it should be provided by
   * the containing environment.
   */
  def logger: Logger
}

/**
 * Streaming Task interface
 */
trait TaskInterface {

  /**
   * Method called when the task is initialized.
   *
   * @param startTime startTime that can be used to decide from when a source producer task should
   *                  replay the data source, or from when a processor task should recover its
   *                  checkpoint data in to in-memory state.
   */
  def onStart(startTime: StartTime): Unit

  /**
   * Method called for each message received.
   *
   * @param msg Message send by upstream tasks
   */
  def onNext(msg: Message): Unit

  /**
   * Method called when task is under clean up.
   *
   * This can be used to cleanup resource when the application finished.
   */
  def onStop(): Unit

  /**
   * Handles unmanaged messages
   *
   * @return the handler
   */
  def receiveUnManagedMessage: Receive = null
}

/**
 * Base class for user-defined streaming tasks. Provides a logger, the actor
 * system, and self/sender references from the task context; all lifecycle
 * callbacks default to no-ops so subclasses override only what they need.
 */
abstract class Task(taskContext: TaskContext, userConf: UserConfig) extends TaskInterface {

  import taskContext.{appId, executorId, taskId}

  val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId, task = taskId)

  protected implicit val system = taskContext.system

  implicit val self = taskContext.self

  /**
   * For managed message(type of Message), the sender means nothing,
   * you should not use this directly.
   *
   * For unmanaged message, the sender represents the sender actor
   * @return the sender
   */
  protected def sender: ActorRef = taskContext.sender

  def onStart(startTime: StartTime): Unit = {}

  def onNext(msg: Message): Unit = {}

  def onStop(): Unit = {}

  // Default handler: anything not consumed by the framework is logged as unknown.
  override def receiveUnManagedMessage: Receive = {
    case msg =>
      LOG.error("Failed! Received unknown message " + "taskId: " + taskId + ", " + msg.toString)
  }
}
package org.apache.gearpump.streaming.task

import java.util
import java.util.concurrent.TimeUnit

import akka.actor._
import org.slf4j.Logger

import org.apache.gearpump.cluster.UserConfig
import io.gearpump.gs.collections.impl.map.mutable.primitive.IntShortHashMap
import org.apache.gearpump.metrics.Metrics
import org.apache.gearpump.serializer.SerializationFramework
import org.apache.gearpump.streaming.AppMasterToExecutor._
import org.apache.gearpump.streaming.ExecutorToAppMaster._
import org.apache.gearpump.streaming.{Constants, ProcessorId}
import org.apache.gearpump.util.{LogUtil, TimeOutScheduler}
import org.apache.gearpump.{Message, TimeStamp}

/**
 * All tasks of Gearpump run inside an Actor. TaskActor is the Actor container for a task.
 *
 * Lifecycle (driven via context.become, hence `receive` is never used directly):
 * preStart registers with the executor -> waitForTaskRegistered (receives session id
 * and start clock) -> waitForStartClock (stashes everything until StartTask) ->
 * handleMessages (steady state).
 */
class TaskActor(
    val taskId: TaskId,
    val taskContextData: TaskContextData,
    userConf: UserConfig,
    val task: TaskWrapper,
    inputSerializerPool: SerializationFramework)
  extends Actor with ExpressTransport with TimeOutScheduler {
  // Min clock reported by upstream; lower bound input for this task's own min clock.
  var upstreamMinClock: TimeStamp = 0L
  private var _minClock: TimeStamp = 0L

  def serializerPool: SerializationFramework = inputSerializerPool

  import taskContextData._

  import org.apache.gearpump.streaming.Constants._
  import org.apache.gearpump.streaming.task.TaskActor._
  val config = context.system.settings.config

  val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId, task = taskId)

  // Metrics
  private val metricName = s"app${appId}.processor${taskId.processorId}.task${taskId.index}"
  private val receiveLatency = Metrics(context.system).histogram(
    s"$metricName:receiveLatency", sampleRate = 1)
  private val processTime = Metrics(context.system).histogram(s"$metricName:processTime")
  private val sendThroughput = Metrics(context.system).meter(s"$metricName:sendThroughput")
  private val receiveThroughput = Metrics(context.system).meter(s"$metricName:receiveThroughput")

  // Flow-control knobs passed to each Subscription (see Subscription for semantics).
  private val maxPendingMessageCount = config.getInt(GEARPUMP_STREAMING_MAX_PENDING_MESSAGE_COUNT)
  private val ackOnceEveryMessageCount = config.getInt(
    GEARPUMP_STREAMING_ACK_ONCE_EVERY_MESSAGE_COUNT)

  private val executor = context.parent
  private var life = taskContextData.life

  // Latency probe
  import scala.concurrent.duration._

  import context.dispatcher
  final val LATENCY_PROBE_INTERVAL = FiniteDuration(1, TimeUnit.SECONDS)

  // Clock report interval
  final val CLOCK_REPORT_INTERVAL = FiniteDuration(1, TimeUnit.SECONDS)

  // Flush interval
  final val FLUSH_INTERVAL = FiniteDuration(100, TimeUnit.MILLISECONDS)

  // Pending work items (SendAck, Message, or unmanaged messages); drained by
  // doHandleMessage under flow control.
  private val queue = new util.LinkedList[AnyRef]()

  // (downstream processorId, Subscription), kept sorted by processorId.
  private var subscriptions = List.empty[(Int, Subscription)]

  // SecurityChecker will be responsible of dropping messages from
  // unknown sources
  private val securityChecker = new SecurityChecker(taskId, self)
  private[task] var sessionId = NONE_SESSION

  // Reports to appMaster with my address
  express.registerLocalActor(TaskId.toLong(taskId), self)

  // Never invoked: preStart installs waitForTaskRegistered via context.become.
  final def receive: Receive = null

  task.setTaskActor(this)

  def onStart(startTime: StartTime): Unit = {
    task.onStart(startTime)
  }

  def onNext(msg: Message): Unit = task.onNext(msg)

  def onUnManagedMessage(msg: Any): Unit = task.receiveUnManagedMessage.apply(msg)

  def onStop(): Unit = task.onStop()

  /**
   * output to a downstream by specifying a arrayIndex
   * @param arrayIndex this is not same as ProcessorId
   */
  def output(arrayIndex: Int, msg: Message): Unit = {
    var count = 0
    count += this.subscriptions(arrayIndex)._2.sendMessage(msg)
    sendThroughput.mark(count)
  }

  /** Outputs `msg` to every subscription and records the total sent count. */
  def output(msg: Message): Unit = {
    var count = 0
    this.subscriptions.foreach { subscription =>
      count += subscription._2.sendMessage(msg)
    }
    sendThroughput.mark(count)
  }

  final override def postStop(): Unit = {
    onStop()
  }

  // Registers this task with its executor, then waits for TaskRegistered.
  final override def preStart(): Unit = {
    val register = RegisterTask(taskId, executorId, local)
    LOG.info(s"$register")
    executor ! register
    context.become(waitForTaskRegistered)
  }

  // True only when every subscription is below its pending-message limit.
  private def allowSendingMoreMessages(): Boolean = {
    subscriptions.forall(_._2.allowSendingMoreMessages())
  }

  // Drains the queue while downstream flow control permits; acks are forwarded,
  // Messages go to the user task, anything else is an unmanaged message.
  private def doHandleMessage(): Unit = {
    var done = false

    var count = 0
    val start = System.currentTimeMillis()

    while (allowSendingMoreMessages() && !done) {
      val msg = queue.poll()
      if (msg != null) {
        msg match {
          case SendAck(ack, targetTask) =>
            transport(ack, targetTask)
          case m: Message =>
            count += 1
            onNext(m)
          case other =>
            // un-managed message
            onUnManagedMessage(other)
        }
      } else {
        done = true
      }
    }

    receiveThroughput.mark(count)
    if (count > 0) {
      processTime.update((System.currentTimeMillis() - start) / count)
    }
  }

  // Transition into steady state: build subscriptions, replay stashed messages,
  // call user onStart, then request the upstream clock.
  private def onStartClock(): Unit = {
    LOG.info(s"received start, clock: $upstreamMinClock, sessionId: $sessionId")
    subscriptions = subscribers.map { subscriber =>
      (subscriber.processorId,
        new Subscription(appId, executorId, taskId, subscriber, sessionId, this,
          maxPendingMessageCount, ackOnceEveryMessageCount))
    }.sortBy(_._1)

    subscriptions.foreach(_._2.start())

    import scala.collection.JavaConverters._
    stashQueue.asScala.foreach { item =>
      handleMessages(item.sender).apply(item.msg)
    }
    stashQueue.clear()

    // Put this as the last step so that the subscription is already initialized.
    // Message sending in current Task before onStart will not be delivered to
    // target
    onStart(new StartTime(upstreamMinClock))

    appMaster ! GetUpstreamMinClock(taskId)
    context.become(handleMessages(sender))
  }

  // Waits for the executor's TaskRegistered reply carrying session id and start clock.
  def waitForTaskRegistered: Receive = {
    case start@TaskRegistered(_, sessionId, startClock) =>
      this.sessionId = sessionId
      this.upstreamMinClock = startClock
      context.become(waitForStartClock)
  }

  // Messages received before StartTask, replayed by onStartClock.
  private val stashQueue = new util.LinkedList[MessageAndSender]()

  def waitForStartClock: Receive = {
    case start: StartTask =>
      onStartClock()
    case other: AnyRef =>
      stashQueue.add(MessageAndSender(other, sender()))
  }

  // Steady-state behavior. `sender` is by-name so replayed stashed messages can
  // supply their original sender.
  def handleMessages(sender: => ActorRef): Receive = {
    case ackRequest: InitialAckRequest =>
      val ackResponse = securityChecker.handleInitialAckRequest(ackRequest)
      if (null != ackResponse) {
        queue.add(SendAck(ackResponse, ackRequest.taskId))
        doHandleMessage()
      }
    case ackRequest: AckRequest =>
      // Enqueue to handle the ackRequest and send back ack later
      val ackResponse = securityChecker.generateAckResponse(ackRequest, sender,
        ackOnceEveryMessageCount)
      if (null != ackResponse) {
        queue.add(SendAck(ackResponse, ackRequest.taskId))
        doHandleMessage()
      }
    case ack: Ack =>
      subscriptions.find(_._1 == ack.taskId.processorId).foreach(_._2.receiveAck(ack))
      doHandleMessage()
    case inputMessage: SerializedMessage =>
      val message = Message(serializerPool.get().deserialize(inputMessage.bytes),
        inputMessage.timeStamp)
      receiveMessage(message, sender)
    case inputMessage: Message =>
      receiveMessage(inputMessage, sender)
    case upstream@UpstreamMinClock(upstreamClock) =>
      this.upstreamMinClock = upstreamClock

      val subMinClock = subscriptions.foldLeft(Long.MaxValue) { (min, sub) =>
        val subMin = sub._2.minClock
        // A subscription is holding back the _minClock;
        // we send AckRequest to its tasks to push _minClock forward
        if (subMin == _minClock) {
          sub._2.sendAckRequestOnStallingTime(_minClock)
        }
        Math.min(min, subMin)
      }

      _minClock = Math.max(life.birth, Math.min(upstreamMinClock, subMinClock))

      // Report the new clock to the AppMaster after CLOCK_REPORT_INTERVAL
      // (which also paces the next UpstreamMinClock round-trip).
      val update = UpdateClock(taskId, _minClock)
      context.system.scheduler.scheduleOnce(CLOCK_REPORT_INTERVAL) {
        appMaster ! update
      }

      // Checks whether current task is dead.
      if (_minClock > life.death) {
        // There will be no more message received...
        val unRegister = UnRegisterTask(taskId, executorId)
        executor ! unRegister

        LOG.info(s"Sending $unRegister, current minclock: ${_minClock}, life: $life")
      }

    case ChangeTask(_, dagVersion, life, subscribers) =>
      // Dynamic DAG change: update life time and add/adjust subscriptions.
      this.life = life
      subscribers.foreach { subscriber =>
        val processorId = subscriber.processorId
        val subscription = getSubscription(processorId)
        subscription match {
          case Some(subscription) =>
            subscription.changeLife(subscriber.lifeTime cross this.life)
          case None =>
            val subscription = new Subscription(appId, executorId, taskId, subscriber,
              sessionId, this, maxPendingMessageCount, ackOnceEveryMessageCount)
            subscription.start()
            subscriptions :+= (subscriber.processorId, subscription)
            // Sorting, keep the order
            subscriptions = subscriptions.sortBy(_._1)
        }
      }
      sender ! TaskChanged(taskId, dagVersion)
    case LatencyProbe(timeStamp) =>
      receiveLatency.update(System.currentTimeMillis() - timeStamp)
    case send: SendMessageLoss =>
      LOG.info("received SendMessageLoss")
      throw new MsgLostException
    case other: AnyRef =>
      queue.add(other)
      doHandleMessage()
  }

  /**
   * Returns min clock of this task
   */
  def minClock: TimeStamp = _minClock

  /**
   * Returns min clock of upstream task
   */
  def getUpstreamMinClock: TimeStamp = upstreamMinClock

  // Enqueues a message for processing if the security checker accepts its sender.
  private def receiveMessage(msg: Message, sender: ActorRef): Unit = {
    val messageAfterCheck = securityChecker.checkMessage(msg, sender)
    messageAfterCheck match {
      case Some(msg) =>
        queue.add(msg)
        doHandleMessage()
      case None =>
      // TODO: Indicate the error and avoid the LOG flood
      // LOG.error(s"Task $taskId drop message $msg")
    }
  }

  private def getSubscription(processorId: ProcessorId): Option[Subscription] = {
    subscriptions.find(_._1 == processorId).map(_._2)
  }
}

object TaskActor {
  // 3 seconds
  val CLOCK_SYNC_TIMEOUT_INTERVAL = 3 * 1000

  // If the message comes from an unknown source, securityChecker will drop it
  class SecurityChecker(task_id: TaskId, self: ActorRef) {

    private val LOG: Logger = LogUtil.getLogger(getClass, task = task_id)

    // Uses mutable HashMap for performance optimization
    // sessionId -> number of messages received in that session.
    private val receivedMsgCount = new IntShortHashMap()

    // Tricky performance optimization to save memory.
    // We store the session Id in the uid of ActorPath
    // ActorPath.hashCode is same as uid.
    private def getSessionId(actor: ActorRef): Int = {
      // TODO: As method uid is protected in [akka] package. We
      // are using hashCode instead of uid.
      actor.hashCode()
    }

    // Opens a session: returns the seed Ack, or null when sessionId is unset.
    def handleInitialAckRequest(ackRequest: InitialAckRequest): Ack = {
      LOG.debug(s"Handle InitialAckRequest for session $ackRequest")
      val sessionId = ackRequest.sessionId
      if (sessionId == NONE_SESSION) {
        LOG.error(s"SessionId is not initialized, ackRequest: $ackRequest")
        null
      } else {
        receivedMsgCount.put(sessionId, 0)
        Ack(task_id, 0, 0, sessionId)
      }
    }

    // Builds an Ack echoing the request's seq and the actual received count;
    // null for unknown sessions.
    def generateAckResponse(ackRequest: AckRequest, sender: ActorRef, incrementCount: Int): Ack = {
      val sessionId = ackRequest.sessionId
      if (receivedMsgCount.containsKey(sessionId)) {
        // Increments more count for each AckRequest
        // to throttle the number of unacked AckRequest
        receivedMsgCount.put(sessionId, (receivedMsgCount.get(sessionId) + incrementCount).toShort)
        Ack(task_id, ackRequest.seq, receivedMsgCount.get(sessionId), ackRequest.sessionId)
      } else {
        LOG.error(s"get unknown AckRequest $ackRequest from ${sender.toString()}")
        null
      }
    }

    // If the message comes from an unknown source, then drop it
    def checkMessage(message: Message, sender: ActorRef): Option[Message] = {
      if (sender.equals(self)) {
        // Self-sent messages are always trusted.
        Some(message)
      } else {
        val sessionId = getSessionId(sender)
        if (receivedMsgCount.containsKey(sessionId)) {
          receivedMsgCount.put(sessionId, (receivedMsgCount.get(sessionId) + 1).toShort)
          Some(message)
        } else {
          // This is an illegal message,
          LOG.debug(s"received message before receive the first AckRequest, session $sessionId")
          None
        }
      }
    }
  }

  case class SendAck(ack: Ack, targetTask: TaskId)

  case object FLUSH

  val NONE_SESSION = -1

  case class MessageAndSender(msg: AnyRef, sender: ActorRef)
}
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskContextData.scala new file mode 100644 index 0000000..0d12bc9 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskContextData.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.task + +import akka.actor.ActorRef + +import org.apache.gearpump.streaming.LifeTime + +case class TaskContextData( + executorId: Int, + appId: Int, + appName: String, + appMaster: ActorRef, + parallelism: Int, + life: LifeTime, + subscribers: List[Subscriber]) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala new file mode 100644 index 0000000..e537f99 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.task + +import org.apache.gearpump.TimeStamp +import org.apache.gearpump.streaming.ProcessorId + +/* + * Initial AckRequest + */ +case class InitialAckRequest(taskId: TaskId, sessionId: Int) + +/* + Here the sessionId field is used to distinguish messages + between different replays after the application restart + */ +case class AckRequest(taskId: TaskId, seq: Short, sessionId: Int) + +/** + * Ack back to sender task actor. + * + * @param seq The seq field represents the expected number of received messages and the + * actualReceivedNum field means the actual received number since start. + */ +case class Ack(taskId: TaskId, seq: Short, actualReceivedNum: Short, sessionId: Int) + +sealed trait ClockEvent + +case class UpdateClock(taskId: TaskId, time: TimeStamp) extends ClockEvent + +object GetLatestMinClock extends ClockEvent + +case class GetUpstreamMinClock(taskId: TaskId) extends ClockEvent + +case class UpstreamMinClock(latestMinClock: TimeStamp) + +case class LatestMinClock(clock: TimeStamp) + +case class ReportCheckpointClock(taskId: TaskId, clock: TimeStamp) + +case object GetCheckpointClock + +case class CheckpointClock(clock: Option[TimeStamp]) + +case object GetStartClock + +case class StartClock(clock: TimeStamp) + +/** Probes the latency from an upstream task to a downstream task.
*/ +case class LatencyProbe(timestamp: Long) + +case class SendMessageLoss() + +case object GetDAG + +case class CheckProcessorDeath(processorId: ProcessorId) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskId.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskId.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskId.scala new file mode 100644 index 0000000..b44293c --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskId.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.task + +import org.apache.gearpump.streaming._ + +case class TaskId(processorId: ProcessorId, index: TaskIndex) + +object TaskId { + def toLong(id: TaskId): Long = (id.processorId.toLong << 32) + id.index + def fromLong(id: Long): TaskId = TaskId(((id >> 32) & 0xFFFFFFFF).toInt, (id & 0xFFFFFFFF).toInt) +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskMessageSerializer.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskMessageSerializer.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskMessageSerializer.scala new file mode 100644 index 0000000..591dd46 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskMessageSerializer.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.task + +import java.io.{DataInput, DataOutput} + +trait TaskMessageSerializer[T] { + def write(dataOutput: DataOutput, obj: T) + + def read(dataInput: DataInput): T + + def getLength(obj: T): Int +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala new file mode 100644 index 0000000..7459c64 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.task + +object TaskUtil { + + /** + * Resolves a classname to a Task class. 
+ * + * @param className the class name to resolve + * @return resolved class + */ + def loadClass(className: String): Class[_ <: Task] = { + val loader = Thread.currentThread().getContextClassLoader() + loader.loadClass(className).asSubclass(classOf[Task]) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala new file mode 100644 index 0000000..cd33f7e --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.task + +import scala.concurrent.duration.FiniteDuration + +import akka.actor.Actor._ +import akka.actor.{ActorRef, ActorSystem, Cancellable, Props} +import org.slf4j.Logger + +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.util.LogUtil +import org.apache.gearpump.{Message, TimeStamp} + +/** + * This provides TaskContext for user defined tasks + * + * @param taskClass task class + * @param context context class + * @param userConf user config + */ +class TaskWrapper( + val taskId: TaskId, taskClass: Class[_ <: Task], context: TaskContextData, + userConf: UserConfig) extends TaskContext with TaskInterface { + + private val LOG = LogUtil.getLogger(taskClass, task = taskId) + + private var actor: TaskActor = null + + private var task: Option[Task] = None + + def setTaskActor(actor: TaskActor): Unit = this.actor = actor + + override def appId: Int = context.appId + + override def appName: String = context.appName + + override def executorId: Int = context.executorId + + override def parallelism: Int = context.parallelism + + override def appMaster: ActorRef = context.appMaster + + override def output(msg: Message): Unit = actor.output(msg) + + /** + * See [[org.apache.gearpump.streaming.task.TaskActor]] + * output(arrayIndex: Int, msg: Message): Unit + * + * @param index, not same as ProcessorId + */ + def output(index: Int, msg: Message): Unit = actor.output(index, msg) + + /** + * Use with caution, output unmanaged message to target tasks + * + * @param msg message to output + * @param tasks the tasks to output to + */ + def outputUnManaged(msg: AnyRef, tasks: TaskId*): Unit = { + actor.transport(msg, tasks: _*) + } + + override def self: ActorRef = actor.context.self + + override def sender: ActorRef = actor.context.sender() + + def system: ActorSystem = actor.context.system + + override def actorOf(props: Props): ActorRef = actor.context.actorOf(props) + + override def actorOf(props: Props, 
name: String): ActorRef = actor.context.actorOf(props, name) + + override def onStart(startTime: StartTime): Unit = { + if (None != task) { + LOG.error(s"Task.onStart should not be called multiple times... ${task.getClass}") + } + val constructor = taskClass.getConstructor(classOf[TaskContext], classOf[UserConfig]) + task = Some(constructor.newInstance(this, userConf)) + task.foreach(_.onStart(startTime)) + } + + override def onNext(msg: Message): Unit = task.foreach(_.onNext(msg)) + + override def onStop(): Unit = { + task.foreach(_.onStop()) + task = None + } + + override def receiveUnManagedMessage: Receive = { + task.map(_.receiveUnManagedMessage).getOrElse(defaultMessageHandler) + } + + override def upstreamMinClock: TimeStamp = { + actor.getUpstreamMinClock + } + + def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: => Unit): Cancellable = { + val dispatcher = actor.context.system.dispatcher + actor.context.system.scheduler.schedule(initialDelay, interval)(f)(dispatcher) + } + + def scheduleOnce(initialDelay: FiniteDuration)(f: => Unit): Cancellable = { + val dispatcher = actor.context.system.dispatcher + actor.context.system.scheduler.scheduleOnce(initialDelay)(f)(dispatcher) + } + + private def defaultMessageHandler: Receive = { + case msg => + LOG.error("Failed! Received unknown message " + "taskId: " + taskId + ", " + msg.toString) + } + + /** + * Logger is environment dependant, it should be provided by + * containing environment. 
+ */ + override def logger: Logger = LOG +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala new file mode 100644 index 0000000..2ef8610 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.transaction.api + +import org.apache.gearpump.TimeStamp +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.task.TaskContext + +/** + * CheckpointStore persistently stores a mapping of timestamp to checkpoint. + * It is possible that two checkpoints have the same timestamp; + * CheckpointStore needs to handle this either during write or read. + */ +trait CheckpointStore { + + def persist(timeStamp: TimeStamp, checkpoint: Array[Byte]): Unit + + def recover(timestamp: TimeStamp): Option[Array[Byte]] + + def close(): Unit +} + +trait CheckpointStoreFactory extends java.io.Serializable { + def getCheckpointStore(conf: UserConfig, taskContext: TaskContext): CheckpointStore +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/MessageDecoder.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/MessageDecoder.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/MessageDecoder.scala new file mode 100644 index 0000000..0615078 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/MessageDecoder.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.transaction.api + +import org.apache.gearpump.Message + +/** + * MessageDecoder decodes raw bytes to a Message. It is usually written by the end user and + * passed into TimeReplayableSource. + */ +trait MessageDecoder extends java.io.Serializable { + def fromBytes(bytes: Array[Byte]): Message +}
