http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankWorker.scala ---------------------------------------------------------------------- diff --git a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankWorker.scala b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankWorker.scala deleted file mode 100644 index 9f38cd7..0000000 --- a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankWorker.scala +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.experiments.pagerank - -import akka.actor.Actor.Receive - -import io.gearpump.cluster.UserConfig -import io.gearpump.experiments.pagerank.PageRankApplication.NodeWithTaskId -import io.gearpump.experiments.pagerank.PageRankController.Tick -import io.gearpump.experiments.pagerank.PageRankWorker.{LatestWeight, UpdateWeight} -import io.gearpump.streaming.task.{Task, TaskContext, TaskId, TaskWrapper} -import io.gearpump.util.Graph - -class PageRankWorker(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { - import taskContext.taskId - - private var weight: Double = 1.0 - private var upstreamWeights = Map.empty[TaskId, Double] - - val taskCount = conf.getInt(PageRankApplication.COUNT).get - lazy val allTasks = (0 until taskCount).toList.map(TaskId(processorId = 1, _)) - - private val graph = conf.getValue[Graph[NodeWithTaskId[_], AnyRef]](PageRankApplication.DAG).get - - private val node = graph.vertices.find { node => - node.taskId == taskContext.taskId.index - }.get - - private val downstream = graph.outgoingEdgesOf(node).map(_._3) - .map(id => taskId.copy(index = id.taskId)).toSeq - private val upstreamCount = graph.incomingEdgesOf(node).map(_._1).length - - LOG.info(s"downstream nodes: $downstream") - - private var tick = 0 - - private def output(msg: AnyRef, tasks: TaskId*): Unit = { - taskContext.asInstanceOf[TaskWrapper].outputUnManaged(msg, tasks: _*) - } - - override def receiveUnManagedMessage: Receive = { - case Tick(tick) => - this.tick = tick - - if (downstream.length == 0) { - // If there is no downstream, we will evenly distribute our page rank to - // every node in the graph - val update = UpdateWeight(taskId, weight / taskCount) - output(update, allTasks: _*) - } else { - val update = UpdateWeight(taskId, weight / downstream.length) - output(update, downstream: _*) - } - case update@UpdateWeight(upstreamTaskId, weight) => - upstreamWeights += upstreamTaskId -> weight - if (upstreamWeights.size == upstreamCount) { - val nextWeight = upstreamWeights.foldLeft(0.0) { (sum, item) => sum + item._2 } - this.upstreamWeights = Map.empty[TaskId, Double] - this.weight = nextWeight - output(LatestWeight(taskId, weight, tick), TaskId(0, 0)) - } - } -} - -object PageRankWorker { - case class UpdateWeight(taskId: TaskId, weight: Double) - case class LatestWeight(taskId: TaskId, weight: Double, tick: Int) -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/example/PageRankExample.scala ---------------------------------------------------------------------- diff --git a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/example/PageRankExample.scala b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/example/PageRankExample.scala deleted file mode 100644 index 3877974..0000000 --- a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/example/PageRankExample.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.experiments.pagerank.example - -import io.gearpump.cluster.client.ClientContext -import io.gearpump.experiments.pagerank.PageRankApplication -import io.gearpump.util.Graph.Node -import io.gearpump.util.{AkkaApp, Graph} - -/** A very simple PageRank example, Cyclic graph is not supported */ -object PageRankExample extends AkkaApp { - - val a = "a" - val b = "b" - val c = "c" - val d = "d" - - def help(): Unit = Unit - - def main(akkaConf: Config, args: Array[String]): Unit = { - val pageRankGraph = Graph(a ~> b, a ~> c, a ~> d, b ~> a, b ~> d, d ~> b, d ~> c, c ~> b) - val app = new PageRankApplication("pagerank", iteration = 100, delta = 0.001, pageRankGraph) - val context = ClientContext(akkaConf) - val appId = context.submit(app) - context.close() - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankApplication.scala ---------------------------------------------------------------------- diff --git a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankApplication.scala b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankApplication.scala new file mode 100644 index 0000000..023ee35 --- /dev/null +++ b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankApplication.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experiments.pagerank + +import akka.actor.ActorSystem + +import org.apache.gearpump.cluster.{Application, ApplicationMaster, UserConfig} +import org.apache.gearpump.experiments.pagerank.PageRankApplication.NodeWithTaskId +import org.apache.gearpump.partitioner.HashPartitioner +import org.apache.gearpump.streaming.appmaster.AppMaster +import org.apache.gearpump.streaming.{Processor, StreamApplication} +import org.apache.gearpump.util.Graph +import org.apache.gearpump.util.Graph.Node + +/** + * + * A simple and naive pagerank implementation. + * + * @param name name of the application + * @param iteration max iteration count + * @param delta decide the accuracy when the page rank example stops. + * @param dag the page rank graph + */ +class PageRankApplication[T]( + override val name: String, iteration: Int, delta: Double, dag: Graph[T, _]) + extends Application { + + override def appMaster: Class[_ <: ApplicationMaster] = classOf[AppMaster] + override def userConfig(implicit system: ActorSystem): UserConfig = { + + // Map node with taskId + var taskId = 0 + val pageRankDag = dag.mapVertex { node => + val updatedNode = NodeWithTaskId(taskId, node) + taskId += 1 + updatedNode + } + + val taskCount = taskId + + val userConfig = UserConfig.empty.withValue(PageRankApplication.DAG, pageRankDag). + withInt(PageRankApplication.ITERATION, iteration). + withInt(PageRankApplication.COUNT, taskCount). + withDouble(PageRankApplication.DELTA, delta) + + val controller = Processor[PageRankController](1) + val pageRankWorker = Processor[PageRankWorker](taskCount) + val partitioner = new HashPartitioner + + val app = StreamApplication(name, Graph(controller ~ partitioner ~> pageRankWorker), userConfig) + app.userConfig + } +} + +object PageRankApplication { + val DAG = "PageRank.DAG" + val ITERATION = "PageRank.Iteration" + val COUNT = "PageRank.COUNT" + val DELTA = "PageRank.DELTA" + val REPORTER = "PageRank.Reporter" + case class NodeWithTaskId[T](taskId: Int, node: T) +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala ---------------------------------------------------------------------- diff --git a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala new file mode 100644 index 0000000..d461876 --- /dev/null +++ b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experiments.pagerank + +import akka.actor.Actor.Receive + +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.experiments.pagerank.PageRankController.Tick +import org.apache.gearpump.experiments.pagerank.PageRankWorker.LatestWeight +import org.apache.gearpump.streaming.task._ + +class PageRankController(taskContext: TaskContext, conf: UserConfig) + extends Task(taskContext, conf) { + + val taskCount = conf.getInt(PageRankApplication.COUNT).get + val iterationMax = conf.getInt(PageRankApplication.ITERATION).get + val delta = conf.getDouble(PageRankApplication.DELTA).get + + val tasks = (0 until taskCount).toList.map(TaskId(1, _)) + + var tick: Int = 0 + var receivedWeightForCurrentTick = 0 + + var weights = Map.empty[TaskId, Double] + var deltas = Map.empty[TaskId, Double] + + override def onStart(startTime: StartTime): Unit = { + output(Tick(tick), tasks: _*) + } + + private def output(msg: AnyRef, tasks: TaskId*): Unit = { + taskContext.asInstanceOf[TaskWrapper].outputUnManaged(msg, tasks: _*) + } + + override def receiveUnManagedMessage: Receive = { + case LatestWeight(taskId, weight, replyTick) => + if (this.tick == replyTick) { + + deltas += taskId -> Math.abs(weight - weights.getOrElse(taskId, 0.0)) + weights += taskId -> weight + receivedWeightForCurrentTick += 1 + if (receivedWeightForCurrentTick == taskCount) { + this.tick += 1 + receivedWeightForCurrentTick = 0 + if (continueIteration) { + LOG.debug(s"next iteration: $tick, weight: $weights, delta: $deltas") + output(Tick(tick), tasks: _*) + } else { + LOG.info(s"iterations: $tick, weight: $weights, delta: $deltas") + } + } + } + } + + private def continueIteration: Boolean = { + (tick < iterationMax) && deltas.values.foldLeft(false) { (deltaExceed, value) => + deltaExceed || value > delta + } + } +} + +object PageRankController { + case class Tick(iteration: Int) +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankWorker.scala ---------------------------------------------------------------------- diff --git a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankWorker.scala b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankWorker.scala new file mode 100644 index 0000000..e033bf1 --- /dev/null +++ b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankWorker.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experiments.pagerank + +import akka.actor.Actor.Receive + +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.experiments.pagerank.PageRankApplication.NodeWithTaskId +import org.apache.gearpump.experiments.pagerank.PageRankController.Tick +import org.apache.gearpump.experiments.pagerank.PageRankWorker.{LatestWeight, UpdateWeight} +import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskId, TaskWrapper} +import org.apache.gearpump.util.Graph + +class PageRankWorker(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { + import taskContext.taskId + + private var weight: Double = 1.0 + private var upstreamWeights = Map.empty[TaskId, Double] + + val taskCount = conf.getInt(PageRankApplication.COUNT).get + lazy val allTasks = (0 until taskCount).toList.map(TaskId(processorId = 1, _)) + + private val graph = conf.getValue[Graph[NodeWithTaskId[_], AnyRef]](PageRankApplication.DAG).get + + private val node = graph.vertices.find { node => + node.taskId == taskContext.taskId.index + }.get + + private val downstream = graph.outgoingEdgesOf(node).map(_._3) + .map(id => taskId.copy(index = id.taskId)).toSeq + private val upstreamCount = graph.incomingEdgesOf(node).map(_._1).length + + LOG.info(s"downstream nodes: $downstream") + + private var tick = 0 + + private def output(msg: AnyRef, tasks: TaskId*): Unit = { + taskContext.asInstanceOf[TaskWrapper].outputUnManaged(msg, tasks: _*) + } + + override def receiveUnManagedMessage: Receive = { + case Tick(tick) => + this.tick = tick + + if (downstream.length == 0) { + // If there is no downstream, we will evenly distribute our page rank to + // every node in the graph + val update = UpdateWeight(taskId, weight / taskCount) + output(update, allTasks: _*) + } else { + val update = UpdateWeight(taskId, weight / downstream.length) + output(update, downstream: _*) + } + case update@UpdateWeight(upstreamTaskId, weight) => + upstreamWeights += upstreamTaskId -> weight + if (upstreamWeights.size == upstreamCount) { + val nextWeight = upstreamWeights.foldLeft(0.0) { (sum, item) => sum + item._2 } + this.upstreamWeights = Map.empty[TaskId, Double] + this.weight = nextWeight + output(LatestWeight(taskId, weight, tick), TaskId(0, 0)) + } + } +} + +object PageRankWorker { + case class UpdateWeight(taskId: TaskId, weight: Double) + case class LatestWeight(taskId: TaskId, weight: Double, tick: Int) +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/example/PageRankExample.scala ---------------------------------------------------------------------- diff --git a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/example/PageRankExample.scala b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/example/PageRankExample.scala new file mode 100644 index 0000000..9dc311f --- /dev/null +++ b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/example/PageRankExample.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experiments.pagerank.example + +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.experiments.pagerank.PageRankApplication +import org.apache.gearpump.util.Graph.Node +import org.apache.gearpump.util.{AkkaApp, Graph} + +/** A very simple PageRank example, Cyclic graph is not supported */ +object PageRankExample extends AkkaApp { + + val a = "a" + val b = "b" + val c = "c" + val d = "d" + + def help(): Unit = Unit + + def main(akkaConf: Config, args: Array[String]): Unit = { + val pageRankGraph = Graph(a ~> b, a ~> c, a ~> d, b ~> a, b ~> d, d ~> b, d ~> c, c ~> b) + val app = new PageRankApplication("pagerank", iteration = 100, delta = 0.001, pageRankGraph) + val context = ClientContext(akkaConf) + val appId = context.submit(app) + context.close() + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Dag.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Dag.scala b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Dag.scala deleted file mode 100644 index b517126..0000000 --- a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Dag.scala +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.complexdag - -import org.slf4j.Logger - -import io.gearpump.cluster.UserConfig -import io.gearpump.cluster.client.ClientContext -import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} -import io.gearpump.partitioner.HashPartitioner -import io.gearpump.streaming.task.TaskContext -import io.gearpump.streaming.{Processor, StreamApplication} -import io.gearpump.util.Graph.{Node => GraphNode} -import io.gearpump.util.{AkkaApp, Graph, LogUtil} - -case class Source_0(_context: TaskContext, _conf: UserConfig) extends Source(_context, _conf) -case class Source_1(_context: TaskContext, _conf: UserConfig) extends Source(_context, _conf) -case class Node_0(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf) -case class Node_1(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf) -case class Node_2(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf) -case class Node_3(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf) -case class Node_4(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf) -case class Sink_0(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf) -case class Sink_1(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf) -case class Sink_2(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf) -case class Sink_3(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf) -case class Sink_4(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf) - -/** - * digraph flow { - * Source_0 -> Sink_0; - * Source_0 -> Sink_1; - * Source_0 -> Sink_2; - * Source_0 -> Node_1; - * Source_1 -> Node_0; - * Node_0 -> Sink_3; - * Node_1 -> Sink_3; - * Node_1 -> Sink_4; - * Node_1 -> Node_4; - * Node_2 -> Node_3; - * Node_1 -> Node_3; - * Source_0 -> Node_2; - * Source_0 -> Node_3; - * Node_3 -> Sink_3; - * Node_4 -> Sink_3; - * Source_1 -> Sink_4; - * } - */ -object Dag extends AkkaApp with ArgumentsParser { - private val LOG: Logger = LogUtil.getLogger(getClass) - val RUN_FOR_EVER = -1 - - override val options: Array[(String, CLIOption[Any])] = Array.empty - - def application(config: ParseResult): StreamApplication = { - - val source_0 = Processor[Source_0](1) - val source_1 = Processor[Source_1](1) - val node_0 = Processor[Node_0](1) - val node_1 = Processor[Node_1](1) - val node_2 = Processor[Node_2](1) - val node_3 = Processor[Node_3](1) - val node_4 = Processor[Node_4](1) - val sink_0 = Processor[Sink_0](1) - val sink_1 = Processor[Sink_1](1) - val sink_2 = Processor[Sink_2](1) - val sink_3 = Processor[Sink_3](1) - val sink_4 = Processor[Sink_4](1) - val partitioner = new HashPartitioner - val app = StreamApplication("dag", Graph( - source_0 ~ partitioner ~> sink_1, - source_0 ~ partitioner ~> sink_2, - source_0 ~ partitioner ~> node_2, - source_0 ~ partitioner ~> node_3, - source_0 ~ partitioner ~> node_1, - source_0 ~ partitioner ~> sink_0, - node_2 ~ partitioner ~> node_3, - node_1 ~ partitioner ~> node_3, - node_1 ~ partitioner ~> sink_3, - node_1 ~ partitioner ~> node_4, - source_1 ~ partitioner ~> sink_4, - source_1 ~ partitioner ~> node_0, - node_3 ~ partitioner ~> sink_3, - node_4 ~ partitioner ~> sink_3, - node_0 ~ partitioner ~> sink_3 - ), UserConfig.empty) - app - } - - override def main(akkaConf: Config, args: Array[String]): Unit = { - val userConf = parse(args) - val context = ClientContext(akkaConf) - val appId = context.submit(application(userConf)) - context.close() - } -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Node.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Node.scala b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Node.scala deleted file mode 100644 index dfad6c8..0000000 --- a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Node.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.complexdag - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} - -class Node(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { - import taskContext.output - - override def onStart(startTime: StartTime): Unit = {} - - override def onNext(msg: Message): Unit = { - val list = msg.msg.asInstanceOf[Vector[String]] - output(new Message(list :+ getClass.getCanonicalName, System.currentTimeMillis())) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Sink.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Sink.scala b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Sink.scala deleted file mode 100644 index b091a17..0000000 --- a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Sink.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.complexdag - -import scala.collection.mutable - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} - -class Sink(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { - - var list = mutable.MutableList[String]() - - override def onStart(startTime: StartTime): Unit = { - list += getClass.getCanonicalName - } - - override def onNext(msg: Message): Unit = { - val l = msg.msg.asInstanceOf[Vector[String]] - list.size match { - case 1 => - l.foreach(f => { - list += f - }) - case _ => - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Source.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Source.scala b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Source.scala deleted file mode 100644 index df656ac..0000000 --- a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Source.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.complexdag - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} - -class Source(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { - import taskContext.output - - override def onStart(startTime: StartTime): Unit = { - self ! Message("start") - } - - override def onNext(msg: Message): Unit = { - val list = Vector(getClass.getCanonicalName) - output(new Message(list, System.currentTimeMillis)) - self ! Message("continue", System.currentTimeMillis()) - } -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Dag.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Dag.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Dag.scala new file mode 100644 index 0000000..3b6ceb8 --- /dev/null +++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Dag.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.examples.complexdag + +import org.slf4j.Logger + +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} +import org.apache.gearpump.partitioner.HashPartitioner +import org.apache.gearpump.streaming.task.TaskContext +import org.apache.gearpump.streaming.{Processor, StreamApplication} +import org.apache.gearpump.util.Graph.{Node => GraphNode} +import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil} + +case class Source_0(_context: TaskContext, _conf: UserConfig) extends Source(_context, _conf) +case class Source_1(_context: TaskContext, _conf: UserConfig) extends Source(_context, _conf) +case class Node_0(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf) +case class Node_1(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf) +case class Node_2(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf) +case class Node_3(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf) +case class Node_4(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf) +case class Sink_0(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf) +case class Sink_1(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf) +case class Sink_2(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf) +case class Sink_3(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf) +case class Sink_4(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf) + +/** + * digraph flow { + * Source_0 -> Sink_0; + * Source_0 -> Sink_1; + * Source_0 -> Sink_2; + * Source_0 -> Node_1; + * Source_1 -> Node_0; + * Node_0 -> Sink_3; + * Node_1 -> Sink_3; + * Node_1 -> Sink_4; + * Node_1 -> Node_4; + * Node_2 -> Node_3; + * Node_1 -> Node_3; + * Source_0 -> Node_2; + * Source_0 -> Node_3; + * Node_3 -> Sink_3; + * Node_4 -> Sink_3; + * Source_1 -> Sink_4; + * } + */ +object Dag extends AkkaApp with ArgumentsParser { + private val LOG: Logger = LogUtil.getLogger(getClass) + val RUN_FOR_EVER = -1 + + override val options: Array[(String, CLIOption[Any])] = Array.empty + + def application(config: ParseResult): StreamApplication = { + + val source_0 = Processor[Source_0](1) + val source_1 = Processor[Source_1](1) + val node_0 = Processor[Node_0](1) + val node_1 = Processor[Node_1](1) + val node_2 = Processor[Node_2](1) + val node_3 = Processor[Node_3](1) + val node_4 = Processor[Node_4](1) + val sink_0 = Processor[Sink_0](1) + val sink_1 = Processor[Sink_1](1) + val sink_2 = Processor[Sink_2](1) + val sink_3 = Processor[Sink_3](1) + val sink_4 = Processor[Sink_4](1) + val partitioner = new HashPartitioner + val app = StreamApplication("dag", Graph( + source_0 ~ partitioner ~> sink_1, + source_0 ~ partitioner ~> sink_2, + source_0 ~ partitioner ~> node_2, + source_0 ~ partitioner ~> node_3, + source_0 ~ partitioner ~> node_1, + source_0 ~ partitioner ~> sink_0, + node_2 ~ partitioner ~> node_3, + node_1 ~ partitioner ~> node_3, + node_1 ~ partitioner ~> sink_3, + node_1 ~ partitioner ~> node_4, + source_1 ~ partitioner ~> sink_4, + source_1 ~ partitioner ~> node_0, + node_3 ~ partitioner ~> sink_3, + node_4 ~ partitioner ~> sink_3, + node_0 ~ partitioner ~> sink_3 + ), UserConfig.empty) + app + } + + override def main(akkaConf: Config, args: Array[String]): Unit = { + val userConf = parse(args) + val context = ClientContext(akkaConf) + val appId = context.submit(application(userConf)) + context.close() + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala new file mode 100644 index 0000000..8d163f9 --- /dev/null +++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.examples.complexdag + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} + +class Node(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { + import taskContext.output + + override def onStart(startTime: StartTime): Unit = {} + + override def onNext(msg: Message): Unit = { + val list = msg.msg.asInstanceOf[Vector[String]] + output(new Message(list :+ getClass.getCanonicalName, System.currentTimeMillis())) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala new file mode 100644 index 0000000..8dfa565 --- /dev/null +++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.examples.complexdag + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} + +import scala.collection.mutable + +class Sink(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { + + var list = mutable.MutableList[String]() + + override def onStart(startTime: StartTime): Unit = { + list += getClass.getCanonicalName + } + + override def onNext(msg: Message): Unit = { + val l = msg.msg.asInstanceOf[Vector[String]] + list.size match { + case 1 => + l.foreach(f => { + list += f + }) + case _ => + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala new file mode 100644 index 0000000..0359519 --- /dev/null +++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.examples.complexdag + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} + +class Source(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { + import taskContext.output + + override def onStart(startTime: StartTime): Unit = { + self ! Message("start") + } + + override def onNext(msg: Message): Unit = { + val list = Vector(getClass.getCanonicalName) + output(new Message(list, System.currentTimeMillis)) + self ! Message("continue", System.currentTimeMillis()) + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/DagSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/DagSpec.scala b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/DagSpec.scala deleted file mode 100644 index b142d8d..0000000 --- a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/DagSpec.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.complexdag - -import scala.concurrent.Future -import scala.util.Success - -import org.scalatest._ -import org.scalatest.prop.PropertyChecks - -import io.gearpump.cluster.ClientToMaster.SubmitApplication -import io.gearpump.cluster.MasterToClient.SubmitApplicationResult -import io.gearpump.cluster.{MasterHarness, TestUtil} - -class DagSpec extends PropSpec with PropertyChecks - with Matchers with BeforeAndAfterAll with MasterHarness { - - override def beforeAll { - startActorSystem() - } - - override def afterAll { - shutdownActorSystem() - } - - protected override def config = TestUtil.DEFAULT_CONFIG - - property("Dag should succeed to submit application with required arguments") { - val requiredArgs = Array.empty[String] - - val masterReceiver = createMockMaster() - val args = requiredArgs - - Future { - Dag.main(masterConfig, args) - } - masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) - masterReceiver.reply(SubmitApplicationResult(Success(0))) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/NodeSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/NodeSpec.scala b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/NodeSpec.scala deleted file mode 100644 index 35c5824..0000000 --- a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/NodeSpec.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.examples.complexdag - -import org.mockito.Mockito._ -import org.scalatest.prop.PropertyChecks -import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.MockUtil._ - -class NodeSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter { - - val context = MockUtil.mockTaskContext - - val node = new Node(context, UserConfig.empty) - - property("Node should send a Vector[String](classOf[Node].getCanonicalName, " + - "classOf[Node].getCanonicalName") { - val list = Vector(classOf[Node].getCanonicalName) - val expected = Vector(classOf[Node].getCanonicalName, classOf[Node].getCanonicalName) - node.onNext(Message(list)) - verify(context).output(argMatch[Message](_.msg == expected)) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SinkSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SinkSpec.scala b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SinkSpec.scala deleted file mode 100644 index 341f6c6..0000000 --- a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SinkSpec.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.examples.complexdag - -import org.scalatest.prop.PropertyChecks -import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.MockUtil - -class SinkSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter { - - val context = MockUtil.mockTaskContext - - val sink = new Sink(context, UserConfig.empty) - - property("Sink should send a Vector[String](classOf[Sink].getCanonicalName, " + - "classOf[Sink].getCanonicalName") { - val list = Vector(classOf[Sink].getCanonicalName) - val expected = Vector(classOf[Sink].getCanonicalName, classOf[Sink].getCanonicalName) - sink.onNext(Message(list)) - - (0 until sink.list.size).map(i => { - assert(sink.list(i).equals(expected(i))) - }) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SourceSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SourceSpec.scala b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SourceSpec.scala deleted file mode 100644 index faa7aa7..0000000 --- a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SourceSpec.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.examples.complexdag - -import akka.actor.ActorSystem -import org.mockito.Mockito._ -import org.scalatest.{Matchers, WordSpec} - -import io.gearpump.Message -import io.gearpump.cluster.{TestUtil, UserConfig} -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.MockUtil._ - -class SourceSpec extends WordSpec with Matchers { - - "Source" should { - "Source should send a msg of Vector[String](classOf[Source].getCanonicalName)" in { - val system1 = ActorSystem("Source", TestUtil.DEFAULT_CONFIG) - - val system2 = ActorSystem("Reporter", TestUtil.DEFAULT_CONFIG) - - val context = MockUtil.mockTaskContext - - val source = new Source(context, UserConfig.empty) - source.onNext(Message("start")) - - verify(context).output(argMatch[Message](Vector(classOf[Source].getCanonicalName) == _.msg)) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/DagSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/DagSpec.scala b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/DagSpec.scala new file mode 100644 index 0000000..cf8ae63 --- /dev/null +++ b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/DagSpec.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.examples.complexdag + +import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication +import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult +import org.apache.gearpump.cluster.{MasterHarness, TestUtil} +import org.scalatest._ +import org.scalatest.prop.PropertyChecks + +import scala.concurrent.Future +import scala.util.Success + +class DagSpec extends PropSpec with PropertyChecks + with Matchers with BeforeAndAfterAll with MasterHarness { + + override def beforeAll { + startActorSystem() + } + + override def afterAll { + shutdownActorSystem() + } + + protected override def config = TestUtil.DEFAULT_CONFIG + + property("Dag should succeed to submit application with required arguments") { + val requiredArgs = Array.empty[String] + + val masterReceiver = createMockMaster() + val args = requiredArgs + + Future { + Dag.main(masterConfig, args) + } + masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) + masterReceiver.reply(SubmitApplicationResult(Success(0))) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/NodeSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/NodeSpec.scala b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/NodeSpec.scala new file mode 100644 index 0000000..241e0f6 --- /dev/null +++ b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/NodeSpec.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.examples.complexdag + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.MockUtil._ +import org.mockito.Mockito._ +import org.scalatest.prop.PropertyChecks +import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} + +class NodeSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter { + + val context = MockUtil.mockTaskContext + + val node = new Node(context, UserConfig.empty) + + property("Node should send a Vector[String](classOf[Node].getCanonicalName, " + + "classOf[Node].getCanonicalName") { + val list = Vector(classOf[Node].getCanonicalName) + val expected = Vector(classOf[Node].getCanonicalName, classOf[Node].getCanonicalName) + node.onNext(Message(list)) + verify(context).output(argMatch[Message](_.msg == expected)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SinkSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SinkSpec.scala b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SinkSpec.scala new file mode 100644 index 0000000..e7bed30 --- /dev/null +++ b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SinkSpec.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.examples.complexdag + +import org.scalatest.prop.PropertyChecks +import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.MockUtil + +class SinkSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter { + + val context = MockUtil.mockTaskContext + + val sink = new Sink(context, UserConfig.empty) + + property("Sink should send a Vector[String](classOf[Sink].getCanonicalName, " + + "classOf[Sink].getCanonicalName") { + val list = Vector(classOf[Sink].getCanonicalName) + val expected = Vector(classOf[Sink].getCanonicalName, classOf[Sink].getCanonicalName) + sink.onNext(Message(list)) + + (0 until sink.list.size).map(i => { + assert(sink.list(i).equals(expected(i))) + }) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SourceSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SourceSpec.scala b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SourceSpec.scala new file mode 100644 index 0000000..20cad1c --- /dev/null +++ b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SourceSpec.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.examples.complexdag + +import akka.actor.ActorSystem +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.{TestUtil, UserConfig} +import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.MockUtil._ +import org.mockito.Mockito._ +import org.scalatest.{Matchers, WordSpec} + +class SourceSpec extends WordSpec with Matchers { + + "Source" should { + "Source should send a msg of Vector[String](classOf[Source].getCanonicalName)" in { + val system1 = ActorSystem("Source", TestUtil.DEFAULT_CONFIG) + + val system2 = ActorSystem("Reporter", TestUtil.DEFAULT_CONFIG) + + val context = MockUtil.mockTaskContext + + val source = new Source(context, UserConfig.empty) + source.onNext(Message("start")) + + verify(context).output(argMatch[Message](Vector(classOf[Source].getCanonicalName) == _.msg)) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/README.md ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/README.md b/examples/streaming/fsio/README.md index 06fefb6..200cdb1 100644 --- a/examples/streaming/fsio/README.md +++ b/examples/streaming/fsio/README.md @@ -16,7 +16,7 @@ In order to run the example: 3. Submit the application:<br> ```bash - ./target/pack/bin/gear app -jar ./examples/target/$SCALA_VERSION_MAJOR/gearpump-examples-assembly-$VERSION.jar io.gearpump.streaming.examples.sol.SOL -input $INPUT_FILE_PATH -output $OUTPUT_DIRECTORY + ./target/pack/bin/gear app -jar ./examples/target/$SCALA_VERSION_MAJOR/gearpump-examples-assembly-$VERSION.jar org.apache.gearpump.streaming.examples.sol.SOL -input $INPUT_FILE_PATH -output $OUTPUT_DIRECTORY ``` 4. Stop the application:<br> ```bash http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/HadoopConfig.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/HadoopConfig.scala b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/HadoopConfig.scala deleted file mode 100644 index 3b53c9c..0000000 --- a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/HadoopConfig.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.examples.fsio - -import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream} -import scala.language.implicitConversions - -import org.apache.hadoop.conf.Configuration - -import io.gearpump.cluster.UserConfig -import io.gearpump.util.Constants._ - -class HadoopConfig(config: UserConfig) { - - def withHadoopConf(conf: Configuration): UserConfig = { - config.withBytes(HADOOP_CONF, serializeHadoopConf(conf)) - } - - def hadoopConf: Configuration = deserializeHadoopConf(config.getBytes(HADOOP_CONF).get) - - private def serializeHadoopConf(conf: Configuration): Array[Byte] = { - val out = new ByteArrayOutputStream() - val dataOut = new DataOutputStream(out) - conf.write(dataOut) - dataOut.close() - out.toByteArray - } - - private def deserializeHadoopConf(bytes: Array[Byte]): Configuration = { - val in = new ByteArrayInputStream(bytes) - val dataIn = new DataInputStream(in) - val result = new Configuration() - result.readFields(dataIn) - dataIn.close() - result - } -} - -object HadoopConfig { - def empty: HadoopConfig = new HadoopConfig(UserConfig.empty) - def apply(config: UserConfig): HadoopConfig = new HadoopConfig(config) - - implicit def userConfigToHadoopConfig(userConf: UserConfig): HadoopConfig = { - HadoopConfig(userConf) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala deleted file mode 100644 index 13fc3f9..0000000 --- a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.examples.fsio - -import java.io.File -import java.util.concurrent.TimeUnit -import scala.concurrent.duration.FiniteDuration - -import akka.actor.Cancellable -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.io.SequenceFile._ -import org.apache.hadoop.io.{SequenceFile, Text} - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.examples.fsio.HadoopConfig._ -import io.gearpump.streaming.examples.fsio.SeqFileStreamProcessor._ -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} - -class SeqFileStreamProcessor(taskContext: TaskContext, config: UserConfig) - extends Task(taskContext, config) { - - import taskContext.taskId - - val outputPath = new Path(config.getString(OUTPUT_PATH).get + File.separator + taskId) - var writer: SequenceFile.Writer = null - val textClass = new Text().getClass - val key = new Text() - val value = new Text() - val hadoopConf = config.hadoopConf - - private var msgCount: Long = 0 - private var snapShotKVCount: Long = 0 - private var snapShotTime: Long = 0 - private var scheduler: Cancellable = null - - override def onStart(startTime: StartTime): Unit = { - - val fs = FileSystem.get(hadoopConf) - fs.deleteOnExit(outputPath) - writer = SequenceFile.createWriter(hadoopConf, Writer.file(outputPath), - Writer.keyClass(textClass), Writer.valueClass(textClass)) - - scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS), - new FiniteDuration(5, TimeUnit.SECONDS))(reportStatus()) - snapShotTime = System.currentTimeMillis() - LOG.info("sequence file bolt initiated") - } - - override def onNext(msg: Message): Unit = { - val kv = msg.msg.asInstanceOf[String].split("\\+\\+") - if (kv.length >= 2) { - key.set(kv(0)) - value.set(kv(1)) - writer.append(key, value) - } - msgCount += 1 - } - - override def onStop(): Unit = { - if (scheduler != null) { - scheduler.cancel() - } - writer.close() - LOG.info("sequence file bolt stopped") - } - - private def reportStatus() = { - val current: Long = System.currentTimeMillis() - LOG.info(s"Task $taskId Throughput: ${ - (msgCount - snapShotKVCount, - (current - snapShotTime) / 1000) - } (KVPairs, second)") - snapShotKVCount = msgCount - snapShotTime = current - } -} - -object SeqFileStreamProcessor { - val OUTPUT_PATH = "outputpath" -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala deleted file mode 100644 index f9b0d22..0000000 --- a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.examples.fsio - -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.io.SequenceFile._ -import org.apache.hadoop.io.{SequenceFile, Text} - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.examples.fsio.HadoopConfig._ -import io.gearpump.streaming.examples.fsio.SeqFileStreamProducer._ -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} - -class SeqFileStreamProducer(taskContext: TaskContext, config: UserConfig) - extends Task(taskContext, config) { - - import taskContext.output - - val value = new Text() - val key = new Text() - var reader: SequenceFile.Reader = null - val hadoopConf = config.hadoopConf - val fs = FileSystem.get(hadoopConf) - val inputPath = new Path(config.getString(INPUT_PATH).get) - - override def onStart(startTime: StartTime): Unit = { - reader = new SequenceFile.Reader(hadoopConf, Reader.file(inputPath)) - self ! Start - LOG.info("sequence file spout initiated") - } - - override def onNext(msg: Message): Unit = { - if (reader.next(key, value)) { - output(Message(key + "++" + value)) - } else { - reader.close() - reader = new SequenceFile.Reader(hadoopConf, Reader.file(inputPath)) - } - self ! Continue - } - - override def onStop(): Unit = { - reader.close() - } -} - -object SeqFileStreamProducer { - def INPUT_PATH: String = "inputpath" - - val Start = Message("start") - val Continue = Message("continue") -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SequenceFileIO.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SequenceFileIO.scala b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SequenceFileIO.scala deleted file mode 100644 index 7272f2b..0000000 --- a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SequenceFileIO.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.examples.fsio - -import org.apache.hadoop.conf.Configuration -import org.slf4j.Logger - -import io.gearpump.cluster.UserConfig -import io.gearpump.cluster.client.ClientContext -import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} -import io.gearpump.partitioner.ShufflePartitioner -import io.gearpump.streaming.examples.fsio.HadoopConfig._ -import io.gearpump.streaming.{Processor, StreamApplication} -import io.gearpump.util.Graph._ -import io.gearpump.util.{AkkaApp, Graph, LogUtil} - -object SequenceFileIO extends AkkaApp with ArgumentsParser { - private val LOG: Logger = LogUtil.getLogger(getClass) - - override val options: Array[(String, CLIOption[Any])] = Array( - "source" -> CLIOption[Int]("<sequence file reader number>", required = false, - defaultValue = Some(1)), - "sink" -> CLIOption[Int]("<sequence file writer number>", required = false, - defaultValue = Some(1)), - "input" -> CLIOption[String]("<input file path>", required = true), - "output" -> CLIOption[String]("<output file directory>", required = true) - ) - - def application(config: ParseResult): StreamApplication = { - val spoutNum = config.getInt("source") - val boltNum = config.getInt("sink") - val input = config.getString("input") - val output = config.getString("output") - val appConfig = UserConfig.empty.withString(SeqFileStreamProducer.INPUT_PATH, input) - .withString(SeqFileStreamProcessor.OUTPUT_PATH, output) - val hadoopConfig = appConfig.withHadoopConf(new Configuration()) - val partitioner = new ShufflePartitioner() - val streamProducer = Processor[SeqFileStreamProducer](spoutNum) - val streamProcessor = Processor[SeqFileStreamProcessor](boltNum) - - val app = StreamApplication("SequenceFileIO", - Graph(streamProducer ~ partitioner ~> streamProcessor), hadoopConfig) - app - } - - override def main(akkaConf: Config, args: Array[String]): Unit = { - val config = parse(args) - val context = ClientContext(akkaConf) - val appId = context.submit(application(config)) - context.close() - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/HadoopConfig.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/HadoopConfig.scala b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/HadoopConfig.scala new file mode 100644 index 0000000..144dd78 --- /dev/null +++ b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/HadoopConfig.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.examples.fsio + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream} +import scala.language.implicitConversions + +import org.apache.hadoop.conf.Configuration + +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.util.Constants._ + +class HadoopConfig(config: UserConfig) { + + def withHadoopConf(conf: Configuration): UserConfig = { + config.withBytes(HADOOP_CONF, serializeHadoopConf(conf)) + } + + def hadoopConf: Configuration = deserializeHadoopConf(config.getBytes(HADOOP_CONF).get) + + private def serializeHadoopConf(conf: Configuration): Array[Byte] = { + val out = new ByteArrayOutputStream() + val dataOut = new DataOutputStream(out) + conf.write(dataOut) + dataOut.close() + out.toByteArray + } + + private def deserializeHadoopConf(bytes: Array[Byte]): Configuration = { + val in = new ByteArrayInputStream(bytes) + val dataIn = new DataInputStream(in) + val result = new Configuration() + result.readFields(dataIn) + dataIn.close() + result + } +} + +object HadoopConfig { + def empty: HadoopConfig = new HadoopConfig(UserConfig.empty) + def apply(config: UserConfig): HadoopConfig = new HadoopConfig(config) + + implicit def userConfigToHadoopConfig(userConf: UserConfig): HadoopConfig = { + HadoopConfig(userConf) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala new file mode 100644 index 0000000..2e4a556 --- /dev/null +++ b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.examples.fsio + +import java.io.File +import java.util.concurrent.TimeUnit +import scala.concurrent.duration.FiniteDuration + +import akka.actor.Cancellable +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.io.SequenceFile._ +import org.apache.hadoop.io.{SequenceFile, Text} + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.examples.fsio.HadoopConfig._ +import org.apache.gearpump.streaming.examples.fsio.SeqFileStreamProcessor._ +import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} + +class SeqFileStreamProcessor(taskContext: TaskContext, config: UserConfig) + extends Task(taskContext, config) { + + import taskContext.taskId + + val outputPath = new Path(config.getString(OUTPUT_PATH).get + File.separator + taskId) + var writer: SequenceFile.Writer = null + val textClass = new Text().getClass + val key = new Text() + val value = new Text() + val hadoopConf = config.hadoopConf + + private var msgCount: Long = 0 + private var snapShotKVCount: Long = 0 + private var snapShotTime: Long = 0 + private var scheduler: Cancellable = null + + override def onStart(startTime: StartTime): Unit = { + + val fs = FileSystem.get(hadoopConf) + fs.deleteOnExit(outputPath) + writer = SequenceFile.createWriter(hadoopConf, Writer.file(outputPath), + Writer.keyClass(textClass), Writer.valueClass(textClass)) + + scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS), + new FiniteDuration(5, TimeUnit.SECONDS))(reportStatus()) + snapShotTime = System.currentTimeMillis() + LOG.info("sequence file bolt initiated") + } + + override def onNext(msg: Message): Unit = { + val kv = msg.msg.asInstanceOf[String].split("\\+\\+") + if (kv.length >= 2) { + key.set(kv(0)) + value.set(kv(1)) + writer.append(key, value) + } + msgCount += 1 + } + + override def onStop(): Unit = { + if (scheduler != null) { + scheduler.cancel() + } + writer.close() + LOG.info("sequence file bolt stopped") + } + + private def reportStatus() = { + val current: Long = System.currentTimeMillis() + LOG.info(s"Task $taskId Throughput: ${ + (msgCount - snapShotKVCount, + (current - snapShotTime) / 1000) + } (KVPairs, second)") + snapShotKVCount = msgCount + snapShotTime = current + } +} + +object SeqFileStreamProcessor { + val OUTPUT_PATH = "outputpath" +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala new file mode 100644 index 0000000..02d2434 --- /dev/null +++ b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.examples.fsio + +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.io.SequenceFile._ +import org.apache.hadoop.io.{SequenceFile, Text} + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.examples.fsio.HadoopConfig._ +import org.apache.gearpump.streaming.examples.fsio.SeqFileStreamProducer._ +import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} + +class SeqFileStreamProducer(taskContext: TaskContext, config: UserConfig) + extends Task(taskContext, config) { + + import taskContext.output + + val value = new Text() + val key = new Text() + var reader: SequenceFile.Reader = null + val hadoopConf = config.hadoopConf + val fs = FileSystem.get(hadoopConf) + val inputPath = new Path(config.getString(INPUT_PATH).get) + + override def onStart(startTime: StartTime): Unit = { + reader = new SequenceFile.Reader(hadoopConf, Reader.file(inputPath)) + self ! Start + LOG.info("sequence file spout initiated") + } + + override def onNext(msg: Message): Unit = { + if (reader.next(key, value)) { + output(Message(key + "++" + value)) + } else { + reader.close() + reader = new SequenceFile.Reader(hadoopConf, Reader.file(inputPath)) + } + self ! Continue + } + + override def onStop(): Unit = { + reader.close() + } +} + +object SeqFileStreamProducer { + def INPUT_PATH: String = "inputpath" + + val Start = Message("start") + val Continue = Message("continue") +}
