http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/TaskControlMessage.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskControlMessage.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskControlMessage.scala deleted file mode 100644 index d89387a..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskControlMessage.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.task - -import io.gearpump.TimeStamp -import io.gearpump.streaming.ProcessorId - -/* - * Initial AckRequest - */ -case class InitialAckRequest(taskId: TaskId, sessionId: Int) - -/* - Here the sessionId filed is used to distinguish messages - between different replays after the application restart - */ -case class AckRequest(taskId: TaskId, seq: Short, sessionId: Int) - -/** - * Ack back to sender task actor. - * - * @param seq The seq field represents the expected number of received messages and the - * actualReceivedNum field means the actual received number since start. - */ -case class Ack(taskId: TaskId, seq: Short, actualReceivedNum: Short, sessionId: Int) - -sealed trait ClockEvent - -case class UpdateClock(taskId: TaskId, time: TimeStamp) extends ClockEvent - -object GetLatestMinClock extends ClockEvent - -case class GetUpstreamMinClock(taskId: TaskId) extends ClockEvent - -case class UpstreamMinClock(latestMinClock: TimeStamp) - -case class LatestMinClock(clock: TimeStamp) - -case class ReportCheckpointClock(taskId: TaskId, clock: TimeStamp) - -case object GetCheckpointClock - -case class CheckpointClock(clock: Option[TimeStamp]) - -case object GetStartClock - -case class StartClock(clock: TimeStamp) - -/** Probe the latency between two upstream to downstream tasks. */ -case class LatencyProbe(timestamp: Long) - -case class SendMessageLoss() - -case object GetDAG - -case class CheckProcessorDeath(processorId: ProcessorId)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/TaskId.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskId.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskId.scala deleted file mode 100644 index 66c3c52..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskId.scala +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.task - -import io.gearpump.streaming._ - -case class TaskId(processorId: ProcessorId, index: TaskIndex) - -object TaskId { - def toLong(id: TaskId): Long = (id.processorId.toLong << 32) + id.index - def fromLong(id: Long): TaskId = TaskId(((id >> 32) & 0xFFFFFFFF).toInt, (id & 0xFFFFFFFF).toInt) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/TaskMessageSerializer.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskMessageSerializer.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskMessageSerializer.scala deleted file mode 100644 index 500f8b3..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskMessageSerializer.scala +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.task - -import java.io.{DataInput, DataOutput} - -trait TaskMessageSerializer[T] { - def write(dataOutput: DataOutput, obj: T) - - def read(dataInput: DataInput): T - - def getLength(obj: T): Int -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala deleted file mode 100644 index 040fc2e..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.task - -object TaskUtil { - - /** - * Resolves a classname to a Task class. - * - * @param className the class name to resolve - * @return resolved class - */ - def loadClass(className: String): Class[_ <: Task] = { - val loader = Thread.currentThread().getContextClassLoader() - loader.loadClass(className).asSubclass(classOf[Task]) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/TaskWrapper.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskWrapper.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskWrapper.scala deleted file mode 100644 index e7e883c..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskWrapper.scala +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.task - -import scala.concurrent.duration.FiniteDuration - -import akka.actor.Actor._ -import akka.actor.{ActorRef, ActorSystem, Cancellable, Props} -import org.slf4j.Logger - -import io.gearpump.cluster.UserConfig -import io.gearpump.util.LogUtil -import io.gearpump.{Message, TimeStamp} - -/** - * This provides TaskContext for user defined tasks - * - * @param taskClass task class - * @param context context class - * @param userConf user config - */ -class TaskWrapper( - val taskId: TaskId, taskClass: Class[_ <: Task], context: TaskContextData, - userConf: UserConfig) extends TaskContext with TaskInterface { - - private val LOG = LogUtil.getLogger(taskClass, task = taskId) - - private var actor: TaskActor = null - - private var task: Option[Task] = None - - def setTaskActor(actor: TaskActor): Unit = this.actor = actor - - override def appId: Int = context.appId - - override def appName: String = context.appName - - override def executorId: Int = context.executorId - - override def parallelism: Int = context.parallelism - - override def appMaster: ActorRef = context.appMaster - - override def output(msg: Message): Unit = actor.output(msg) - - /** - * See [[io.gearpump.streaming.task.TaskActor]] output(arrayIndex: Int, msg: Message): Unit - * - * @param index, not same as ProcessorId - */ - def output(index: Int, msg: Message): Unit = actor.output(index, msg) - - /** - * Use with caution, output unmanaged message to target tasks - * - * @param msg message to output - * @param tasks the tasks to output to - */ - def outputUnManaged(msg: AnyRef, tasks: TaskId*): Unit = { - actor.transport(msg, tasks: _*) - } - - override def self: ActorRef = actor.context.self - - override def sender: ActorRef = actor.context.sender() - - def system: ActorSystem = actor.context.system - - override def actorOf(props: Props): ActorRef = actor.context.actorOf(props) - - override def actorOf(props: Props, name: String): ActorRef = actor.context.actorOf(props, name) - - override def onStart(startTime: StartTime): Unit = { - if (None != task) { - LOG.error(s"Task.onStart should not be called multiple times... ${task.getClass}") - } - val constructor = taskClass.getConstructor(classOf[TaskContext], classOf[UserConfig]) - task = Some(constructor.newInstance(this, userConf)) - task.foreach(_.onStart(startTime)) - } - - override def onNext(msg: Message): Unit = task.foreach(_.onNext(msg)) - - override def onStop(): Unit = { - task.foreach(_.onStop()) - task = None - } - - override def receiveUnManagedMessage: Receive = { - task.map(_.receiveUnManagedMessage).getOrElse(defaultMessageHandler) - } - - override def upstreamMinClock: TimeStamp = { - actor.getUpstreamMinClock - } - - def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: => Unit): Cancellable = { - val dispatcher = actor.context.system.dispatcher - actor.context.system.scheduler.schedule(initialDelay, interval)(f)(dispatcher) - } - - def scheduleOnce(initialDelay: FiniteDuration)(f: => Unit): Cancellable = { - val dispatcher = actor.context.system.dispatcher - actor.context.system.scheduler.scheduleOnce(initialDelay)(f)(dispatcher) - } - - private def defaultMessageHandler: Receive = { - case msg => - LOG.error("Failed! Received unknown message " + "taskId: " + taskId + ", " + msg.toString) - } - - /** - * Logger is environment dependant, it should be provided by - * containing environment. - */ - override def logger: Logger = LOG -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/transaction/api/CheckpointStore.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/CheckpointStore.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/CheckpointStore.scala deleted file mode 100644 index f3894ea..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/CheckpointStore.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.transaction.api - -import io.gearpump.TimeStamp -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.task.TaskContext - -/** - * CheckpointStore persistently stores mapping of timestamp to checkpoint - * it's possible that two checkpoints have the same timestamp - * CheckpointStore needs to handle this either during write or read - */ -trait CheckpointStore { - - def persist(timeStamp: TimeStamp, checkpoint: Array[Byte]): Unit - - def recover(timestamp: TimeStamp): Option[Array[Byte]] - - def close(): Unit -} - -trait CheckpointStoreFactory extends java.io.Serializable { - def getCheckpointStore(conf: UserConfig, taskContext: TaskContext): CheckpointStore -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/transaction/api/MessageDecoder.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/MessageDecoder.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/MessageDecoder.scala deleted file mode 100644 index 7039b71..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/MessageDecoder.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.transaction.api - -import io.gearpump.Message - -/** - * MessageDecoder decodes raw bytes to Message It is usually written by end user and - * passed into TimeReplayableSource - */ -trait MessageDecoder extends java.io.Serializable { - def fromBytes(bytes: Array[Byte]): Message -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetManager.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetManager.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetManager.scala deleted file mode 100644 index 412ddcc..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetManager.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.transaction.api - -import scala.util.Try - -import io.gearpump.{Message, TimeStamp} - -/** - * Filters offsets and store the mapping from timestamp to offset - */ -trait MessageFilter { - def filter(messageAndOffset: (Message, Long)): Option[Message] -} - -/** - * Resolves timestamp to offset by look up the underlying storage - */ -trait OffsetTimeStampResolver { - def resolveOffset(time: TimeStamp): Try[Long] -} - -/** - * Manages message's offset on TimeReplayableSource and timestamp - */ -trait OffsetManager extends MessageFilter with OffsetTimeStampResolver { - def close(): Unit -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetStorage.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetStorage.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetStorage.scala deleted file mode 100644 index fa7161c..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetStorage.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.transaction.api - -import scala.util.Try - -import io.gearpump.TimeStamp - -object OffsetStorage { - - /** - * StorageEmpty means no data has been stored - */ - case object StorageEmpty extends Throwable - - /** - * Overflow means the looked up time is - * larger than the maximum stored TimeStamp - */ - case class Overflow(maxTimestamp: Array[Byte]) extends Throwable - - /** - * Underflow means the looked up time is - * smaller than the minimum stored TimeStamp - */ - case class Underflow(minTimestamp: Array[Byte]) extends Throwable -} - -/** - * OffsetStorage stores the mapping from TimeStamp to Offset - */ -trait OffsetStorage { - /** - * Tries to look up the time in the OffsetStorage return the corresponding Offset if the time is - * in the range of stored TimeStamps or one of the failure info (StorageEmpty, Overflow, - * Underflow) - * - * @param time the time to look for - * @return the corresponding offset if the time is in the range, otherwise failure - */ - def lookUp(time: TimeStamp): Try[Array[Byte]] - - def append(time: TimeStamp, offset: Array[Byte]): Unit - - def close(): Unit -} - -trait OffsetStorageFactory extends java.io.Serializable { - def getOffsetStorage(dir: String): OffsetStorage -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeReplayableSource.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeReplayableSource.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeReplayableSource.scala deleted file mode 100644 index 50711ee..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeReplayableSource.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.transaction.api - -import io.gearpump.streaming.source.DataSource - -/** - * AT-LEAST-ONCE API. Represents a data source which allow replaying. - * - * Subclass should be able to replay messages on recovery from the time - * when an application crashed. - */ -trait TimeReplayableSource extends DataSource - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeStampFilter.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeStampFilter.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeStampFilter.scala deleted file mode 100644 index 7c34e1a..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeStampFilter.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.transaction.api - -import io.gearpump.{Message, TimeStamp} - -/** - * TimeStampFilter filters out messages that are obsolete. - */ -trait TimeStampFilter extends java.io.Serializable { - def filter(msg: Message, predicate: TimeStamp): Option[Message] -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/util/ActorPathUtil.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/util/ActorPathUtil.scala b/streaming/src/main/scala/io/gearpump/streaming/util/ActorPathUtil.scala deleted file mode 100644 index c2ac32a..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/util/ActorPathUtil.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.util - -import akka.actor.{ActorPath, ActorRef} - -import io.gearpump.streaming.task.TaskId - -object ActorPathUtil { - - def executorActorName(executorId: Int): String = executorId.toString - - def taskActorName(taskId: TaskId): String = { - s"processor_${taskId.processorId}_task_${taskId.index}" - } - - def taskActorPath(appMaster: ActorRef, executorId: Int, taskId: TaskId): ActorPath = { - val executorManager = appMaster.path.child(executorManagerActorName) - val executor = executorManager.child(executorActorName(executorId)) - val task = executor.child(taskActorName(taskId)) - task - } - - def executorManagerActorName: String = "executors" -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala new file mode 100644 index 0000000..4b801a2 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming + +import scala.language.existentials + +import akka.actor.ActorRef + +import org.apache.gearpump.TimeStamp +import org.apache.gearpump.cluster.appmaster.WorkerInfo +import org.apache.gearpump.cluster.scheduler.Resource +import org.apache.gearpump.streaming.appmaster.TaskRegistry.TaskLocations +import org.apache.gearpump.streaming.task.{Subscriber, TaskId} +import org.apache.gearpump.transport.HostPort + +object AppMasterToExecutor { + case class LaunchTasks( + taskId: List[TaskId], dagVersion: Int, processorDescription: ProcessorDescription, + subscribers: List[Subscriber]) + + case object TasksLaunched + + /** + * dagVersion, life, and subscribers will be changed on target task list. + */ + case class ChangeTasks( + taskId: List[TaskId], dagVersion: Int, life: LifeTime, subscribers: List[Subscriber]) + + case class TasksChanged(taskIds: List[TaskId]) + + case class ChangeTask( + taskId: TaskId, dagVersion: Int, life: LifeTime, subscribers: List[Subscriber]) + + case class TaskChanged(taskId: TaskId, dagVersion: Int) + + case class StartTask(taskId: TaskId) + + case class StopTask(taskId: TaskId) + + case class TaskLocationsReady(taskLocations: TaskLocations, dagVersion: Int) + + case class TaskLocationsReceived(dagVersion: Int, executorId: ExecutorId) + + case class TaskLocationsRejected( + dagVersion: Int, executorId: ExecutorId, reason: String, ex: Throwable) + + case class StartAllTasks(dagVersion: Int) + + case class StartDynamicDag(dagVersion: Int) + case class TaskRegistered(taskId: TaskId, sessionId: Int, startClock: TimeStamp) + case class TaskRejected(taskId: TaskId) + + case object RestartClockService + class MsgLostException extends Exception +} + +object ExecutorToAppMaster { + case class RegisterExecutor( + executor: ActorRef, executorId: Int, resource: Resource, worker : WorkerInfo) + + case class RegisterTask(taskId: TaskId, executorId: Int, task: HostPort) + case class UnRegisterTask(taskId: TaskId, executorId: Int) + + case class MessageLoss(executorId: Int, taskId: TaskId, cause: String) +} + +object AppMasterToMaster { + case class StallingTasks(tasks: List[TaskId]) +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala new file mode 100644 index 0000000..445f26c --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming + +object Constants { + + val GEARPUMP_STREAMING_OPERATOR = "gearpump.streaming.dsl.operator" + val GEARPUMP_STREAMING_SOURCE = "gearpump.streaming.dsl.source" + val GEARPUMP_STREAMING_SINK = "gearpump.streaming.dsl.sink" + val GEARPUMP_STREAMING_GROUPBY_FUNCTION = "gearpump.streaming.dsl.groupby-function" + + val GEARPUMP_STREAMING_LOCALITIES = "gearpump.streaming.localities" + + val GEARPUMP_STREAMING_REGISTER_TASK_TIMEOUT_MS = "gearpump.streaming.register-task-timeout-ms" + + val GEARPUMP_STREAMING_MAX_PENDING_MESSAGE_COUNT = + "gearpump.streaming.max-pending-message-count-per-connection" + + val GEARPUMP_STREAMING_ACK_ONCE_EVERY_MESSAGE_COUNT = + "gearpump.streaming.ack-once-every-message-count" +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala new file mode 100644 index 0000000..4a94ad3 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming + +import org.apache.gearpump.partitioner.PartitionerDescription +import org.apache.gearpump.streaming.task.TaskId +import org.apache.gearpump.util.Graph + +/** + * DAG is wrapper for [[org.apache.gearpump.util.Graph]] for streaming applications. + */ +case class DAG(version: Int, processors : Map[ProcessorId, ProcessorDescription], + graph : Graph[ProcessorId, PartitionerDescription]) + extends Serializable { + + def isEmpty: Boolean = { + processors.isEmpty + } + + def taskCount: Int = { + processors.foldLeft(0) { (count, task) => + count + task._2.parallelism + } + } + + def tasks: List[TaskId] = { + processors.flatMap { pair => + val (processorId, processor) = pair + (0 until processor.parallelism).map(TaskId(processorId, _)) + }.toList + } +} + +object DAG { + def apply(graph: Graph[ProcessorDescription, PartitionerDescription], version: Int = 0): DAG = { + val processors = graph.vertices.map { processorDescription => + (processorDescription.id, processorDescription) + }.toMap + val dag = graph.mapVertex { processor => + processor.id + } + new DAG(version, processors, dag) + } + + def empty: DAG = apply(Graph.empty[ProcessorDescription, PartitionerDescription]) +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/MessageSerializer.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/MessageSerializer.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/MessageSerializer.scala new file mode 100644 index 0000000..20e2529 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/MessageSerializer.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming + +import java.io.{DataInput, DataOutput} + +import org.apache.gearpump.streaming.task._ + +class TaskIdSerializer extends TaskMessageSerializer[TaskId] { + override def getLength(obj: TaskId): Int = 8 + + override def write(dataOutput: DataOutput, obj: TaskId): Unit = { + dataOutput.writeInt(obj.processorId) + dataOutput.writeInt(obj.index) + } + + override def read(dataInput: DataInput): TaskId = { + val processorId = dataInput.readInt() + val index = dataInput.readInt() + new TaskId(processorId, index) + } +} + +class AckSerializer extends TaskMessageSerializer[Ack] { + val taskIdSerializer = new TaskIdSerializer + + override def getLength(obj: Ack): Int = taskIdSerializer.getLength(obj.taskId) + 8 + + override def write(dataOutput: DataOutput, obj: Ack): Unit = { + taskIdSerializer.write(dataOutput, obj.taskId) + dataOutput.writeShort(obj.seq) + dataOutput.writeShort(obj.actualReceivedNum) + dataOutput.writeInt(obj.sessionId) + } + + override def read(dataInput: DataInput): Ack = { + val taskId = taskIdSerializer.read(dataInput) + val seq = dataInput.readShort() + val actualReceivedNum = dataInput.readShort() + val sessionId = dataInput.readInt() + Ack(taskId, seq, actualReceivedNum, sessionId) + } +} + +class InitialAckRequestSerializer extends TaskMessageSerializer[InitialAckRequest] { + val taskIdSerialzer = new TaskIdSerializer() + + override def getLength(obj: InitialAckRequest): Int = taskIdSerialzer.getLength(obj.taskId) + 4 + + override def write(dataOutput: DataOutput, obj: InitialAckRequest): Unit = { + taskIdSerialzer.write(dataOutput, obj.taskId) + dataOutput.writeInt(obj.sessionId) + } + + override def read(dataInput: DataInput): InitialAckRequest = { + val taskId = taskIdSerialzer.read(dataInput) + val sessionId = dataInput.readInt() + InitialAckRequest(taskId, sessionId) + } +} + +class AckRequestSerializer extends TaskMessageSerializer[AckRequest] { + val taskIdSerializer = new TaskIdSerializer + + override def getLength(obj: AckRequest): Int = taskIdSerializer.getLength(obj.taskId) + 6 + + override def write(dataOutput: DataOutput, obj: AckRequest): Unit = { + taskIdSerializer.write(dataOutput, obj.taskId) + dataOutput.writeShort(obj.seq) + dataOutput.writeInt(obj.sessionId) + } + + override def read(dataInput: DataInput): AckRequest = { + val taskId = taskIdSerializer.read(dataInput) + val seq = dataInput.readShort() + val sessionId = dataInput.readInt() + AckRequest(taskId, seq, sessionId) + } +} + +class LatencyProbeSerializer extends TaskMessageSerializer[LatencyProbe] { + override def getLength(obj: LatencyProbe): Int = 8 + + override def write(dataOutput: DataOutput, obj: LatencyProbe): Unit = { + dataOutput.writeLong(obj.timestamp) + } + + override def read(dataInput: DataInput): LatencyProbe = { + val timestamp = dataInput.readLong() + LatencyProbe(timestamp) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala new file mode 100644 index 0000000..66ec873 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming + +import scala.language.implicitConversions +import scala.reflect.ClassTag + +import akka.actor.ActorSystem + +import org.apache.gearpump.TimeStamp +import org.apache.gearpump.cluster._ +import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription, PartitionerObject} +import org.apache.gearpump.streaming.appmaster.AppMaster +import org.apache.gearpump.streaming.task.Task +import org.apache.gearpump.util.{Graph, LogUtil, ReferenceEqual} + +/** + * Processor is the blueprint for tasks. + */ +trait Processor[+T <: Task] extends ReferenceEqual { + + /** + * How many tasks you want to use for this processor. + */ + def parallelism: Int + + /** + * The custom [[org.apache.gearpump.cluster.UserConfig]], it is used to + * initialize a task in runtime. + */ + def taskConf: UserConfig + + /** + * Some description text for this processor. + */ + def description: String + + /** + * The task class, should be a subtype of Task. + * + * Each runtime instance of this class is a task. + */ + def taskClass: Class[_ <: Task] +} + +object Processor { + def ProcessorToProcessorDescription(id: ProcessorId, processor: Processor[_ <: Task]) + : ProcessorDescription = { + import processor._ + ProcessorDescription(id, taskClass.getName, parallelism, description, taskConf) + } + + def apply[T<: Task]( + parallelism : Int, description: String = "", + taskConf: UserConfig = UserConfig.empty)(implicit classtag: ClassTag[T]) + : DefaultProcessor[T] = { + new DefaultProcessor[T](parallelism, description, taskConf, + classtag.runtimeClass.asInstanceOf[Class[T]]) + } + + def apply[T<: Task]( + taskClazz: Class[T], parallelism : Int, description: String, taskConf: UserConfig) + : DefaultProcessor[T] = { + new DefaultProcessor[T](parallelism, description, taskConf, taskClazz) + } + + case class DefaultProcessor[T<: Task]( + parallelism : Int, description: String, taskConf: UserConfig, taskClass: Class[T]) + extends Processor[T] { + + def withParallelism(parallel: Int): DefaultProcessor[T] = { + new DefaultProcessor[T](parallel, description, taskConf, taskClass) + } + + def withDescription(desc: String): DefaultProcessor[T] = { + new DefaultProcessor[T](parallelism, desc, taskConf, taskClass) + } + + def withConfig(conf: UserConfig): DefaultProcessor[T] = { + new DefaultProcessor[T](parallelism, description, conf, taskClass) + } + } +} + +/** + * Each processor has a LifeTime. + * + * When input message's timestamp is beyond current processor's lifetime, + * then it will not be processed by this processor. + */ +case class LifeTime(birth: TimeStamp, death: TimeStamp) { + def contains(timestamp: TimeStamp): Boolean = { + timestamp >= birth && timestamp < death + } + + def cross(another: LifeTime): LifeTime = { + LifeTime(Math.max(birth, another.birth), Math.min(death, another.death)) + } +} + +object LifeTime { + val Immortal = LifeTime(0L, Long.MaxValue) +} + +/** + * Represent a streaming application + */ +class StreamApplication( + override val name: String, val inputUserConfig: UserConfig, + val dag: Graph[ProcessorDescription, PartitionerDescription]) + extends Application { + + require(!dag.hasDuplicatedEdge(), "Graph should not have duplicated edges") + + override def appMaster: Class[_ <: ApplicationMaster] = classOf[AppMaster] + override def userConfig(implicit system: ActorSystem): UserConfig = { + inputUserConfig.withValue(StreamApplication.DAG, dag) + } +} + +case class ProcessorDescription( + id: ProcessorId, + taskClass: String, + parallelism : Int, + description: String = "", + taskConf: UserConfig = null, + life: LifeTime = LifeTime.Immortal, + jar: AppJar = null) extends ReferenceEqual + +object StreamApplication { + + private val hashPartitioner = new HashPartitioner() + private val LOG = LogUtil.getLogger(getClass) + + def apply[T <: Processor[Task], P <: Partitioner]( + name: String, dag: Graph[T, P], userConfig: UserConfig): StreamApplication = { + import org.apache.gearpump.streaming.Processor._ + + if (dag.hasCycle()) { + LOG.warn(s"Detected cycles in DAG of application $name!") + } + + val indices = dag.topologicalOrderWithCirclesIterator.toList.zipWithIndex.toMap + val graph = dag.mapVertex { processor => + val updatedProcessor = ProcessorToProcessorDescription(indices(processor), processor) + updatedProcessor + }.mapEdge { (node1, edge, node2) => + PartitionerDescription(new PartitionerObject( + Option(edge).getOrElse(StreamApplication.hashPartitioner))) + } + new StreamApplication(name, userConfig, graph) + } + + val DAG = "DAG" +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala new file mode 100644 index 0000000..7c08b9b --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.appmaster + +import java.lang.management.ManagementFactory + +import akka.actor._ +import org.apache.gearpump._ +import org.apache.gearpump.cluster.ClientToMaster.{GetLastFailure, GetStallingTasks, QueryHistoryMetrics, ShutdownApplication} +import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterDataDetailRequest, ReplayFromTimestampWindowTrailingEdge} +import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, LastFailure} +import org.apache.gearpump.cluster._ +import org.apache.gearpump.cluster.worker.WorkerId +import org.apache.gearpump.metrics.Metrics.ReportMetrics +import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService} +import org.apache.gearpump.partitioner.PartitionerDescription +import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, RegisterExecutor, RegisterTask, UnRegisterTask} +import org.apache.gearpump.streaming._ +import org.apache.gearpump.streaming.appmaster.AppMaster._ +import org.apache.gearpump.streaming.appmaster.DagManager.{GetLatestDAG, LatestDAG, ReplaceProcessor} +import org.apache.gearpump.streaming.appmaster.ExecutorManager.{ExecutorInfo, GetExecutorInfo} +import org.apache.gearpump.streaming.appmaster.TaskManager.{FailedToRecover, GetTaskList, TaskList} +import org.apache.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig} +import org.apache.gearpump.streaming.storage.InMemoryAppStoreOnMaster +import org.apache.gearpump.streaming.task._ +import org.apache.gearpump.streaming.util.ActorPathUtil +import org.apache.gearpump.util.Constants.{APPMASTER_DEFAULT_EXECUTOR_ID, _} +import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig +import org.apache.gearpump.util._ +import org.slf4j.Logger + +import scala.concurrent.Future + +/** + * AppMaster is the head of a streaming application. + * + * It contains: + * 1. ExecutorManager to manage all executors. + * 2. TaskManager to manage all tasks, + * 3. ClockService to track the global clock for this streaming application. + * 4. Scheduler to decide which a task should be scheduled to. + */ +class AppMaster(appContext: AppMasterContext, app: AppDescription) extends ApplicationMaster { + import app.userConfig + import appContext.{appId, masterProxy, username} + + private implicit val actorSystem = context.system + private implicit val timeOut = FUTURE_TIMEOUT + + import akka.pattern.ask + private implicit val dispatcher = context.dispatcher + + private val startTime: TimeStamp = System.currentTimeMillis() + + private val LOG: Logger = LogUtil.getLogger(getClass, app = appId) + LOG.info(s"AppMaster[$appId] is launched by $username, app: $app xxxxxxxxxxxxxxxxx") + LOG.info(s"AppMaster actor path: ${ActorUtil.getFullPath(context.system, self.path)}") + + private val address = ActorUtil.getFullPath(context.system, self.path) + + private val store = new InMemoryAppStoreOnMaster(appId, appContext.masterProxy) + private val dagManager = context.actorOf(Props(new DagManager(appContext.appId, userConfig, store, + Some(getUpdatedDAG())))) + + private var taskManager: Option[ActorRef] = None + private var clockService: Option[ActorRef] = None + private val systemConfig = context.system.settings.config + private var lastFailure = LastFailure(0L, null) + + private val appMasterBrief = ExecutorBrief(APPMASTER_DEFAULT_EXECUTOR_ID, + self.path.toString, + Option(appContext.workerInfo).map(_.workerId).getOrElse(WorkerId.unspecified), "active") + + private val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig) + + private val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED) + + private val userDir = System.getProperty("user.dir") + private val logFile = LogUtil.applicationLogDir(actorSystem.settings.config) + + private val appMasterExecutorSummary = ExecutorSummary( + APPMASTER_DEFAULT_EXECUTOR_ID, + Option(appContext.workerInfo).map(_.workerId).getOrElse(WorkerId.unspecified), + self.path.toString, + logFile.getAbsolutePath, + status = "Active", + taskCount = 0, + tasks = Map.empty[ProcessorId, List[TaskId]], + jvmName = ManagementFactory.getRuntimeMXBean().getName() + ) + + private val historyMetricsService = if (metricsEnabled) { + // Registers jvm metrics + Metrics(context.system).register(new JvmMetricsSet( + s"app${appId}.executor${APPMASTER_DEFAULT_EXECUTOR_ID}")) + + val historyMetricsService = context.actorOf(Props(new HistoryMetricsService( + s"app$appId", getHistoryMetricsConfig))) + + val metricsReportService = context.actorOf(Props( + new MetricsReporterService(Metrics(context.system)))) + historyMetricsService.tell(ReportMetrics, metricsReportService) + + Some(historyMetricsService) + } else { + None + } + + private val executorManager: ActorRef = + context.actorOf(ExecutorManager.props(userConfig, appContext, app.clusterConfig, app.name), + ActorPathUtil.executorManagerActorName) + + for (dag <- getDAG) { + clockService = Some(context.actorOf(Props(new ClockService(dag, store)))) + val jarScheduler = new JarScheduler(appId, app.name, systemConfig, context) + + taskManager = Some(context.actorOf(Props(new TaskManager(appContext.appId, dagManager, + jarScheduler, executorManager, clockService.get, self, app.name)))) + } + + override def receive: Receive = { + taskMessageHandler orElse + executorMessageHandler orElse + recover orElse + appMasterService orElse + ActorUtil.defaultMsgHandler(self) + } + + /** Handles messages from Tasks */ + def taskMessageHandler: Receive = { + case clock: ClockEvent => + taskManager.foreach(_ forward clock) + case register: RegisterTask => + taskManager.foreach(_ forward register) + case unRegister: UnRegisterTask => + taskManager.foreach(_ forward unRegister) + // Checks whether this processor dead, if it is, then we should remove it from clockService. + clockService.foreach(_ forward CheckProcessorDeath(unRegister.taskId.processorId)) + case replay: ReplayFromTimestampWindowTrailingEdge => + taskManager.foreach(_ forward replay) + case messageLoss: MessageLoss => + lastFailure = LastFailure(System.currentTimeMillis(), messageLoss.cause) + taskManager.foreach(_ forward messageLoss) + case lookupTask: LookupTaskActorRef => + taskManager.foreach(_ forward lookupTask) + case checkpoint: ReportCheckpointClock => + clockService.foreach(_ forward checkpoint) + case GetDAG => + val task = sender + getDAG.foreach { + dag => task ! dag + } + case GetCheckpointClock => + clockService.foreach(_ forward GetCheckpointClock) + } + + /** Handles messages from Executors */ + def executorMessageHandler: Receive = { + case register: RegisterExecutor => + executorManager forward register + case ReportMetrics => + historyMetricsService.foreach(_ forward ReportMetrics) + } + + /** Handles messages from AppMaster */ + def appMasterService: Receive = { + case appMasterDataDetailRequest: AppMasterDataDetailRequest => + LOG.debug(s"AppMaster got AppMasterDataDetailRequest for $appId ") + + val executorsFuture = executorBrief + val clockFuture = getMinClock + val taskFuture = getTaskList + val dagFuture = getDAG + + val appMasterDataDetail = for { + executors <- executorsFuture + clock <- clockFuture + tasks <- taskFuture + dag <- dagFuture + } yield { + val graph = dag.graph + + val executorToTasks = tasks.tasks.groupBy(_._2).mapValues { + _.keys.toList + } + + val processors = dag.processors.map { kv => + val processor = kv._2 + import processor._ + val tasks = executorToTasks.map { kv => + (kv._1, TaskCount(kv._2.count(_.processorId == id))) + }.filter(_._2.count != 0) + (id, + ProcessorSummary(id, taskClass, parallelism, description, taskConf, life, + tasks.keys.toList, tasks)) + } + + StreamAppMasterSummary( + appId = appId, + appName = app.name, + actorPath = address, + clock = clock, + status = MasterToAppMaster.AppMasterActive, + startTime = startTime, + uptime = System.currentTimeMillis() - startTime, + user = username, + homeDirectory = userDir, + logFile = logFile.getAbsolutePath, + processors = processors, + processorLevels = graph.vertexHierarchyLevelMap(), + dag = graph.mapEdge { (node1, edge, node2) => + edge.partitionerFactory.name + }, + executors = executors, + historyMetricsConfig = getHistoryMetricsConfig + ) + } + + val client = sender() + + appMasterDataDetail.map { appData => + client ! appData + } + // TODO: WebSocket is buggy and disabled. + // case appMasterMetricsRequest: AppMasterMetricsRequest => + // val client = sender() + // actorSystem.eventStream.subscribe(client, classOf[MetricType]) + case query: QueryHistoryMetrics => + if (historyMetricsService.isEmpty) { + // Returns empty metrics so that we don't hang the UI + sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem]) + } else { + historyMetricsService.get forward query + } + case getStalling: GetStallingTasks => + clockService.foreach(_ forward getStalling) + case replaceDAG: ReplaceProcessor => + dagManager forward replaceDAG + case GetLastFailure(_) => + sender ! lastFailure + case get@GetExecutorSummary(executorId) => + val client = sender + if (executorId == APPMASTER_DEFAULT_EXECUTOR_ID) { + client ! appMasterExecutorSummary + } else { + ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo) + .map { map => + map.get(executorId).foreach { executor => + executor.executor.tell(get, client) + } + } + } + case query@QueryExecutorConfig(executorId) => + val client = sender + if (executorId == -1) { + val systemConfig = context.system.settings.config + sender ! ExecutorConfig(ClusterConfig.filterOutDefaultConfig(systemConfig)) + } else { + ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo) + .map { map => + map.get(executorId).foreach { executor => + executor.executor.tell(query, client) + } + } + } + } + + /** Error handling */ + def recover: Receive = { + case FailedToRecover(errorMsg) => + if (context.children.toList.contains(sender())) { + LOG.error(errorMsg) + masterProxy ! ShutdownApplication(appId) + } + case AllocateResourceTimeOut => + LOG.error(s"Failed to allocate resource in time, shutdown application $appId") + masterProxy ! ShutdownApplication(appId) + context.stop(self) + } + + private def getMinClock: Future[TimeStamp] = { + clockService match { + case Some(clockService) => + (clockService ? GetLatestMinClock).asInstanceOf[Future[LatestMinClock]].map(_.clock) + case None => + Future.failed(new ServiceNotAvailableException("clock service not ready")) + } + } + + private def executorBrief: Future[List[ExecutorBrief]] = { + ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo) + .map { infos => + infos.values.map { info => + ExecutorBrief(info.executorId, + info.executor.path.toSerializationFormat, + info.worker.workerId, + "active") + }.toList :+ appMasterBrief + } + } + + private def getTaskList: Future[TaskList] = { + taskManager match { + case Some(taskManager) => + (taskManager ? GetTaskList).asInstanceOf[Future[TaskList]] + case None => + Future.failed(new ServiceNotAvailableException("task manager not ready")) + } + } + + private def getDAG: Future[DAG] = { + (dagManager ? GetLatestDAG).asInstanceOf[Future[LatestDAG]].map(_.dag) + } + + private def getUpdatedDAG(): DAG = { + val dag = DAG(userConfig.getValue[Graph[ProcessorDescription, + PartitionerDescription]](StreamApplication.DAG).get) + val updated = dag.processors.map { idAndProcessor => + val (id, oldProcessor) = idAndProcessor + val newProcessor = if (oldProcessor.jar == null) { + oldProcessor.copy(jar = appContext.appJar.getOrElse(null)) + } else { + oldProcessor + } + (id, newProcessor) + } + DAG(dag.version, updated, dag.graph) + } +} + +object AppMaster { + + /** Master node doesn't return resource in time */ + case object AllocateResourceTimeOut + + /** Query task ActorRef by providing the taskId */ + case class LookupTaskActorRef(taskId: TaskId) + + case class TaskActorRef(task: ActorRef) + + class ServiceNotAvailableException(reason: String) extends Exception(reason) + + case class ExecutorBrief( + executorId: ExecutorId, executor: String, workerId: WorkerId, status: String) + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala new file mode 100644 index 0000000..458fded --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.appmaster + +import java.util +import java.util.Date +import java.util.concurrent.TimeUnit + +import akka.actor.{Actor, Cancellable, Stash} +import io.gearpump.google.common.primitives.Longs +import org.apache.gearpump.TimeStamp +import org.apache.gearpump.cluster.ClientToMaster.GetStallingTasks +import org.apache.gearpump.streaming.AppMasterToMaster.StallingTasks +import org.apache.gearpump.streaming._ +import org.apache.gearpump.streaming.appmaster.ClockService.HealthChecker.ClockValue +import org.apache.gearpump.streaming.appmaster.ClockService._ +import org.apache.gearpump.streaming.storage.AppDataStore +import org.apache.gearpump.streaming.task._ +import org.apache.gearpump.util.LogUtil +import org.slf4j.Logger + +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration +import scala.language.implicitConversions + +/** + * Maintains a global view of message timestamp in the application + */ +class ClockService(private var dag: DAG, store: AppDataStore) extends Actor with Stash { + private val LOG: Logger = LogUtil.getLogger(getClass) + + import context.dispatcher + + private val healthChecker = new HealthChecker(stallingThresholdSeconds = 60) + private var healthCheckScheduler: Cancellable = null + private var snapshotScheduler: Cancellable = null + + override def receive: Receive = null + + override def preStart(): Unit = { + LOG.info("Initializing Clock service, get snapshotted StartClock ....") + store.get(START_CLOCK).asInstanceOf[Future[TimeStamp]].map { clock => + val startClock = Option(clock).getOrElse(0L) + + minCheckpointClock = Some(startClock) + + // Recover the application by restarting from last persisted startClock. + // Only messge after startClock will be replayed. + self ! StoredStartClock(startClock) + LOG.info(s"Start Clock Retrieved, starting ClockService, startClock: $startClock") + } + + context.become(waitForStartClock) + } + + override def postStop(): Unit = { + Option(healthCheckScheduler).map(_.cancel) + Option(snapshotScheduler).map(_.cancel) + } + + // Keep track of clock value of all processors. + private var clocks = Map.empty[ProcessorId, ProcessorClock] + + // Each process can have multiple upstream processors. This keep track of the upstream clocks. + private var upstreamClocks = Map.empty[ProcessorId, Array[ProcessorClock]] + + // We use Array instead of List for Performance consideration + private var processorClocks = Array.empty[ProcessorClock] + + private var checkpointClocks: Map[TaskId, Vector[TimeStamp]] = null + + private var minCheckpointClock: Option[TimeStamp] = None + + private def checkpointEnabled(processor: ProcessorDescription): Boolean = { + val taskConf = processor.taskConf + taskConf != null && taskConf.getBoolean("state.checkpoint.enable") == Some(true) + } + + private def resetCheckpointClocks(dag: DAG, startClock: TimeStamp): Unit = { + this.checkpointClocks = dag.processors.filter(startClock < _._2.life.death) + .filter { case (_, processor) => + checkpointEnabled(processor) + }.flatMap { case (id, processor) => + (0 until processor.parallelism).map(TaskId(id, _) -> Vector.empty[TimeStamp]) + } + if (this.checkpointClocks.isEmpty) { + minCheckpointClock = None + } + } + + private def initDag(startClock: TimeStamp): Unit = { + recoverDag(this.dag, startClock) + } + + private def recoverDag(dag: DAG, startClock: TimeStamp): Unit = { + this.clocks = dag.processors.filter(startClock < _._2.life.death). + map { pair => + val (processorId, processor) = pair + val parallelism = processor.parallelism + val clock = new ProcessorClock(processorId, processor.life, parallelism) + clock.init(startClock) + (processorId, clock) + } + + this.upstreamClocks = clocks.map { pair => + val (processorId, processor) = pair + + val upstreams = dag.graph.incomingEdgesOf(processorId).map(_._1) + val upstreamClocks = upstreams.flatMap(clocks.get(_)) + (processorId, upstreamClocks.toArray) + } + + this.processorClocks = clocks.toArray.map(_._2) + + resetCheckpointClocks(dag, startClock) + } + + private def dynamicDAG(dag: DAG, startClock: TimeStamp): Unit = { + val newClocks = dag.processors.filter(startClock < _._2.life.death). + map { pair => + val (processorId, processor) = pair + val parallelism = processor.parallelism + + val clock = if (clocks.contains(processor.id)) { + clocks(processorId).copy(life = processor.life) + } else { + new ProcessorClock(processorId, processor.life, parallelism) + } + (processorId, clock) + } + + this.clocks = newClocks + + this.upstreamClocks = newClocks.map { pair => + val (processorId, processor) = pair + + val upstreams = dag.graph.incomingEdgesOf(processorId).map(_._1) + val upstreamClocks = upstreams.flatMap(newClocks.get(_)) + (processorId, upstreamClocks.toArray) + } + + // Inits the clock of all processors. + newClocks.map { pair => + val (processorId, processorClock) = pair + val upstreamClock = getUpStreamMinClock(processorId) + val birth = processorClock.life.birth + + if (dag.graph.inDegreeOf(processorId) == 0) { + processorClock.init(Longs.max(birth, startClock)) + } else { + processorClock.init(upstreamClock) + } + } + + this.processorClocks = clocks.toArray.map(_._2) + + resetCheckpointClocks(dag, startClock) + } + + def waitForStartClock: Receive = { + case StoredStartClock(startClock) => + initDag(startClock) + + import context.dispatcher + + // Period report current clock + healthCheckScheduler = context.system.scheduler.schedule( + new FiniteDuration(5, TimeUnit.SECONDS), + new FiniteDuration(60, TimeUnit.SECONDS), self, HealthCheck) + + // Period snpashot latest min startclock to external storage + snapshotScheduler = context.system.scheduler.schedule(new FiniteDuration(5, TimeUnit.SECONDS), + new FiniteDuration(5, TimeUnit.SECONDS), self, SnapshotStartClock) + + unstashAll() + context.become(clockService) + + case _ => + stash() + } + + private def getUpStreamMinClock(processorId: ProcessorId): TimeStamp = { + val clocks = upstreamClocks.get(processorId) + if (clocks.isDefined) { + if (clocks.get == null || clocks.get.length == 0) { + Long.MaxValue + } else { + ProcessorClocks.minClock(clocks.get) + } + } else { + Long.MaxValue + } + } + + def clockService: Receive = { + case GetUpstreamMinClock(task) => + sender ! UpstreamMinClock(getUpStreamMinClock(task.processorId)) + + case update@UpdateClock(task, clock) => + val upstreamMinClock = getUpStreamMinClock(task.processorId) + + val processorClock = clocks.get(task.processorId) + if (processorClock.isDefined) { + processorClock.get.updateMinClock(task.index, clock) + } else { + LOG.error(s"Cannot updateClock for task $task") + } + sender ! UpstreamMinClock(upstreamMinClock) + + case GetLatestMinClock => + sender ! LatestMinClock(minClock) + + case GetStartClock => + sender ! StartClock(getStartClock) + + case deathCheck: CheckProcessorDeath => + val processorId = deathCheck.processorId + val processorClock = clocks.get(processorId) + if (processorClock.isDefined) { + val life = processorClock.get.life + if (processorClock.get.min >= life.death) { + + LOG.info(s"Removing $processorId from clock service...") + removeProcessor(processorId) + } else { + LOG.info(s"Unsuccessfully in removing $processorId from clock service...," + + s" min: ${processorClock.get.min}, life: $life") + } + } + case HealthCheck => + selfCheck() + + case SnapshotStartClock => + snapshotStartClock() + + case ReportCheckpointClock(task, time) => + updateCheckpointClocks(task, time) + + case GetCheckpointClock => + sender ! CheckpointClock(minCheckpointClock) + + case getStalling: GetStallingTasks => + sender ! StallingTasks(healthChecker.getReport.stallingTasks) + + case ChangeToNewDAG(dag) => + if (dag.version > this.dag.version) { + // Transits to a new dag version + this.dag = dag + dynamicDAG(dag, getStartClock) + } else { + // Restarts current dag. + recoverDag(dag, getStartClock) + } + LOG.info(s"Change to new DAG(dag = ${dag.version}), send back ChangeToNewDAGSuccess") + sender ! ChangeToNewDAGSuccess(clocks.map { pair => + val (id, clock) = pair + (id, clock.min) + }) + } + + private def removeProcessor(processorId: ProcessorId): Unit = { + clocks = clocks - processorId + processorClocks = processorClocks.filter(_.processorId != processorId) + + upstreamClocks = upstreamClocks.map { pair => + val (id, upstreams) = pair + val updatedUpstream = upstreams.filter(_.processorId != processorId) + (id, updatedUpstream) + } + + upstreamClocks = upstreamClocks - processorId + + // Removes dead processor from checkpoints. + checkpointClocks = checkpointClocks.filter { kv => + val (taskId, processor) = kv + taskId.processorId != processorId + } + } + + private def minClock: TimeStamp = { + ProcessorClocks.minClock(processorClocks) + } + + def selfCheck(): Unit = { + val minTimestamp = minClock + + if (Long.MaxValue == minTimestamp) { + processorClocks.foreach { clock => + LOG.info(s"Processor ${clock.processorId} Clock: min: ${clock.min}, " + + s"taskClocks: " + clock.taskClocks.mkString(",")) + } + } + + healthChecker.check(minTimestamp, clocks, dag, System.currentTimeMillis()) + } + + private def getStartClock: TimeStamp = { + minCheckpointClock.getOrElse(minClock) + } + + private def snapshotStartClock(): Unit = { + store.put(START_CLOCK, getStartClock) + } + + private def updateCheckpointClocks(task: TaskId, time: TimeStamp): Unit = { + val clocks = checkpointClocks(task) :+ time + checkpointClocks += task -> clocks + + if (checkpointClocks.forall(_._2.contains(time))) { + minCheckpointClock = Some(time) + LOG.info(s"minCheckpointTime $minCheckpointClock") + + checkpointClocks = checkpointClocks.mapValues(_.dropWhile(_ <= time)) + } + } +} + +object ClockService { + val START_CLOCK = "startClock" + + case object HealthCheck + + class ProcessorClock(val processorId: ProcessorId, val life: LifeTime, val parallism: Int, + private var _min: TimeStamp = 0L, private var _taskClocks: Array[TimeStamp] = null) { + + def copy(life: LifeTime): ProcessorClock = { + new ProcessorClock(processorId, life, parallism, _min, _taskClocks) + } + + def min: TimeStamp = _min + def taskClocks: Array[TimeStamp] = _taskClocks + + def init(startClock: TimeStamp): Unit = { + if (taskClocks == null) { + this._min = startClock + this._taskClocks = new Array(parallism) + util.Arrays.fill(taskClocks, startClock) + } + } + + def updateMinClock(taskIndex: Int, clock: TimeStamp): Unit = { + taskClocks(taskIndex) = clock + _min = Longs.min(taskClocks: _*) + } + } + + case object SnapshotStartClock + + case class Report(stallingTasks: List[TaskId]) + + /** + * Check whether the clock is advancing normally + */ + class HealthChecker(stallingThresholdSeconds: Int) { + private val LOG: Logger = LogUtil.getLogger(getClass) + + private var minClock: ClockValue = null + private val stallingThresholdMilliseconds = stallingThresholdSeconds * 1000 + // 60 seconds + private var stallingTasks = Array.empty[TaskId] + + /** Check for stalling tasks */ + def check( + currentMinClock: TimeStamp, processorClocks: Map[ProcessorId, ProcessorClock], + dag: DAG, now: TimeStamp): Unit = { + var isClockStalling = false + if (null == minClock || currentMinClock > minClock.appClock) { + minClock = ClockValue(systemClock = now, appClock = currentMinClock) + } else { + // Clock not advancing + if (now > minClock.systemClock + stallingThresholdMilliseconds) { + LOG.warn(s"Clock has not advanced for ${(now - minClock.systemClock) / 1000} seconds " + + s"since ${minClock.prettyPrint}...") + isClockStalling = true + } + } + + if (isClockStalling) { + val processorId = dag.graph.topologicalOrderWithCirclesIterator.toList.find { processorId => + val clock = processorClocks.get(processorId) + if (clock.isDefined) { + clock.get.min == minClock.appClock + } else { + false + } + } + + processorId.foreach { processorId => + val processorClock = processorClocks(processorId) + val taskClocks = processorClock.taskClocks + stallingTasks = taskClocks.zipWithIndex.filter(_._1 == minClock.appClock). + map(pair => TaskId(processorId, pair._2)) + } + LOG.info(s"Stalling Tasks: ${stallingTasks.mkString(",")}") + } else { + stallingTasks = Array.empty[TaskId] + } + } + + def getReport: Report = { + Report(stallingTasks.toList) + } + } + + object HealthChecker { + case class ClockValue(systemClock: TimeStamp, appClock: TimeStamp) { + def prettyPrint: String = { + "(system clock: " + new Date(systemClock).toString + ", app clock: " + appClock + ")" + } + } + } + + object ProcessorClocks { + + // Get the Min clock of all processors + def minClock(clock: Array[ProcessorClock]): TimeStamp = { + var i = 0 + var min = if (clock.length == 0) 0L else clock(0).min + while (i < clock.length) { + min = Math.min(min, clock(i).min) + i += 1 + } + min + } + } + + case class ChangeToNewDAG(dag: DAG) + case class ChangeToNewDAGSuccess(clocks: Map[ProcessorId, TimeStamp]) + + case class StoredStartClock(clock: TimeStamp) +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala new file mode 100644 index 0000000..3341d4f --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.appmaster + +import akka.actor.{Actor, ActorRef, Stash} +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.partitioner.PartitionerDescription +import org.apache.gearpump.streaming._ +import org.apache.gearpump.streaming.appmaster.DagManager._ +import org.apache.gearpump.streaming.storage.AppDataStore +import org.apache.gearpump.streaming.task.Subscriber +import org.apache.gearpump.util.{Graph, LogUtil} +import org.slf4j.Logger + +import scala.concurrent.Future + +/** + * Handles dag modification and other stuff related with DAG + * + * DagManager maintains multiple version of DAGs. For each version, the DAG is immutable. + * For operations like modifying a processor, it will create a new version of DAG. + */ +class DagManager(appId: Int, userConfig: UserConfig, store: AppDataStore, dag: Option[DAG]) + extends Actor with Stash { + + import context.dispatcher + private val LOG: Logger = LogUtil.getLogger(getClass, app = appId) + private val NOT_INITIALIZED = -1 + + private var dags = List.empty[DAG] + private var maxProcessorId = -1 + private implicit val system = context.system + + private var watchers = List.empty[ActorRef] + + override def receive: Receive = null + + override def preStart(): Unit = { + LOG.info("Initializing Dag Service, get stored Dag ....") + store.get(StreamApplication.DAG).asInstanceOf[Future[DAG]].map { storedDag => + if (storedDag != null) { + dags :+= storedDag + } else { + dags :+= dag.getOrElse(DAG(userConfig.getValue[Graph[ProcessorDescription, + PartitionerDescription]](StreamApplication.DAG).get)) + } + maxProcessorId = { + val keys = dags.head.processors.keys + if (keys.size == 0) { + 0 + } else { + keys.max + } + } + self ! DagInitiated + } + context.become(waitForDagInitiate) + } + + def waitForDagInitiate: Receive = { + case DagInitiated => + unstashAll() + context.become(dagService) + case _ => + stash() + } + + private def nextProcessorId: ProcessorId = { + maxProcessorId += 1 + maxProcessorId + } + + private def taskLaunchData(dag: DAG, processorId: Int, context: AnyRef): TaskLaunchData = { + val processorDescription = dag.processors(processorId) + val subscribers = Subscriber.of(processorId, dag) + TaskLaunchData(processorDescription, subscribers, context) + } + + def dagService: Receive = { + case GetLatestDAG => + // Get the latest version of DAG. + sender ! LatestDAG(dags.last) + case GetTaskLaunchData(version, processorId, context) => + // Task information like Processor class, downstream subscriber processors and etc. + dags.find(_.version == version).foreach { dag => + LOG.info(s"Get task launcher data for processor: $processorId, dagVersion: $version") + sender ! taskLaunchData(dag, processorId, context) + } + case ReplaceProcessor(oldProcessorId, inputNewProcessor) => + // Replace a processor with new implementation. The upstream processors and downstream + // processors are NOT changed. + var newProcessor = inputNewProcessor.copy(id = nextProcessorId) + if (inputNewProcessor.jar == null) { + val oldJar = dags.last.processors.get(oldProcessorId).get + newProcessor = newProcessor.copy(jar = oldJar.jar) + } + if (dags.length > 1) { + sender ! DAGOperationFailed( + "We are in the process of handling previous dynamic dag change") + } else { + val oldDAG = dags.last + val newVersion = oldDAG.version + 1 + val newDAG = replaceDAG(oldDAG, oldProcessorId, newProcessor, newVersion) + dags :+= newDAG + + LOG.info(s"ReplaceProcessor old: $oldProcessorId, new: $newProcessor") + LOG.info(s"new DAG: $newDAG") + watchers.foreach(_ ! LatestDAG(newDAG)) + sender ! DAGOperationSuccess + } + + case WatchChange(watcher) => + // Checks whether there are modifications for this DAG. + if (!this.watchers.contains(watcher)) { + this.watchers :+= watcher + } + + case NewDAGDeployed(dagVersion) => + // Means dynamic Dag transition completed, and the new DAG version has been successfully + // deployed. The obsolete dag versions will be removed. + if (dagVersion != NOT_INITIALIZED) { + dags = dags.filter(_.version == dagVersion) + store.put(StreamApplication.DAG, dags.last) + } + } + + private def replaceDAG( + dag: DAG, oldProcessorId: ProcessorId, newProcessor: ProcessorDescription, newVersion: Int) + : DAG = { + val oldProcessorLife = LifeTime(dag.processors(oldProcessorId).life.birth, + newProcessor.life.birth) + + val newProcessorMap = dag.processors ++ + Map(oldProcessorId -> dag.processors(oldProcessorId).copy(life = oldProcessorLife), + newProcessor.id -> newProcessor) + + val newGraph = dag.graph.subGraph(oldProcessorId). + replaceVertex(oldProcessorId, newProcessor.id).addGraph(dag.graph) + new DAG(newVersion, newProcessorMap, newGraph) + } +} + +object DagManager { + case object DagInitiated + + case class WatchChange(watcher: ActorRef) + + case object GetLatestDAG + case class LatestDAG(dag: DAG) + + case class GetTaskLaunchData(dagVersion: Int, processorId: Int, context: AnyRef = null) + case class TaskLaunchData(processorDescription : ProcessorDescription, + subscribers: List[Subscriber], context: AnyRef = null) + + sealed trait DAGOperation + + case class ReplaceProcessor(oldProcessorId: ProcessorId, + newProcessorDescription: ProcessorDescription) extends DAGOperation + + sealed trait DAGOperationResult + case object DAGOperationSuccess extends DAGOperationResult + case class DAGOperationFailed(reason: String) extends DAGOperationResult + + case class NewDAGDeployed(dagVersion: Int) +} \ No newline at end of file
