http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala ---------------------------------------------------------------------- diff --git a/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala b/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala index fcdbf14..5bafef1 100644 --- a/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala +++ b/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,23 +17,25 @@ */ package io.gearpump.experiments.distributeservice +import scala.concurrent.Await +import scala.concurrent.duration._ + import akka.actor.ActorSystem import akka.testkit.{TestActorRef, TestProbe} -import io.gearpump.WorkerId -import io.gearpump.cluster.AppMasterToMaster.{RequestResource, GetAllWorkers, RegisterAppMaster} +import org.scalatest.{BeforeAndAfter, Matchers, WordSpec} + +import io.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, RegisterAppMaster, RequestResource} import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor -import io.gearpump.cluster.MasterToAppMaster.{ResourceAllocated, WorkerList, AppMasterRegistered} -import io.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, AppMasterRuntimeEnvironment} -import io.gearpump.cluster.{AppMasterContext, UserConfig, AppDescription, TestUtil} -import io.gearpump.cluster.scheduler.{ResourceAllocation, Relaxation, ResourceRequest, Resource} -import DistServiceAppMaster.{FileContainer, GetFileContainer} +import io.gearpump.cluster.MasterToAppMaster.{AppMasterRegistered, ResourceAllocated, WorkerList} +import io.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo} +import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceAllocation, ResourceRequest} +import io.gearpump.cluster.worker.WorkerId +import io.gearpump.cluster.{AppDescription, AppMasterContext, TestUtil, UserConfig} +import io.gearpump.experiments.distributeservice.DistServiceAppMaster.{FileContainer, GetFileContainer} import io.gearpump.util.ActorSystemBooter.RegisterActorSystem import io.gearpump.util.ActorUtil -import org.scalatest.{BeforeAndAfter, Matchers, WordSpec} - -import scala.concurrent.duration._ -class DistServiceAppMasterSpec extends WordSpec with Matchers with BeforeAndAfter{ +class DistServiceAppMasterSpec extends WordSpec with Matchers with BeforeAndAfter { implicit val system = ActorSystem("AppMasterSpec", TestUtil.DEFAULT_CONFIG) val mockMaster = TestProbe()(system) val mockWorker1 = TestProbe()(system) @@ -45,37 +47,41 @@ class DistServiceAppMasterSpec extends WordSpec with Matchers with BeforeAndAfte val workerList = List(WorkerId(1, 0L), WorkerId(2, 0L), WorkerId(3, 0L)) val resource = Resource(1) val appJar = None - val appDescription = AppDescription("app0", classOf[DistServiceAppMaster].getName, UserConfig.empty) + val appDescription = AppDescription("app0", classOf[DistServiceAppMaster].getName, + UserConfig.empty) "DistService AppMaster" should { "responsable for service distributing" in { val appMasterInfo = AppMasterRuntimeInfo(appId, "appName", mockWorker1.ref) - val appMasterContext = AppMasterContext(appId, userName, resource, null, appJar, masterProxy, appMasterInfo) + val appMasterContext = AppMasterContext(appId, userName, resource, null, appJar, masterProxy, + appMasterInfo) TestActorRef[DistServiceAppMaster]( AppMasterRuntimeEnvironment.props(List(masterProxy.path), appDescription, appMasterContext)) - val registerAppMaster = mockMaster.receiveOne(15 seconds) + val registerAppMaster = mockMaster.receiveOne(15.seconds) assert(registerAppMaster.isInstanceOf[RegisterAppMaster]) val appMaster = registerAppMaster.asInstanceOf[RegisterAppMaster].appMaster mockMaster.reply(AppMasterRegistered(appId)) - //The DistributedShell AppMaster will ask for worker list + // The DistributedShell AppMaster will ask for worker list mockMaster.expectMsg(GetAllWorkers) mockMaster.reply(WorkerList(workerList)) - //After worker list is ready, DistributedShell AppMaster will request resouce on each worker + // After worker list is ready, DistributedShell AppMaster will request resouce on each worker workerList.foreach { workerId => - mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), workerId, relaxation = Relaxation.SPECIFICWORKER))) + mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), workerId, + relaxation = Relaxation.SPECIFICWORKER))) } - mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker1.ref, WorkerId(1, 0L))))) + mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker1.ref, + WorkerId(1, 0L))))) mockWorker1.expectMsgClass(classOf[LaunchExecutor]) mockWorker1.reply(RegisterActorSystem(ActorUtil.getSystemAddress(system).toString)) appMaster.tell(GetFileContainer, client.ref) - client.expectMsgClass(15 seconds, classOf[FileContainer]) + client.expectMsgClass(15.seconds, classOf[FileContainer]) } } after { - system.shutdown() - system.awaitTermination() + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } }
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankApplication.scala ---------------------------------------------------------------------- diff --git a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankApplication.scala b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankApplication.scala index 2f9928c..2e37091 100644 --- a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankApplication.scala +++ b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankApplication.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,11 +18,12 @@ package io.gearpump.experiments.pagerank import akka.actor.ActorSystem -import io.gearpump.streaming.{StreamApplication, Processor} -import io.gearpump.streaming.appmaster.AppMaster + import io.gearpump.cluster.{Application, ApplicationMaster, UserConfig} -import PageRankApplication.NodeWithTaskId +import io.gearpump.experiments.pagerank.PageRankApplication.NodeWithTaskId import io.gearpump.partitioner.HashPartitioner +import io.gearpump.streaming.appmaster.AppMaster +import io.gearpump.streaming.{Processor, StreamApplication} import io.gearpump.util.Graph import io.gearpump.util.Graph.Node @@ -30,21 +31,19 @@ import io.gearpump.util.Graph.Node * * A simple and naive pagerank implementation. * - * We will continue to optimize this to able to run page rank of tens of millions of nodes - * * @param name name of the application * @param iteration max iteration count * @param delta decide the accuracy when the page rank example stops. * @param dag the page rank graph - * @tparam T */ -class PageRankApplication[T] (override val name : String, iteration: Int, delta: Double, dag: Graph[T, _]) +class PageRankApplication[T]( + override val name: String, iteration: Int, delta: Double, dag: Graph[T, _]) extends Application { override def appMaster: Class[_ <: ApplicationMaster] = classOf[AppMaster] override def userConfig(implicit system: ActorSystem): UserConfig = { - // map node with taskId + // Map node with taskId var taskId = 0 val pageRankDag = dag.mapVertex { node => val updatedNode = NodeWithTaskId(taskId, node) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankController.scala ---------------------------------------------------------------------- diff --git a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankController.scala b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankController.scala index 8771a40..0fb689d 100644 --- a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankController.scala +++ b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankController.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,12 +18,14 @@ package io.gearpump.experiments.pagerank import akka.actor.Actor.Receive -import io.gearpump.streaming.task._ + import io.gearpump.cluster.UserConfig -import PageRankController.Tick -import PageRankWorker.LatestWeight +import io.gearpump.experiments.pagerank.PageRankController.Tick +import io.gearpump.experiments.pagerank.PageRankWorker.LatestWeight +import io.gearpump.streaming.task._ -class PageRankController (taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) { +class PageRankController(taskContext: TaskContext, conf: UserConfig) + extends Task(taskContext, conf) { val taskCount = conf.getInt(PageRankApplication.COUNT).get val iterationMax = conf.getInt(PageRankApplication.ITERATION).get @@ -37,11 +39,11 @@ class PageRankController (taskContext : TaskContext, conf: UserConfig) extends T var weights = Map.empty[TaskId, Double] var deltas = Map.empty[TaskId, Double] - override def onStart(startTime : StartTime) : Unit = { + override def onStart(startTime: StartTime): Unit = { output(Tick(tick), tasks: _*) } - private def output(msg: AnyRef, tasks: TaskId *): Unit = { + private def output(msg: AnyRef, tasks: TaskId*): Unit = { taskContext.asInstanceOf[TaskWrapper].outputUnManaged(msg, tasks: _*) } @@ -49,7 +51,7 @@ class PageRankController (taskContext : TaskContext, conf: UserConfig) extends T case LatestWeight(taskId, weight, replyTick) => if (this.tick == replyTick) { - deltas += taskId -> Math.abs(weight - weights.getOrElse(taskId, 0.0)) + deltas += taskId -> Math.abs(weight - weights.getOrElse(taskId, 0.0)) weights += taskId -> weight receivedWeightForCurrentTick += 1 if (receivedWeightForCurrentTick == taskCount) { @@ -66,7 +68,7 @@ class PageRankController (taskContext : TaskContext, conf: UserConfig) extends T } private def continueIteration: Boolean = { - (tick < iterationMax) && deltas.values.foldLeft(false) {(deltaExceed, value) => + (tick < iterationMax) && deltas.values.foldLeft(false) { (deltaExceed, value) => deltaExceed || value > delta } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankWorker.scala ---------------------------------------------------------------------- diff --git a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankWorker.scala b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankWorker.scala index 87c30c2..9f38cd7 100644 --- a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankWorker.scala +++ b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankWorker.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,17 +18,17 @@ package io.gearpump.experiments.pagerank import akka.actor.Actor.Receive -import io.gearpump.streaming.task.{Task, TaskContext, TaskId, TaskWrapper} + import io.gearpump.cluster.UserConfig -import PageRankApplication.NodeWithTaskId -import PageRankController.Tick -import PageRankWorker.{LatestWeight, UpdateWeight} +import io.gearpump.experiments.pagerank.PageRankApplication.NodeWithTaskId +import io.gearpump.experiments.pagerank.PageRankController.Tick +import io.gearpump.experiments.pagerank.PageRankWorker.{LatestWeight, UpdateWeight} +import io.gearpump.streaming.task.{Task, TaskContext, TaskId, TaskWrapper} import io.gearpump.util.Graph -class PageRankWorker(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) { +class PageRankWorker(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { import taskContext.taskId - private var weight: Double = 1.0 private var upstreamWeights = Map.empty[TaskId, Double] @@ -41,14 +41,15 @@ class PageRankWorker(taskContext : TaskContext, conf: UserConfig) extends Task(t node.taskId == taskContext.taskId.index }.get - private val downstream = graph.outgoingEdgesOf(node).map(_._3).map(id => taskId.copy(index = id.taskId)).toSeq + private val downstream = graph.outgoingEdgesOf(node).map(_._3) + .map(id => taskId.copy(index = id.taskId)).toSeq private val upstreamCount = graph.incomingEdgesOf(node).map(_._1).length - LOG.info(s"downstream nodes: $downstream" ) + LOG.info(s"downstream nodes: $downstream") private var tick = 0 - private def output(msg: AnyRef, tasks: TaskId *): Unit = { + private def output(msg: AnyRef, tasks: TaskId*): Unit = { taskContext.asInstanceOf[TaskWrapper].outputUnManaged(msg, tasks: _*) } @@ -57,7 +58,7 @@ class PageRankWorker(taskContext : TaskContext, conf: UserConfig) extends Task(t this.tick = tick if (downstream.length == 0) { - // if there is no downstream, we will evenly distribute our page rank to + // If there is no downstream, we will evenly distribute our page rank to // every node in the graph val update = UpdateWeight(taskId, weight / taskCount) output(update, allTasks: _*) @@ -65,10 +66,10 @@ class PageRankWorker(taskContext : TaskContext, conf: UserConfig) extends Task(t val update = UpdateWeight(taskId, weight / downstream.length) output(update, downstream: _*) } - case update@ UpdateWeight(upstreamTaskId, weight) => + case update@UpdateWeight(upstreamTaskId, weight) => upstreamWeights += upstreamTaskId -> weight if (upstreamWeights.size == upstreamCount) { - val nextWeight = upstreamWeights.foldLeft(0.0) {(sum, item) => sum + item._2} + val nextWeight = upstreamWeights.foldLeft(0.0) { (sum, item) => sum + item._2 } this.upstreamWeights = Map.empty[TaskId, Double] this.weight = nextWeight output(LatestWeight(taskId, weight, tick), TaskId(0, 0)) @@ -76,7 +77,7 @@ class PageRankWorker(taskContext : TaskContext, conf: UserConfig) extends Task(t } } -object PageRankWorker{ +object PageRankWorker { case class UpdateWeight(taskId: TaskId, weight: Double) case class LatestWeight(taskId: TaskId, weight: Double, tick: Int) } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/example/PageRankExample.scala ---------------------------------------------------------------------- diff --git a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/example/PageRankExample.scala b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/example/PageRankExample.scala index 2fb9b81..3877974 100644 --- a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/example/PageRankExample.scala +++ b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/example/PageRankExample.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,11 +17,12 @@ */ package io.gearpump.experiments.pagerank.example -import io.gearpump.experiments.pagerank.PageRankApplication import io.gearpump.cluster.client.ClientContext -import io.gearpump.util.{AkkaApp, Graph} +import io.gearpump.experiments.pagerank.PageRankApplication import io.gearpump.util.Graph.Node +import io.gearpump.util.{AkkaApp, Graph} +/** A very simple PageRank example, Cyclic graph is not supported */ object PageRankExample extends AkkaApp { val a = "a" @@ -29,7 +30,7 @@ object PageRankExample extends AkkaApp { val c = "c" val d = "d" - def help: Unit = Unit + def help(): Unit = Unit def main(akkaConf: Config, args: Array[String]): Unit = { val pageRankGraph = Graph(a ~> b, a ~> c, a ~> d, b ~> a, b ~> d, d ~> b, d ~> c, c ~> b) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Dag.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Dag.scala b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Dag.scala index a7e1f92..b517126 100644 --- a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Dag.scala +++ b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Dag.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,57 +18,58 @@ package io.gearpump.streaming.examples.complexdag -import io.gearpump.streaming.{StreamApplication, Processor} -import io.gearpump.streaming.task.TaskContext +import org.slf4j.Logger + import io.gearpump.cluster.UserConfig import io.gearpump.cluster.client.ClientContext import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} import io.gearpump.partitioner.HashPartitioner +import io.gearpump.streaming.task.TaskContext +import io.gearpump.streaming.{Processor, StreamApplication} import io.gearpump.util.Graph.{Node => GraphNode} import io.gearpump.util.{AkkaApp, Graph, LogUtil} -import org.slf4j.Logger -/* - digraph flow { - Source_0 -> Sink_0; - Source_0 -> Sink_1; - Source_0 -> Sink_2; - Source_0 -> Node_1; - Source_1 -> Node_0; - Node_0 -> Sink_3; - Node_1 -> Sink_3; - Node_1 -> Sink_4; - Node_1 -> Node_4; - Node_2 -> Node_3; - Node_1 -> Node_3; - Source_0 -> Node_2; - Source_0 -> Node_3; - Node_3 -> Sink_3; - Node_4 -> Sink_3; - Source_1 -> Sink_4; - } -*/ -case class Source_0(_context : TaskContext, _conf: UserConfig) extends Source(_context, _conf) -case class Source_1(_context : TaskContext, _conf: UserConfig) extends Source(_context, _conf) -case class Node_0(_context : TaskContext, _conf: UserConfig) extends Node(_context, _conf) -case class Node_1(_context : TaskContext, _conf: UserConfig) extends Node(_context, _conf) -case class Node_2(_context : TaskContext, _conf: UserConfig) extends Node(_context, _conf) -case class Node_3(_context : TaskContext, _conf: UserConfig) extends Node(_context, _conf) -case class Node_4(_context : TaskContext, _conf: UserConfig) extends Node(_context, _conf) -case class Sink_0(_context : TaskContext, _conf: UserConfig) extends Sink(_context, _conf) -case class Sink_1(_context : TaskContext, _conf: UserConfig) extends Sink(_context, _conf) -case class Sink_2(_context : TaskContext, _conf: UserConfig) extends Sink(_context, _conf) -case class Sink_3(_context : TaskContext, _conf: UserConfig) extends Sink(_context, _conf) -case class Sink_4(_context : TaskContext, _conf: UserConfig) extends Sink(_context, _conf) +case class Source_0(_context: TaskContext, _conf: UserConfig) extends Source(_context, _conf) +case class Source_1(_context: TaskContext, _conf: UserConfig) extends Source(_context, _conf) +case class Node_0(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf) +case class Node_1(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf) +case class Node_2(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf) +case class Node_3(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf) +case class Node_4(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf) +case class Sink_0(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf) +case class Sink_1(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf) +case class Sink_2(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf) +case class Sink_3(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf) +case class Sink_4(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf) +/** + * digraph flow { + * Source_0 -> Sink_0; + * Source_0 -> Sink_1; + * Source_0 -> Sink_2; + * Source_0 -> Node_1; + * Source_1 -> Node_0; + * Node_0 -> Sink_3; + * Node_1 -> Sink_3; + * Node_1 -> Sink_4; + * Node_1 -> Node_4; + * Node_2 -> Node_3; + * Node_1 -> Node_3; + * Source_0 -> Node_2; + * Source_0 -> Node_3; + * Node_3 -> Sink_3; + * Node_4 -> Sink_3; + * Source_1 -> Sink_4; + * } + */ object Dag extends AkkaApp with ArgumentsParser { private val LOG: Logger = LogUtil.getLogger(getClass) val RUN_FOR_EVER = -1 override val options: Array[(String, CLIOption[Any])] = Array.empty - def application(config: ParseResult) : StreamApplication = { - + def application(config: ParseResult): StreamApplication = { + val source_0 = Processor[Source_0](1) val source_1 = Processor[Source_1](1) val node_0 = Processor[Node_0](1) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Node.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Node.scala b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Node.scala index e8837ed..dfad6c8 100644 --- a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Node.scala +++ b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Node.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,16 +18,16 @@ package io.gearpump.streaming.examples.complexdag -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} import io.gearpump.Message import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.task.{StartTime, Task, TaskContext} -class Node (taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) { +class Node(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { import taskContext.output - override def onStart(startTime : StartTime) : Unit = {} + override def onStart(startTime: StartTime): Unit = {} - override def onNext(msg : Message) : Unit = { + override def onNext(msg: Message): Unit = { val list = msg.msg.asInstanceOf[Vector[String]] output(new Message(list :+ getClass.getCanonicalName, System.currentTimeMillis())) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Sink.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Sink.scala b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Sink.scala index adc3115..b091a17 100644 --- a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Sink.scala +++ b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Sink.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,11 +18,11 @@ package io.gearpump.streaming.examples.complexdag -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} +import scala.collection.mutable + import io.gearpump.Message import io.gearpump.cluster.UserConfig - -import scala.collection.mutable +import io.gearpump.streaming.task.{StartTime, Task, TaskContext} class Sink(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { @@ -42,5 +42,4 @@ class Sink(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, case _ => } } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Source.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Source.scala b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Source.scala index 80cd8d2..df656ac 100644 --- a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Source.scala +++ b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Source.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,12 +18,12 @@ package io.gearpump.streaming.examples.complexdag -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} import io.gearpump.Message import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.task.{StartTime, Task, TaskContext} class Source(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { - import taskContext.{output, self} + import taskContext.output override def onStart(startTime: StartTime): Unit = { self ! Message("start") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/DagSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/DagSpec.scala b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/DagSpec.scala index 02a1017..b142d8d 100644 --- a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/DagSpec.scala +++ b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/DagSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,16 +18,18 @@ package io.gearpump.streaming.examples.complexdag -import io.gearpump.cluster.ClientToMaster.SubmitApplication -import io.gearpump.cluster.MasterToClient.SubmitApplicationResult -import io.gearpump.cluster.{TestUtil, MasterHarness} -import io.gearpump.util.Util +import scala.concurrent.Future +import scala.util.Success + import org.scalatest._ import org.scalatest.prop.PropertyChecks -import scala.util.Success -import scala.concurrent.Future -class DagSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness { +import io.gearpump.cluster.ClientToMaster.SubmitApplication +import io.gearpump.cluster.MasterToClient.SubmitApplicationResult +import io.gearpump.cluster.{MasterHarness, TestUtil} + +class DagSpec extends PropSpec with PropertyChecks + with Matchers with BeforeAndAfterAll with MasterHarness { override def beforeAll { startActorSystem() @@ -37,7 +39,7 @@ class DagSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndA shutdownActorSystem() } - override def config = TestUtil.DEFAULT_CONFIG + protected override def config = TestUtil.DEFAULT_CONFIG property("Dag should succeed to submit application with required arguments") { val requiredArgs = Array.empty[String] @@ -45,7 +47,9 @@ class DagSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndA val masterReceiver = createMockMaster() val args = requiredArgs - Future{Dag.main(masterConfig, args)} + Future { + Dag.main(masterConfig, args) + } masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) masterReceiver.reply(SubmitApplicationResult(Success(0))) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/NodeSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/NodeSpec.scala b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/NodeSpec.scala index d151651..35c5824 100644 --- a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/NodeSpec.scala +++ b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/NodeSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,25 +17,25 @@ */ package io.gearpump.streaming.examples.complexdag -import io.gearpump.streaming.MockUtil -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import MockUtil._ -import org.mockito.ArgumentMatcher -import org.mockito.Matchers._ import org.mockito.Mockito._ import org.scalatest.prop.PropertyChecks import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} +import io.gearpump.Message +import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.MockUtil +import io.gearpump.streaming.MockUtil._ + class NodeSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter { val context = MockUtil.mockTaskContext val node = new Node(context, UserConfig.empty) - property("Node should send a Vector[String](classOf[Node].getCanonicalName, classOf[Node].getCanonicalName"){ + property("Node should send a Vector[String](classOf[Node].getCanonicalName, " + + "classOf[Node].getCanonicalName") { val list = Vector(classOf[Node].getCanonicalName) - val expected = Vector(classOf[Node].getCanonicalName,classOf[Node].getCanonicalName) + val expected = Vector(classOf[Node].getCanonicalName, classOf[Node].getCanonicalName) node.onNext(Message(list)) verify(context).output(argMatch[Message](_.msg == expected)) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SinkSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SinkSpec.scala b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SinkSpec.scala index b26f639..341f6c6 100644 --- a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SinkSpec.scala +++ b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SinkSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,19 +17,21 @@ */ package io.gearpump.streaming.examples.complexdag -import io.gearpump.streaming.MockUtil -import io.gearpump.Message -import io.gearpump.cluster.UserConfig import org.scalatest.prop.PropertyChecks import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} +import io.gearpump.Message +import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.MockUtil + class SinkSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter { val context = MockUtil.mockTaskContext val sink = new Sink(context, UserConfig.empty) - property("Sink should send a Vector[String](classOf[Sink].getCanonicalName, classOf[Sink].getCanonicalName"){ + property("Sink should send a Vector[String](classOf[Sink].getCanonicalName, " + + "classOf[Sink].getCanonicalName") { val list = Vector(classOf[Sink].getCanonicalName) val expected = Vector(classOf[Sink].getCanonicalName, classOf[Sink].getCanonicalName) sink.onNext(Message(list)) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SourceSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SourceSpec.scala b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SourceSpec.scala index 21ae6ab..faa7aa7 100644 --- a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SourceSpec.scala +++ b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SourceSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,15 +18,14 @@ package io.gearpump.streaming.examples.complexdag import akka.actor.ActorSystem -import io.gearpump.streaming.MockUtil -import io.gearpump.Message -import io.gearpump.cluster.{TestUtil, UserConfig} -import MockUtil._ -import org.mockito.ArgumentMatcher -import org.mockito.Matchers._ import org.mockito.Mockito._ import org.scalatest.{Matchers, WordSpec} +import io.gearpump.Message +import io.gearpump.cluster.{TestUtil, UserConfig} +import io.gearpump.streaming.MockUtil +import io.gearpump.streaming.MockUtil._ + class SourceSpec extends WordSpec with Matchers { "Source" should { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/HadoopConfig.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/HadoopConfig.scala b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/HadoopConfig.scala index c8c1135..3b53c9c 100644 --- a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/HadoopConfig.scala +++ b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/HadoopConfig.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,19 +18,22 @@ package io.gearpump.streaming.examples.fsio import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream} +import scala.language.implicitConversions + +import org.apache.hadoop.conf.Configuration import io.gearpump.cluster.UserConfig import io.gearpump.util.Constants._ -import org.apache.hadoop.conf.Configuration -import scala.language.implicitConversions +class HadoopConfig(config: UserConfig) { -class HadoopConfig(config: UserConfig) { + def withHadoopConf(conf: Configuration): UserConfig = { + config.withBytes(HADOOP_CONF, serializeHadoopConf(conf)) + } - def withHadoopConf(conf : Configuration) : UserConfig = config.withBytes(HADOOP_CONF, serializeHadoopConf(conf)) - def hadoopConf : Configuration = deserializeHadoopConf(config.getBytes(HADOOP_CONF).get) + def hadoopConf: Configuration = deserializeHadoopConf(config.getBytes(HADOOP_CONF).get) - private def serializeHadoopConf(conf: Configuration) : Array[Byte] = { + private def serializeHadoopConf(conf: Configuration): Array[Byte] = { val out = new ByteArrayOutputStream() val dataOut = new DataOutputStream(out) conf.write(dataOut) @@ -38,10 +41,10 @@ class HadoopConfig(config: UserConfig) { out.toByteArray } - private def deserializeHadoopConf(bytes: Array[Byte]) : Configuration = { + private def deserializeHadoopConf(bytes: Array[Byte]): Configuration = { val in = new ByteArrayInputStream(bytes) val dataIn = new DataInputStream(in) - val result= new Configuration() + val result = new Configuration() result.readFields(dataIn) dataIn.close() result @@ -49,8 +52,8 @@ class HadoopConfig(config: UserConfig) { } object HadoopConfig { - def empty = new HadoopConfig(UserConfig.empty) - def apply(config: UserConfig) = new HadoopConfig(config) + def empty: HadoopConfig = new HadoopConfig(UserConfig.empty) + def apply(config: UserConfig): HadoopConfig = new HadoopConfig(config) implicit def userConfigToHadoopConfig(userConf: UserConfig): HadoopConfig = { HadoopConfig(userConf) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala index 808f3b1..13fc3f9 100644 --- a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala +++ b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,20 +19,21 @@ package io.gearpump.streaming.examples.fsio import java.io.File import java.util.concurrent.TimeUnit +import scala.concurrent.duration.FiniteDuration import akka.actor.Cancellable -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import SeqFileStreamProcessor._ -import HadoopConfig._ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.io.SequenceFile._ import org.apache.hadoop.io.{SequenceFile, Text} -import scala.concurrent.duration.FiniteDuration +import io.gearpump.Message +import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.examples.fsio.HadoopConfig._ +import io.gearpump.streaming.examples.fsio.SeqFileStreamProcessor._ +import io.gearpump.streaming.task.{StartTime, Task, TaskContext} -class SeqFileStreamProcessor(taskContext : TaskContext, config: UserConfig) extends Task(taskContext, config){ +class SeqFileStreamProcessor(taskContext: TaskContext, config: UserConfig) + extends Task(taskContext, config) { import taskContext.taskId @@ -43,16 +44,17 @@ class SeqFileStreamProcessor(taskContext : TaskContext, config: UserConfig) exte val value = new Text() val hadoopConf = config.hadoopConf - private var msgCount : Long = 0 - private var snapShotKVCount : Long = 0 - private var snapShotTime : Long = 0 + private var msgCount: Long = 0 + private var snapShotKVCount: Long = 0 + private var snapShotTime: Long = 0 private var scheduler: Cancellable = null - override def onStart(startTime : StartTime) = { + override def onStart(startTime: StartTime): Unit = { val fs = FileSystem.get(hadoopConf) fs.deleteOnExit(outputPath) - writer = SequenceFile.createWriter(hadoopConf, Writer.file(outputPath), Writer.keyClass(textClass), Writer.valueClass(textClass)) + writer = SequenceFile.createWriter(hadoopConf, Writer.file(outputPath), + Writer.keyClass(textClass), Writer.valueClass(textClass)) scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS), new FiniteDuration(5, TimeUnit.SECONDS))(reportStatus()) @@ -62,7 +64,7 @@ class SeqFileStreamProcessor(taskContext : TaskContext, config: UserConfig) exte override def onNext(msg: Message): Unit = { val kv = msg.msg.asInstanceOf[String].split("\\+\\+") - if(kv.length >= 2) { + if (kv.length >= 2) { key.set(kv(0)) value.set(kv(1)) writer.append(key, value) @@ -70,7 +72,7 @@ class SeqFileStreamProcessor(taskContext : TaskContext, config: UserConfig) exte msgCount += 1 } - override def onStop(): Unit ={ + override def onStop(): Unit = { if (scheduler != null) { scheduler.cancel() } @@ -78,14 +80,17 @@ class SeqFileStreamProcessor(taskContext : TaskContext, config: UserConfig) exte LOG.info("sequence file bolt stopped") } - def reportStatus() = { - val current : Long = System.currentTimeMillis() - LOG.info(s"Task $taskId Throughput: ${(msgCount - snapShotKVCount, (current - snapShotTime) / 1000)} (KVPairs, second)") + private def reportStatus() = { + val current: Long = System.currentTimeMillis() + LOG.info(s"Task $taskId Throughput: ${ + (msgCount - snapShotKVCount, + (current - snapShotTime) / 1000) + } (KVPairs, second)") snapShotKVCount = msgCount snapShotTime = current } } -object SeqFileStreamProcessor{ +object SeqFileStreamProcessor { val OUTPUT_PATH = "outputpath" } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala index 5d32f74..f9b0d22 100644 --- a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala +++ b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,18 +17,20 @@ */ package io.gearpump.streaming.examples.fsio -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import SeqFileStreamProducer._ -import HadoopConfig._ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.io.SequenceFile._ import org.apache.hadoop.io.{SequenceFile, Text} -class SeqFileStreamProducer(taskContext : TaskContext, config: UserConfig) extends Task(taskContext, config){ +import io.gearpump.Message +import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.examples.fsio.HadoopConfig._ +import io.gearpump.streaming.examples.fsio.SeqFileStreamProducer._ +import io.gearpump.streaming.task.{StartTime, Task, TaskContext} + +class SeqFileStreamProducer(taskContext: TaskContext, config: UserConfig) + extends Task(taskContext, config) { - import taskContext.{output, self} + import taskContext.output val value = new Text() val key = new Text() @@ -37,14 +39,14 @@ class SeqFileStreamProducer(taskContext : TaskContext, config: UserConfig) exten val fs = FileSystem.get(hadoopConf) val inputPath = new Path(config.getString(INPUT_PATH).get) - override def onStart(startTime : StartTime) = { + override def onStart(startTime: StartTime): Unit = { reader = new SequenceFile.Reader(hadoopConf, Reader.file(inputPath)) self ! Start LOG.info("sequence file spout initiated") } - override def onNext(msg: Message) = { - if(reader.next(key, value)){ + override def onNext(msg: Message): Unit = { + if (reader.next(key, value)) { output(Message(key + "++" + value)) } else { reader.close() @@ -53,13 +55,13 @@ class SeqFileStreamProducer(taskContext : TaskContext, config: UserConfig) exten self ! Continue } - override def onStop(): Unit ={ + override def onStop(): Unit = { reader.close() } } -object SeqFileStreamProducer{ - def INPUT_PATH = "inputpath" +object SeqFileStreamProducer { + def INPUT_PATH: String = "inputpath" val Start = Message("start") val Continue = Message("continue") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SequenceFileIO.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SequenceFileIO.scala b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SequenceFileIO.scala index c752bba..7272f2b 100644 --- a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SequenceFileIO.scala +++ b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SequenceFileIO.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,39 +17,44 @@ */ package io.gearpump.streaming.examples.fsio -import io.gearpump.streaming.{StreamApplication, Processor} +import org.apache.hadoop.conf.Configuration +import org.slf4j.Logger + import io.gearpump.cluster.UserConfig import io.gearpump.cluster.client.ClientContext import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} import io.gearpump.partitioner.ShufflePartitioner +import io.gearpump.streaming.examples.fsio.HadoopConfig._ +import io.gearpump.streaming.{Processor, StreamApplication} import io.gearpump.util.Graph._ -import HadoopConfig._ import io.gearpump.util.{AkkaApp, Graph, LogUtil} -import org.apache.hadoop.conf.Configuration -import org.slf4j.Logger object SequenceFileIO extends AkkaApp with ArgumentsParser { private val LOG: Logger = LogUtil.getLogger(getClass) override val options: Array[(String, CLIOption[Any])] = Array( - "source"-> CLIOption[Int]("<sequence file reader number>", required = false, defaultValue = Some(1)), - "sink"-> CLIOption[Int]("<sequence file writer number>", required = false, defaultValue = Some(1)), - "input"-> CLIOption[String]("<input file path>", required = true), - "output"-> CLIOption[String]("<output file directory>", required = true) + "source" -> CLIOption[Int]("<sequence file reader number>", required = false, + defaultValue = Some(1)), + "sink" -> CLIOption[Int]("<sequence file writer number>", required = false, + defaultValue = Some(1)), + "input" -> CLIOption[String]("<input file path>", required = true), + "output" -> CLIOption[String]("<output file directory>", required = true) ) - def application(config: ParseResult) : StreamApplication = { + def application(config: ParseResult): StreamApplication = { val spoutNum = config.getInt("source") val boltNum = config.getInt("sink") val input = config.getString("input") val output = config.getString("output") - val appConfig = UserConfig.empty.withString(SeqFileStreamProducer.INPUT_PATH, input).withString(SeqFileStreamProcessor.OUTPUT_PATH, output) + val appConfig = UserConfig.empty.withString(SeqFileStreamProducer.INPUT_PATH, input) + .withString(SeqFileStreamProcessor.OUTPUT_PATH, output) val hadoopConfig = appConfig.withHadoopConf(new Configuration()) val partitioner = new ShufflePartitioner() val streamProducer = Processor[SeqFileStreamProducer](spoutNum) val streamProcessor = Processor[SeqFileStreamProcessor](boltNum) - val app = StreamApplication("SequenceFileIO", Graph(streamProducer ~ partitioner ~> streamProcessor), hadoopConfig) + val app = StreamApplication("SequenceFileIO", + Graph(streamProducer ~ partitioner ~> streamProcessor), hadoopConfig) app } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala index 64ba697..e5dbe0b 100644 --- a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala +++ b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,10 +17,11 @@ */ package io.gearpump.streaming.examples.fsio -import io.gearpump.cluster.UserConfig import org.apache.hadoop.conf.Configuration import org.scalatest.{Matchers, WordSpec} +import io.gearpump.cluster.UserConfig + class HadoopConfigSpec extends WordSpec with Matchers { "HadoopConfig" should { @@ -32,7 +33,7 @@ class HadoopConfigSpec extends WordSpec with Matchers { val user = UserConfig.empty - import HadoopConfig._ + import io.gearpump.streaming.examples.fsio.HadoopConfig._ assert(user.withHadoopConf(hadoopConf).hadoopConf.get(key) == value) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala index 99c7113..bb0d26b 100644 --- a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala +++ b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,15 +18,10 @@ package io.gearpump.streaming.examples.fsio import java.io.File +import scala.collection.mutable.ArrayBuffer import akka.actor.ActorSystem import akka.testkit.TestProbe -import io.gearpump.streaming.{Processor, MockUtil} -import io.gearpump.streaming.task.{StartTime, TaskId} -import io.gearpump.Message -import io.gearpump.cluster.{TestUtil, UserConfig} -import io.gearpump.streaming.Processor -import io.gearpump.streaming.task.StartTime import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.io.SequenceFile.Reader @@ -36,8 +31,13 @@ import org.scalacheck.Gen import org.scalatest.prop.PropertyChecks import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} -import scala.collection.mutable.ArrayBuffer -class SeqFileStreamProcessorSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter { +import io.gearpump.Message +import io.gearpump.cluster.{TestUtil, UserConfig} +import io.gearpump.streaming.task.{StartTime, TaskId} +import io.gearpump.streaming.{MockUtil, Processor} +class SeqFileStreamProcessorSpec + extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter { + val kvPairs = new ArrayBuffer[(String, String)] val outputDirectory = "SeqFileStreamProcessor_Test" val sequenceFilePath = new Path(outputDirectory + File.separator + TaskId(0, 0)) @@ -56,7 +56,8 @@ class SeqFileStreamProcessorSpec extends PropSpec with PropertyChecks with Match implicit val system1 = ActorSystem("SeqFileStreamProcessor", TestUtil.DEFAULT_CONFIG) val system2 = ActorSystem("Reporter", TestUtil.DEFAULT_CONFIG) val watcher = TestProbe()(system1) - val conf = HadoopConfig(UserConfig.empty.withString(SeqFileStreamProcessor.OUTPUT_PATH, outputDirectory)).withHadoopConf(new Configuration()) + val conf = HadoopConfig(UserConfig.empty.withString(SeqFileStreamProcessor.OUTPUT_PATH, + outputDirectory)).withHadoopConf(new Configuration()) val context = MockUtil.mockTaskContext val processorDescription = @@ -80,7 +81,7 @@ class SeqFileStreamProcessorSpec extends PropSpec with PropertyChecks with Match val reader = new SequenceFile.Reader(hadoopConf, Reader.file(sequenceFilePath)) kvPairs.foreach { kv => val (key, value) = kv - if(value.length > 0 && reader.next(_key, _value)) { + if (value.length > 0 && reader.next(_key, _value)) { assert(_key.toString == key && _value.toString == value) } } @@ -90,4 +91,4 @@ class SeqFileStreamProcessorSpec extends PropSpec with PropertyChecks with Match after { fs.deleteOnExit(new Path(outputDirectory)) } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala index cb6f553..04dafa7 100644 --- a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala +++ b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,26 +17,26 @@ */ package io.gearpump.streaming.examples.fsio -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.examples.fsio.HadoopConfig -import io.gearpump.streaming.task.StartTime -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import MockUtil._ +import scala.collection.mutable.ArrayBuffer + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.io.SequenceFile.Writer import org.apache.hadoop.io.{SequenceFile, Text} -import org.mockito.ArgumentMatcher -import org.mockito.Matchers._ import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.prop.PropertyChecks import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} -import scala.collection.mutable.ArrayBuffer +import io.gearpump.Message +import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.MockUtil +import io.gearpump.streaming.MockUtil._ +import io.gearpump.streaming.task.StartTime + +class SeqFileStreamProducerSpec + extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter { -class SeqFileStreamProducerSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter{ val kvPairs = new ArrayBuffer[(String, String)] val inputFile = "SeqFileStreamProducer_Test" val sequenceFilePath = new Path(inputFile) @@ -53,7 +53,8 @@ class SeqFileStreamProducerSpec extends PropSpec with PropertyChecks with Matche before { fs.deleteOnExit(sequenceFilePath) - val writer = SequenceFile.createWriter(hadoopConf, Writer.file(sequenceFilePath), Writer.keyClass(textClass), Writer.valueClass(textClass)) + val writer = SequenceFile.createWriter(hadoopConf, Writer.file(sequenceFilePath), + Writer.keyClass(textClass), Writer.valueClass(textClass)) forAll(kvGenerator) { kv => _key.set(kv._1) _value.set(kv._2) @@ -63,9 +64,11 @@ class SeqFileStreamProducerSpec extends PropSpec with PropertyChecks with Matche writer.close() } - property("SeqFileStreamProducer should read the key-value pairs from a sequence file and deliver them") { + property("SeqFileStreamProducer should read the key-value pairs from " + + "a sequence file and deliver them") { - val conf = HadoopConfig(UserConfig.empty.withString(SeqFileStreamProducer.INPUT_PATH, inputFile)).withHadoopConf(new Configuration()) + val conf = HadoopConfig(UserConfig.empty.withString(SeqFileStreamProducer.INPUT_PATH, + inputFile)).withHadoopConf(new Configuration()) val context = MockUtil.mockTaskContext @@ -74,7 +77,8 @@ class SeqFileStreamProducerSpec extends PropSpec with PropertyChecks with Matche producer.onNext(Message("start")) val expected = kvPairs.map(kv => kv._1 + "++" + kv._2).toSet - verify(context).output(argMatch[Message](msg => expected.contains(msg.msg.asInstanceOf[String]))) + verify(context).output(argMatch[Message](msg => + expected.contains(msg.msg.asInstanceOf[String]))) } after { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala index f10227d..efb5e44 100644 --- a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala +++ b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,17 +18,20 @@ package io.gearpump.streaming.examples.fsio +import scala.concurrent.Future +import scala.util.{Success, Try} + +import com.typesafe.config.Config +import org.scalatest.prop.PropertyChecks +import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec} + import io.gearpump.cluster.ClientToMaster.SubmitApplication import io.gearpump.cluster.MasterToClient.SubmitApplicationResult import io.gearpump.cluster.{MasterHarness, TestUtil} -import io.gearpump.util.Util -import org.scalatest.prop.PropertyChecks -import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec} -import scala.util.{Success, Try} -import scala.concurrent.Future +class SequenceFileIOSpec + extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness { -class SequenceFileIOSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness { override def beforeAll { startActorSystem() } @@ -37,7 +40,7 @@ class SequenceFileIOSpec extends PropSpec with PropertyChecks with Matchers with shutdownActorSystem() } - override def config = TestUtil.DEFAULT_CONFIG + override def config: Config = TestUtil.DEFAULT_CONFIG property("SequenceFileIO should succeed to submit application with required arguments") { val requiredArgs = Array( @@ -58,7 +61,9 @@ class SequenceFileIOSpec extends PropSpec with PropertyChecks with Matchers with forAll(validArgs) { (requiredArgs: Array[String], optionalArgs: Array[String]) => val args = requiredArgs ++ optionalArgs - Future {SequenceFileIO.main(masterConfig, args)} + Future { + SequenceFileIO.main(masterConfig, args) + } masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) masterReceiver.reply(SubmitApplicationResult(Success(0))) } @@ -75,5 +80,4 @@ class SequenceFileIOSpec extends PropSpec with PropertyChecks with Matchers with assert(Try(SequenceFileIO.main(args)).isFailure, "missing required arguments, print usage") } } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/KafkaReadWrite.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/KafkaReadWrite.scala b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/KafkaReadWrite.scala index b4d9a04..35b6594 100644 --- a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/KafkaReadWrite.scala +++ b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/KafkaReadWrite.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,6 +19,8 @@ package io.gearpump.streaming.examples.kafka import akka.actor.ActorSystem +import org.slf4j.Logger + import io.gearpump.cluster.UserConfig import io.gearpump.cluster.client.ClientContext import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} @@ -29,21 +31,26 @@ import io.gearpump.streaming.sink.DataSinkProcessor import io.gearpump.streaming.source.DataSourceProcessor import io.gearpump.util.Graph._ import io.gearpump.util.{AkkaApp, Graph, LogUtil} -import org.slf4j.Logger object KafkaReadWrite extends AkkaApp with ArgumentsParser { private val LOG: Logger = LogUtil.getLogger(getClass) override val options: Array[(String, CLIOption[Any])] = Array( - "source" -> CLIOption[Int]("<hom many kafka producer tasks>", required = false, defaultValue = Some(1)), - "sink" -> CLIOption[Int]("<hom many kafka processor tasks>", required = false, defaultValue = Some(1)), - "zookeeperConnect" -> CLIOption[String]("<zookeeper connect string>", required = false, defaultValue = Some("localhost:2181")), - "brokerList" -> CLIOption[String]("<broker server list string>", required = false, defaultValue = Some("localhost:9092")), - "sourceTopic" -> CLIOption[String]("<kafka source topic>", required = false, defaultValue = Some("topic1")), - "sinkTopic" -> CLIOption[String]("<kafka sink topic>", required = false, defaultValue = Some("topic2")) + "source" -> CLIOption[Int]("<hom many kafka producer tasks>", required = false, + defaultValue = Some(1)), + "sink" -> CLIOption[Int]("<hom many kafka processor tasks>", required = false, + defaultValue = Some(1)), + "zookeeperConnect" -> CLIOption[String]("<zookeeper connect string>", required = false, + defaultValue = Some("localhost:2181")), + "brokerList" -> CLIOption[String]("<broker server list string>", required = false, + defaultValue = Some("localhost:9092")), + "sourceTopic" -> CLIOption[String]("<kafka source topic>", required = false, + defaultValue = Some("topic1")), + "sinkTopic" -> CLIOption[String]("<kafka sink topic>", required = false, + defaultValue = Some("topic2")) ) - def application(config: ParseResult, system: ActorSystem) : StreamApplication = { + def application(config: ParseResult, system: ActorSystem): StreamApplication = { implicit val actorSystem = system val sourceNum = config.getInt("source") val sinkNum = config.getInt("sink") @@ -59,7 +66,7 @@ object KafkaReadWrite extends AkkaApp with ArgumentsParser { val sink = new KafkaSink(sinkTopic, brokerList) val sinkProcessor = DataSinkProcessor(sink, sinkNum) val partitioner = new ShufflePartitioner - val computation = sourceProcessor ~ partitioner ~> sinkProcessor + val computation = sourceProcessor ~ partitioner ~> sinkProcessor val app = StreamApplication("KafkaReadWrite", Graph(computation), appConfig) app } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala index 51bc3d6..6955bcc 100644 --- a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala +++ b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,31 +19,34 @@ package io.gearpump.streaming.examples.kafka.wordcount import akka.actor.ActorSystem -import io.gearpump.streaming.kafka.lib.KafkaSourceConfig -import io.gearpump.streaming.{StreamApplication, Processor} -import io.gearpump.streaming.kafka.{KafkaSink, KafkaStorageFactory, KafkaSource} -import io.gearpump.streaming.sink.DataSinkProcessor -import io.gearpump.streaming.source.DataSourceProcessor +import kafka.api.OffsetRequest +import org.slf4j.Logger + import io.gearpump.cluster.UserConfig import io.gearpump.cluster.client.ClientContext import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} import io.gearpump.partitioner.HashPartitioner +import io.gearpump.streaming.kafka.lib.KafkaSourceConfig +import io.gearpump.streaming.kafka.{KafkaSink, KafkaSource, KafkaStorageFactory} +import io.gearpump.streaming.sink.DataSinkProcessor +import io.gearpump.streaming.source.DataSourceProcessor +import io.gearpump.streaming.{Processor, StreamApplication} import io.gearpump.util.Graph._ import io.gearpump.util.{AkkaApp, Graph, LogUtil} -import kafka.api.OffsetRequest -import org.slf4j.Logger object KafkaWordCount extends AkkaApp with ArgumentsParser { private val LOG: Logger = LogUtil.getLogger(getClass) override val options: Array[(String, CLIOption[Any])] = Array( - "source" -> CLIOption[Int]("<how many kafka source tasks>", required = false, defaultValue = Some(1)), + "source" -> CLIOption[Int]("<how many kafka source tasks>", required = false, + defaultValue = Some(1)), "split" -> CLIOption[Int]("<how many split tasks>", required = false, defaultValue = Some(1)), "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1)), - "sink" -> CLIOption[Int]("<how many kafka sink tasks>", required = false, defaultValue = Some(1)) - ) + "sink" -> CLIOption[Int]("<how many kafka sink tasks>", required = false, + defaultValue = Some(1)) + ) - def application(config: ParseResult, system: ActorSystem) : StreamApplication = { + def application(config: ParseResult, system: ActorSystem): StreamApplication = { implicit val actorSystem = system val sourceNum = config.getInt("source") val splitNum = config.getInt("split") @@ -61,7 +64,8 @@ object KafkaWordCount extends AkkaApp with ArgumentsParser { val sink = new KafkaSink("topic2", "localhost:9092") val sinkProcessor = DataSinkProcessor(sink, sinkNum) val partitioner = new HashPartitioner - val computation = sourceProcessor ~ partitioner ~> split ~ partitioner ~> sum ~ partitioner ~> sinkProcessor + val computation = sourceProcessor ~ partitioner ~> split ~ partitioner ~> + sum ~ partitioner ~> sinkProcessor val app = StreamApplication("KafkaWordCount", Graph(computation), appConfig) app } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Split.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Split.scala b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Split.scala index 499f1ac..b46d170 100644 --- a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Split.scala +++ b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Split.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,18 +19,20 @@ package io.gearpump.streaming.examples.kafka.wordcount import com.twitter.bijection.Injection -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} + import io.gearpump.Message import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.task.{StartTime, Task, TaskContext} -class Split(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) { +class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { import taskContext.output - override def onStart(startTime : StartTime) : Unit = { + override def onStart(startTime: StartTime): Unit = { } - override def onNext(msg : Message) : Unit = { - Injection.invert[String, Array[Byte]](msg.msg.asInstanceOf[Array[Byte]]).foreach(_.split("\\s+").foreach( - word => output(new Message(word, msg.timestamp)))) + override def onNext(msg: Message): Unit = { + Injection.invert[String, Array[Byte]](msg.msg.asInstanceOf[Array[Byte]]) + .foreach(_.split("\\s+").foreach( + word => output(new Message(word, msg.timestamp)))) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Sum.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Sum.scala b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Sum.scala index 465754e..9c67733 100644 --- a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Sum.scala +++ b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Sum.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,18 +19,19 @@ package io.gearpump.streaming.examples.kafka.wordcount import com.twitter.bijection.Injection -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} + import io.gearpump.Message import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.task.{StartTime, Task, TaskContext} -class Sum(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) { +class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { import taskContext.output private[wordcount] var wordcount = Map.empty[String, Long] - override def onStart(startTime : StartTime) : Unit = {} + override def onStart(startTime: StartTime): Unit = {} - override def onNext(message : Message) : Unit = { + override def onNext(message: Message): Unit = { val word = message.msg.asInstanceOf[String] val count = wordcount.getOrElse(word, 0L) + 1 wordcount += word -> count http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala b/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala index 9e79d7e..35f7a62 100644 --- a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala +++ b/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,17 +18,19 @@ package io.gearpump.streaming.examples.kafka.wordcount -import io.gearpump.cluster.ClientToMaster.SubmitApplication -import io.gearpump.cluster.MasterToClient.SubmitApplicationResult -import io.gearpump.cluster.{MasterHarness, TestUtil} -import io.gearpump.util.Util +import scala.concurrent.Future +import scala.util.Success + +import com.typesafe.config.Config import org.scalatest.prop.PropertyChecks import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} -import scala.util.Success -import scala.concurrent.Future +import io.gearpump.cluster.ClientToMaster.SubmitApplication +import io.gearpump.cluster.MasterToClient.SubmitApplicationResult +import io.gearpump.cluster.{MasterHarness, TestUtil} -class KafkaWordCountSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness { +class KafkaWordCountSpec + extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness { before { startActorSystem() @@ -38,7 +40,7 @@ class KafkaWordCountSpec extends PropSpec with PropertyChecks with Matchers with shutdownActorSystem() } - override def config = TestUtil.DEFAULT_CONFIG + override def config: Config = TestUtil.DEFAULT_CONFIG property("KafkaWordCount should succeed to submit application with required arguments") { val requiredArgs = Array.empty[String] @@ -58,7 +60,9 @@ class KafkaWordCountSpec extends PropSpec with PropertyChecks with Matchers with forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) => val args = requiredArgs ++ optionalArgs - Future {KafkaWordCount.main(masterConfig, args)} + Future { + KafkaWordCount.main(masterConfig, args) + } masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) masterReceiver.reply(SubmitApplicationResult(Success(0))) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala b/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala index 864f97d..2cc6a16 100644 --- a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala +++ b/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,15 +18,14 @@ package io.gearpump.streaming.examples.kafka.wordcount import com.twitter.bijection.Injection -import io.gearpump.streaming.task.TaskContext -import io.gearpump.Message -import io.gearpump.cluster.UserConfig import org.mockito.Matchers._ import org.mockito.Mockito._ import org.scalatest._ import org.scalatest.mock.MockitoSugar -import scala.language.postfixOps +import io.gearpump.Message +import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.task.TaskContext class SplitSpec extends FlatSpec with Matchers with MockitoSugar { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala b/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala index 6014d0f..4dcb9d7 100644 --- a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala +++ b/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,23 +17,24 @@ */ package io.gearpump.streaming.examples.kafka.wordcount -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.task.StartTime -import io.gearpump.Message -import io.gearpump.cluster.UserConfig +import scala.collection.mutable + import org.mockito.Matchers._ import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.{FlatSpec, Matchers} -import scala.collection.mutable +import io.gearpump.Message +import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.MockUtil +import io.gearpump.streaming.task.StartTime class SumSpec extends FlatSpec with Matchers { it should "sum should calculate the frequency of the word correctly" in { val stringGenerator = Gen.alphaStr - val expectedWordCountMap : mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]() + val expectedWordCountMap: mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]() val taskContext = MockUtil.mockTaskContext @@ -42,7 +43,7 @@ class SumSpec extends FlatSpec with Matchers { val str = "once two two three three three" var totalWordCount = 0 - stringGenerator.map {word => + stringGenerator.map { word => totalWordCount += 1 expectedWordCountMap.put(word, expectedWordCountMap.getOrElse(word, 0L) + 1) sum.onNext(Message(word)) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/sol/README.md ---------------------------------------------------------------------- diff --git a/examples/streaming/sol/README.md b/examples/streaming/sol/README.md index e772af4..a8b10b3 100644 --- a/examples/streaming/sol/README.md +++ b/examples/streaming/sol/README.md @@ -1,6 +1,6 @@ SOL is a throughput test. It will create multiple layers, and then do random shuffling between these layers. -SOLPRoducer -> SOLProcessor -> SOLProcessor -> ... +SOLProducer -> SOLProcessor -> SOLProcessor -> ... The original code comes from: https://github.com/yahoo/storm-perf-test http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOL.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOL.scala b/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOL.scala index b531250..10c190c 100644 --- a/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOL.scala +++ b/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOL.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,25 +18,30 @@ package io.gearpump.streaming.examples.sol -import io.gearpump.streaming.{StreamApplication, Processor} +import org.slf4j.Logger + import io.gearpump.cluster.UserConfig import io.gearpump.cluster.client.ClientContext import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} import io.gearpump.partitioner.ShufflePartitioner +import io.gearpump.streaming.{Processor, StreamApplication} import io.gearpump.util.Graph._ import io.gearpump.util.{AkkaApp, Graph, LogUtil} -import org.slf4j.Logger object SOL extends AkkaApp with ArgumentsParser { private val LOG: Logger = LogUtil.getLogger(getClass) override val options: Array[(String, CLIOption[Any])] = Array( - "streamProducer"-> CLIOption[Int]("<stream producer number>", required = false, defaultValue = Some(1)), - "streamProcessor"-> CLIOption[Int]("<stream processor number>", required = false, defaultValue = Some(1)), - "bytesPerMessage" -> CLIOption[Int]("<size of each message>", required = false, defaultValue = Some(100)), - "stages"-> CLIOption[Int]("<how many stages to run>", required = false, defaultValue = Some(2))) + "streamProducer" -> CLIOption[Int]("<stream producer number>", required = false, + defaultValue = Some(1)), + "streamProcessor" -> CLIOption[Int]("<stream processor number>", required = false, + defaultValue = Some(1)), + "bytesPerMessage" -> CLIOption[Int]("<size of each message>", required = false, + defaultValue = Some(100)), + "stages" -> CLIOption[Int]("<how many stages to run>", required = false, + defaultValue = Some(2))) - def application(config: ParseResult) : StreamApplication = { + def application(config: ParseResult): StreamApplication = { val spoutNum = config.getInt("streamProducer") val boltNum = config.getInt("streamProcessor") val bytesPerMessage = config.getInt("bytesPerMessage")
