http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala deleted file mode 100644 index ee06b25..0000000 --- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
package io.gearpump.streaming.examples.transport.generator

import scala.util.Random

import io.gearpump.streaming.examples.transport.PassRecord
import io.gearpump.util.LogUtil

/**
 * Produces a sequence of [[PassRecord]]s for one vehicle travelling through a [[MockCity]].
 *
 * Every call to [[getNextPassRecord]] moves the vehicle to another location and advances an
 * internal timestamp by a random interval.
 *
 * @param vehicleId identifier stamped on every generated record
 * @param city the city used to pick the start location and each following location
 * @param overdriveThreshold speed limit (the divisor of the shortest "legal" block time);
 *                           the constants it is combined with are expressed in km and km/h
 */
class PassRecordGenerator(vehicleId: String, city: MockCity, overdriveThreshold: Int) {
  private val LOG = LogUtil.getLogger(getClass)
  LOG.info(s"Generate pass record for vehicle $vehicleId")
  // Starts at construction time and only ever moves forward.
  private var timeStamp = System.currentTimeMillis()

  private var locationId = city.randomLocationId()
  private val random = new Random()
  // Decided once per generator: a "fake plate" vehicle shows up at unrelated random locations.
  private val fakePlate = random.nextInt(1000) < 1000 * PassRecordGenerator.FAKE_PLATE_RATE
  // Interval between two records is drawn from [randomMin, randomMin + randomRange).
  private val (randomMin, randomRange) = {
    // Milliseconds needed to cross one block at the overdrive threshold / at the minimal speed.
    val lowerBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / overdriveThreshold.toFloat
    val upperBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / MockCity.MINIMAL_SPEED.toFloat
    // Extend the range slightly below lowerBound so that some intervals imply over-speeding,
    // but never go below two minutes.
    val overdrive = (upperBound - lowerBound) * PassRecordGenerator.OVERDRIVE_RATE
    val randomMin = Math.max(lowerBound - overdrive, PassRecordGenerator.TWOMINUTES)
    val randomRange = upperBound - randomMin
    (randomMin.toInt, randomRange.toInt)
  }

  /**
   * Moves the vehicle and returns the record of its new sighting.
   *
   * @return a record carrying the vehicle id, the new location and the advanced timestamp
   */
  def getNextPassRecord(): PassRecord = {
    locationId = if (fakePlate) {
      city.randomLocationId()
    } else {
      city.nextLocation(locationId)
    }
    timeStamp += (random.nextInt(randomRange) + randomMin)
    PassRecord(vehicleId, locationId, timeStamp)
  }
}

object PassRecordGenerator {
  final val FAKE_PLATE_RATE = 0.01F
  final val OVERDRIVE_RATE = 0.05F
  final val TWOMINUTES = 2 * 60 * 1000

  /**
   * Creates `generatorNum` generators whose vehicle ids are `prefix` followed by a zero-padded
   * running number starting at 1.
   *
   * The ids are unique by construction, so the generators are built directly in id order
   * instead of being collected through an intermediate map.
   *
   * @param generatorNum number of generators to create; a value below 1 yields an empty array
   * @param prefix common prefix of all vehicle ids
   * @param city city shared by all generators
   * @param overdriveThreshold speed limit handed to every generator
   * @return the generators, ordered by their running number
   */
  def create(generatorNum: Int, prefix: String, city: MockCity, overdriveThreshold: Int)
    : Array[PassRecordGenerator] = {
    // Width of the zero-padded number, e.g. 3 digits for 100..999 generators.
    val digitsNum = (Math.log10(generatorNum) + 1).toInt
    (1 to generatorNum).map { i =>
      val vehicleId = prefix + s"%0${digitsNum}d".format(i)
      new PassRecordGenerator(vehicleId, city, overdriveThreshold)
    }.toArray
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala new file mode 100644 index 0000000..0aaf72c --- /dev/null +++ b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.gearpump.streaming.examples.transport

/** A location identified by `id`, together with its row and column. */
case class LocationInfo(id: String, row: Int, column: Int)

// scalastyle:off equals.hash.code
/**
 * A single sighting of a vehicle at a location.
 *
 * The hash code is taken from `vehicleId` alone, so all records of one vehicle share a hash
 * code while `equals` still compares every field.
 */
case class PassRecord(vehicleId: String, locationId: String, timeStamp: Long) {
  override def hashCode: Int = vehicleId.hashCode
}
// scalastyle:on equals.hash.code

/** Request for the stored records of the given vehicle. */
case class GetTrace(vehicleId: String)

/** Reply to [[GetTrace]] carrying the records of one vehicle. */
case class VehicleTrace(records: Array[PassRecord])

/** Notification that a vehicle was observed over the speed limit at a location. */
case class OverSpeedReport(vehicleId: String, speed: String, timestamp: Long, locationId: String)
package org.apache.gearpump.streaming.examples.transport

import scala.concurrent.duration._

import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.examples.transport.generator.{MockCity, PassRecordGenerator}
import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}

/**
 * Source task that emits one pass record per simulated vehicle and then re-triggers itself
 * one second later.
 *
 * The configured vehicle count is divided by the task parallelism, so each task instance
 * simulates its share of the vehicles.
 */
class DataSource(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
  import taskContext.{output, parallelism, scheduleOnce, taskId}

  private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get
  private val vehicleNum = conf.getInt(DataSource.VEHICLE_NUM).get / parallelism
  private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get
  private val mockCity = new MockCity(citySize)
  // One generator per vehicle; ids are prefixed with this task's identifier.
  private val recordGenerators: Array[PassRecordGenerator] = {
    val idPrefix = getIdentifier(taskId)
    PassRecordGenerator.create(vehicleNum, idPrefix, mockCity, overdriveThreshold)
  }

  /** Sends the first message to itself so that `onNext` starts running. */
  override def onStart(startTime: StartTime): Unit = {
    val now = System.currentTimeMillis()
    self ! Message("start", now)
  }

  /** Emits the next record of every vehicle, then schedules the following round. */
  override def onNext(msg: Message): Unit = {
    for (generator <- recordGenerators) {
      val record = generator.getNextPassRecord()
      output(Message(record, System.currentTimeMillis()))
    }
    scheduleOnce(1.second) {
      self ! Message("continue", System.currentTimeMillis())
    }
  }

  /** Builds the vehicle id prefix from the processor id and index of the given task. */
  private def getIdentifier(taskId: TaskId): String = {
    // scalastyle:off non.ascii.character.disallowed
    s"沪A${taskId.processorId}${taskId.index}"
    // scalastyle:on non.ascii.character.disallowed
  }
}

/** Configuration keys read by [[DataSource]]. */
object DataSource {
  final val VEHICLE_NUM = "vehicle.number"
  final val MOCK_CITY_SIZE = "mock.city.size"
}
package org.apache.gearpump.streaming.examples.transport

import java.util.concurrent.TimeUnit
import scala.concurrent.Future
import scala.util.{Failure, Success}

import akka.actor.Actor._
import akka.actor.{Actor, ActorRefFactory, Props}
import akka.io.IO
import akka.pattern.ask
import spray.can.Http
import spray.http.StatusCodes
import spray.json._
import spray.routing.{HttpService, Route}
import upickle.default.write

import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.partitioner.PartitionerDescription
import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
import org.apache.gearpump.streaming.examples.transport.QueryServer.{GetAllRecords, WebServer}
import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
import org.apache.gearpump.streaming.{DAG, ProcessorDescription, ProcessorId, StreamApplication}
import org.apache.gearpump.util.Graph

/**
 * Task that answers queries about vehicles.
 *
 * It collects the [[OverSpeedReport]]s sent to it, forwards [[GetTrace]] requests to the
 * [[VelocityInspector]] task responsible for the vehicle, and starts a [[QueryServer.WebServer]]
 * child that exposes both over HTTP.
 */
class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
  import system.dispatcher
  import taskContext.appMaster

  // Processor id and description of the VelocityInspector; assigned in onStart.
  var inspector: (ProcessorId, ProcessorDescription) = null
  implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
  // Newest report first (prepend is O(1)); reversed again when handed out.
  private var overSpeedRecords = List.empty[OverSpeedReport]

  /** Looks up the VelocityInspector processor in the DAG and starts the web server child. */
  override def onStart(startTime: StartTime): Unit = {
    val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]](
      StreamApplication.DAG).get)
    inspector = dag.processors.find { kv =>
      val (_, processor) = kv
      processor.taskClass == classOf[VelocityInspector].getName
    }.get
    taskContext.actorOf(Props(new WebServer))
  }

  /** This task has no stream input to process. */
  override def onNext(msg: Message): Unit = {
  }

  /**
   * Handles the query protocol:
   *  - [[GetTrace]]: asks the inspector task selected by the vehicle id hash and relays the
   *    answer (or the failure) to the requester,
   *  - [[OverSpeedReport]]: logs and stores the report,
   *  - `GetAllRecords`: replies with the stored reports sorted by timestamp and clears them.
   */
  override def receiveUnManagedMessage: Receive = {
    case getTrace @ GetTrace(vehicleId: String) =>
      val (processorId, description) = inspector
      val parallelism = description.parallelism
      // Mask the sign bit so the index is never negative.
      val analyzerTaskId =
        TaskId(processorId, (vehicleId.hashCode & Integer.MAX_VALUE) % parallelism)
      // Capture the sender now; it must not be read from inside the future callback.
      val requester = sender
      val lookup: Future[TaskActorRef] =
        (appMaster ? LookupTaskActorRef(analyzerTaskId)).mapTo[TaskActorRef]
      lookup.flatMap(ref => (ref.task ? getTrace).mapTo[VehicleTrace]).onComplete {
        case Success(trace) =>
          LOG.info(s"reporting $trace")
          requester ! trace
        case Failure(ex) =>
          // Tell the requester right away instead of letting its own ask time out.
          LOG.warn(s"failed to fetch the trace of vehicle $vehicleId: ${ex.getMessage}")
          requester ! akka.actor.Status.Failure(ex)
      }
    case record @ OverSpeedReport(vehicleId, speed, _, _) =>
      LOG.info(s"vehicle $vehicleId is over speed, the speed is $speed km/h")
      overSpeedRecords = record :: overSpeedRecords
    case GetAllRecords =>
      // Restore arrival order first so the stable sort keeps it for equal timestamps.
      val inArrivalOrder = overSpeedRecords.reverse.toArray
      sender ! QueryServer.OverSpeedRecords(inArrivalOrder.sortBy(_.timestamp))
      overSpeedRecords = List.empty[OverSpeedReport]
    case _ =>
    // Ignore
  }
}

object QueryServer {
  /** Request for all over-speed reports collected since the previous request. */
  object GetAllRecords

  /** Reply to `GetAllRecords`. */
  case class OverSpeedRecords(records: Array[OverSpeedReport])

  /**
   * HTTP front end bound to 0.0.0.0:8080. Queries are forwarded to the parent actor and the
   * answers are rendered as pretty-printed JSON; static pages are served from the
   * `transport/` resources.
   */
  class WebServer extends Actor with HttpService {

    import context.dispatcher
    implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
    def actorRefFactory: ActorRefFactory = context
    implicit val system = context.system

    IO(Http) ! Http.Bind(self, interface = "0.0.0.0", port = 8080)

    override def receive: Receive = runRoute(webServer ~ staticRoute)

    /** Routes `GET /trace/<vehicleId>` and `GET /records`. */
    def webServer: Route = {
      path("trace" / Segment) { vehicleId =>
        get {
          // mapTo fails the future on an unexpected reply type, which lands in Failure below.
          onComplete((context.parent ? GetTrace(vehicleId)).mapTo[VehicleTrace]) {
            case Success(trace) =>
              val json = write(trace)
              complete(pretty(json))
            case Failure(ex) => complete(StatusCodes.InternalServerError,
              s"An error occurred: ${ex.getMessage}")
          }
        }
      } ~
      path("records") {
        get {
          onComplete((context.parent ? GetAllRecords).mapTo[OverSpeedRecords]) {
            case Success(records) =>
              val json = write(records)
              complete(pretty(json))
            case Failure(ex) => complete(StatusCodes.InternalServerError,
              s"An error occurred: ${ex.getMessage}")
          }
        }
      }
    }

    /** Routes for the landing page and the css/svg/js resource directories. */
    val staticRoute = {
      pathEndOrSingleSlash {
        getFromResource("transport/transport.html")
      } ~
      pathPrefix("css") {
        get {
          getFromResourceDirectory("transport/css")
        }
      } ~
      pathPrefix("svg") {
        get {
          getFromResourceDirectory("transport/svg")
        }
      } ~
      pathPrefix("js") {
        get {
          getFromResourceDirectory("transport/js")
        }
      }
    }

    /** Re-parses a JSON string and renders it with indentation. */
    private def pretty(json: String): String = {
      json.parseJson.prettyPrint
    }
  }
}
package org.apache.gearpump.streaming.examples.transport

import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.cluster.client.ClientContext
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
import org.apache.gearpump.partitioner.HashPartitioner
import org.apache.gearpump.streaming.{Processor, StreamApplication}
import org.apache.gearpump.util.Graph._
import org.apache.gearpump.util.{AkkaApp, Graph}

/** A city smart transportation streaming application */
object Transport extends AkkaApp with ArgumentsParser {
  override val options: Array[(String, CLIOption[Any])] = Array(
    "source" -> CLIOption[Int]("<how many task to generate data>", required = false,
      defaultValue = Some(10)),
    "inspector" -> CLIOption[Int]("<how many over speed inspector>", required = false,
      defaultValue = Some(4)),
    "vehicle" -> CLIOption[Int]("<how many vehicles's to generate>", required = false,
      defaultValue = Some(1000)),
    "citysize" -> CLIOption[Int]("<the blocks number of the mock city>", required = false,
      defaultValue = Some(10)),
    "threshold" -> CLIOption[Int]("<overdrive threshold, km/h>", required = false,
      defaultValue = Some(60)))

  /**
   * Builds the application graph from the parsed command line.
   *
   * [[DataSource]] feeds [[VelocityInspector]] through a hash partitioner; [[QueryServer]] is
   * added as a single task without any edge.
   *
   * @param config parsed command line options
   * @return the application named "transport"
   */
  def application(config: ParseResult): StreamApplication = {
    val sourceNum = config.getInt("source")
    val inspectorNum = config.getInt("inspector")
    val vehicleNum = config.getInt("vehicle")
    val citysize = config.getInt("citysize")
    val threshold = config.getInt("threshold")
    val source = Processor[DataSource](sourceNum)
    val inspector = Processor[VelocityInspector](inspectorNum)
    val queryServer = Processor[QueryServer](1)
    val partitioner = new HashPartitioner

    val userConfig = UserConfig.empty.withInt(DataSource.VEHICLE_NUM, vehicleNum).
      withInt(DataSource.MOCK_CITY_SIZE, citysize).
      withInt(VelocityInspector.OVER_DRIVE_THRESHOLD, threshold).
      withInt(VelocityInspector.FAKE_PLATE_THRESHOLD, 200)
    StreamApplication("transport", Graph(source ~ partitioner ~> inspector,
      Node(queryServer)), userConfig)
  }

  /** Parses the arguments, submits the application and always closes the client context. */
  override def main(akkaConf: Config, args: Array[String]): Unit = {
    val config = parse(args)
    val context = ClientContext(akkaConf)
    implicit val system = context.system
    try {
      context.submit(application(config))
    } finally {
      // Release the client even when the submission throws.
      context.close()
    }
  }
}
package org.apache.gearpump.streaming.examples.transport

import java.util.concurrent.TimeUnit
import scala.collection.immutable.Queue
import scala.collection.mutable
import scala.concurrent.Future
import scala.util.{Failure, Success}

import akka.actor.Actor._
import akka.actor.ActorRef
import akka.pattern.ask

import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.partitioner.PartitionerDescription
import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
import org.apache.gearpump.streaming.examples.transport.generator.MockCity
import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
import org.apache.gearpump.streaming.{DAG, ProcessorDescription, StreamApplication}
import org.apache.gearpump.util.Graph

/**
 * Task that computes the speed of each vehicle between two consecutive [[PassRecord]]s and
 * reports vehicles exceeding the overdrive threshold to the [[QueryServer]] task.
 *
 * It keeps the latest `RECORDS_NUM` records per vehicle and serves them on [[GetTrace]].
 */
class VelocityInspector(taskContext: TaskContext, conf: UserConfig)
  extends Task(taskContext, conf) {

  import system.dispatcher
  import taskContext.appMaster
  implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
  private val passRecords = mutable.Map.empty[String, Queue[PassRecord]]
  private val fakePlateThreshold = conf.getInt(VelocityInspector.FAKE_PLATE_THRESHOLD).get
  private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get
  private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get
  private val mockCity = new MockCity(citySize)
  // Written from the lookup future's callback and read in onNext, hence volatile.
  // Stays None until the lookup succeeds.
  @volatile private var queryServerActor: Option[ActorRef] = None

  /** Finds the QueryServer processor in the DAG and asynchronously resolves its task actor. */
  override def onStart(startTime: StartTime): Unit = {
    val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]](
      StreamApplication.DAG).get)
    val queryServer = dag.processors.find { kv =>
      val (_, processor) = kv
      processor.taskClass == classOf[QueryServer].getName
    }.get
    val queryServerTaskId = TaskId(queryServer._1, 0)
    val lookup: Future[TaskActorRef] =
      (appMaster ? LookupTaskActorRef(queryServerTaskId)).mapTo[TaskActorRef]
    lookup.onComplete {
      case Success(ref) =>
        queryServerActor = Option(ref.task)
      case Failure(ex) =>
        // Without the query server no report can be delivered; make that visible.
        LOG.warn(s"failed to look up the query server task: ${ex.getMessage}")
    }
  }

  import org.apache.gearpump.streaming.examples.transport.VelocityInspector._

  /**
   * Compares an incoming [[PassRecord]] with the previous record of the same vehicle, reports
   * over-speeding, and stores the record. Any other payload is logged and ignored.
   */
  override def onNext(msg: Message): Unit = {
    msg.msg match {
      case passRecord: PassRecord =>
        val records = passRecords.getOrElse(passRecord.vehicleId, Queue.empty[PassRecord])
        records.lastOption.foreach { previous =>
          // A speed is only defined when time has advanced; this also avoids dividing by zero.
          if (passRecord.timeStamp > previous.timeStamp) {
            val velocity = getVelocity(passRecord, previous)
            val formatted = "%.2f".format(velocity)
            if (velocity > overdriveThreshold) {
              if (velocity > fakePlateThreshold) {
                LOG.info(s"vehicle ${passRecord.vehicleId} maybe a fake plate, " +
                  s"the speed is $formatted km/h")
              }
              queryServerActor.foreach { server =>
                server ! OverSpeedReport(passRecord.vehicleId, formatted,
                  passRecord.timeStamp, passRecord.locationId)
              }
            }
          }
        }
        passRecords.update(passRecord.vehicleId, records.enqueueFinite(passRecord, RECORDS_NUM))
      case other =>
        LOG.warn(s"ignoring unexpected message $other")
    }
  }

  /** Replies to [[GetTrace]] with the stored records of the vehicle, sorted by timestamp. */
  override def receiveUnManagedMessage: Receive = {
    case GetTrace(vehicleId) =>
      val records = passRecords.getOrElse(vehicleId, Queue.empty[PassRecord])
      sender ! VehicleTrace(records.toArray.sortBy(_.timeStamp))
  }

  /**
   * Speed between two sightings: city distance divided by the elapsed time, where the
   * timestamp difference is converted from milliseconds to hours.
   */
  private def getVelocity(passRecord: PassRecord, lastPassRecord: PassRecord): Float = {
    val distanceInKm = getDistance(lastPassRecord.locationId, passRecord.locationId)
    val timeInHour = (passRecord.timeStamp - lastPassRecord.timeStamp).toFloat / (1000 * 60 * 60)
    distanceInKm / timeInHour
  }

  private def getDistance(location1: String, location2: String): Long = {
    mockCity.getDistance(location1, location2)
  }
}

object VelocityInspector {
  final val OVER_DRIVE_THRESHOLD = "overdrive.threshold"
  final val FAKE_PLATE_THRESHOLD = "fakeplate.threshold"
  final val RECORDS_NUM = 20

  /** Adds a bounded `enqueue` to [[scala.collection.immutable.Queue]]. */
  class FiniteQueue[T](q: Queue[T]) {
    /**
     * Appends `elem` and keeps only the newest `maxSize` elements.
     *
     * @param elem element to append
     * @param maxSize maximum size of the returned queue
     * @return a queue ending with `elem` and holding at most `maxSize` elements
     */
    def enqueueFinite[B >: T](elem: B, maxSize: Int): Queue[B] = {
      q.enqueue(elem).takeRight(maxSize)
    }
  }

  import scala.language.implicitConversions

  implicit def queue2FiniteQueue[T](q: Queue[T]): FiniteQueue[T] = new FiniteQueue[T](q)
}
package org.apache.gearpump.streaming.examples.transport.generator

import scala.util.Random

import org.apache.gearpump.streaming.examples.transport.generator.MockCity._

/**
 * A square grid of `size` x `size` locations. Location ids have the form `Id_<row>_<column>`.
 *
 * @param size number of rows and of columns of the grid
 */
class MockCity(size: Int) {
  private val random = new Random()
  private val directions = Array(UP, DOWN, LEFT, RIGHT)

  /**
   * Picks, uniformly at random, one of the neighbouring locations that lie inside the city.
   *
   * @param currentLocationId id of the location to move away from
   * @return the id of the chosen neighbour, or `currentLocationId` itself when no neighbour
   *         lies inside the city (e.g. a city of size 1)
   */
  def nextLocation(currentLocationId: String): String = {
    val current = idToCoordinate(currentLocationId)
    // Enumerate the valid moves up front instead of retrying random ones, which would
    // never terminate when there is no valid move at all.
    val candidates = directions.map(current.addOffset).filter(inCity)
    if (candidates.isEmpty) {
      currentLocationId
    } else {
      coordinateToId(candidates(random.nextInt(candidates.length)))
    }
  }

  /**
   * Distance between two locations: the number of blocks along rows plus along columns,
   * multiplied by `LENGTH_PER_BLOCK`.
   */
  def getDistance(locationId1: String, locationId2: String): Long = {
    val coordinate1 = idToCoordinate(locationId1)
    val coordinate2 = idToCoordinate(locationId2)
    val blocks = Math.abs(coordinate1.row - coordinate2.row) +
      Math.abs(coordinate1.column - coordinate2.column)
    blocks * LENGTH_PER_BLOCK
  }

  /** Returns the id of a uniformly chosen location inside the city. */
  def randomLocationId(): String = {
    val row = random.nextInt(size)
    val column = random.nextInt(size)
    coordinateToId(Coordinate(row, column))
  }

  private def coordinateToId(coordinate: Coordinate): String = {
    s"Id_${coordinate.row}_${coordinate.column}"
  }

  /** Parses an id produced by `coordinateToId`; row and column are the 2nd and 3rd fields. */
  private def idToCoordinate(locationId: String): Coordinate = {
    val attr = locationId.split("_")
    val row = attr(1).toInt
    val column = attr(2).toInt
    Coordinate(row, column)
  }

  private def inCity(coordinate: Coordinate): Boolean = {
    coordinate.row >= 0 &&
      coordinate.row < size &&
      coordinate.column >= 0 &&
      coordinate.column < size
  }
}

object MockCity {
  // The length of one block of the mock city, km
  final val LENGTH_PER_BLOCK = 5
  // The minimal speed, km/h
  final val MINIMAL_SPEED = 10

  // Unit offsets; UP/DOWN change the column, LEFT/RIGHT change the row.
  final val UP = Coordinate(0, 1)
  final val DOWN = Coordinate(0, -1)
  final val LEFT = Coordinate(-1, 0)
  final val RIGHT = Coordinate(1, 0)

  /** A grid position that can be shifted by another coordinate used as an offset. */
  case class Coordinate(row: Int, column: Int) {
    def addOffset(coordinate: Coordinate): Coordinate = {
      Coordinate(this.row + coordinate.row, this.column + coordinate.column)
    }
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.examples.transport.generator + +import scala.util.Random + +import org.apache.gearpump.streaming.examples.transport.PassRecord +import org.apache.gearpump.util.LogUtil + +class PassRecordGenerator(vehicleId: String, city: MockCity, overdriveThreshold: Int) { + private val LOG = LogUtil.getLogger(getClass) + LOG.info(s"Generate pass record for vehicle $vehicleId") + private var timeStamp = System.currentTimeMillis() + + private var locationId = city.randomLocationId() + private val random = new Random() + private val fakePlate = random.nextInt(1000) < 1000 * PassRecordGenerator.FAKE_PLATE_RATE + private val (randomMin, randomRange) = { + val lowerBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / overdriveThreshold.toFloat + val upperBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / MockCity.MINIMAL_SPEED.toFloat + val overdrive = (upperBound - lowerBound) * PassRecordGenerator.OVERDRIVE_RATE + val randomMin = Math.max(lowerBound - overdrive, PassRecordGenerator.TWOMINUTES) + val randomRange = upperBound - randomMin + (randomMin.toInt, randomRange.toInt) + } + + def getNextPassRecord(): PassRecord = { + locationId = if (fakePlate) { + city.randomLocationId() + } else { + city.nextLocation(locationId) + } + timeStamp += (random.nextInt(randomRange) + randomMin) + PassRecord(vehicleId, locationId, timeStamp) + } +} + +object PassRecordGenerator { + final val FAKE_PLATE_RATE = 0.01F + final val OVERDRIVE_RATE = 0.05F + final val TWOMINUTES = 2 * 60 * 1000 + + def create(generatorNum: Int, prefix: String, 
city: MockCity, overdriveThreshold: Int) + : Array[PassRecordGenerator] = { + var result = Map.empty[String, PassRecordGenerator] + val digitsNum = (Math.log10(generatorNum) + 1).toInt + for (i <- 1 to generatorNum) { + val vehicleId = prefix + s"%0${digitsNum}d".format(i) + val generator = new PassRecordGenerator(vehicleId, city, overdriveThreshold) + result += vehicleId -> generator + } + result.values.toArray + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/DataSourceSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/DataSourceSpec.scala b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/DataSourceSpec.scala deleted file mode 100644 index 75f9d60..0000000 --- a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/DataSourceSpec.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.gearpump.streaming.examples.transport - -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.scalatest.{FlatSpec, Matchers} - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.task.StartTime - -class DataSourceSpec extends FlatSpec with Matchers { - it should "create the pass record" in { - val vehicleNum = 2 - val context = MockUtil.mockTaskContext - - val userConfig = UserConfig.empty.withInt(DataSource.VEHICLE_NUM, vehicleNum). - withInt(DataSource.MOCK_CITY_SIZE, 10). - withInt(VelocityInspector.OVER_DRIVE_THRESHOLD, 60). - withInt(VelocityInspector.FAKE_PLATE_THRESHOLD, 200) - - val source = new DataSource(context, userConfig) - source.onStart(StartTime(0)) - source.onNext(Message("start")) - verify(context, times(vehicleNum)).output(any[Message]) - source.onStop() - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/TransportSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/TransportSpec.scala b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/TransportSpec.scala deleted file mode 100644 index b61fd43..0000000 --- a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/TransportSpec.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.examples.transport - -import scala.concurrent.Future -import scala.util.Success - -import org.scalatest.prop.PropertyChecks -import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec} - -import io.gearpump.cluster.ClientToMaster.SubmitApplication -import io.gearpump.cluster.MasterToClient.SubmitApplicationResult -import io.gearpump.cluster.{MasterHarness, TestUtil} - -class TransportSpec - extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness { - - override def beforeAll { - startActorSystem() - } - - override def afterAll { - shutdownActorSystem() - } - - protected override def config = TestUtil.DEFAULT_CONFIG - - property("Transport should succeed to submit application with required arguments") { - val requiredArgs = Array.empty[String] - val optionalArgs = Array( - "-source", "1", - "-inspector", "1", - "-vehicle", "100", - "-citysize", "10", - "-threshold", "60") - - val args = { - Table( - ("requiredArgs", "optionalArgs"), - (requiredArgs, optionalArgs) - ) - } - val masterReceiver = createMockMaster() - forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) => - val args = requiredArgs ++ optionalArgs - - Future { - Transport.main(masterConfig, args) - } - masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) - masterReceiver.reply(SubmitApplicationResult(Success(0))) - } - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/MockCitySpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/MockCitySpec.scala b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/MockCitySpec.scala deleted file mode 100644 index e91d91c..0000000 --- a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/MockCitySpec.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.gearpump.streaming.examples.transport.generator - -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -class MockCitySpec extends PropSpec with PropertyChecks with Matchers { - - property("MockCity should maintain the location properly") { - val city = new MockCity(10) - val start = city.randomLocationId() - val nextLocation = city.nextLocation(start) - assert(city.getDistance(start, nextLocation) == MockCity.LENGTH_PER_BLOCK) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala deleted file mode 100644 index 1c1901e..0000000 --- a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.examples.transport.generator - -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -class PassRecordGeneratorSpec extends PropSpec with PropertyChecks with Matchers { - - property("PassRecordGenerator should generate pass record") { - val id = "test" - val city = new MockCity(10) - val generator = new PassRecordGenerator(id, city, 60) - val passrecord1 = generator.getNextPassRecord() - val passrecord2 = generator.getNextPassRecord() - assert(city.getDistance(passrecord1.locationId, passrecord2.locationId) == - MockCity.LENGTH_PER_BLOCK) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala new file mode 100644 index 0000000..1f525ae --- /dev/null +++ b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.examples.transport + +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalatest.{FlatSpec, Matchers} + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.task.StartTime + +class DataSourceSpec extends FlatSpec with Matchers { + it should "create the pass record" in { + val vehicleNum = 2 + val context = MockUtil.mockTaskContext + + val userConfig = UserConfig.empty.withInt(DataSource.VEHICLE_NUM, vehicleNum). + withInt(DataSource.MOCK_CITY_SIZE, 10). + withInt(VelocityInspector.OVER_DRIVE_THRESHOLD, 60). 
+ withInt(VelocityInspector.FAKE_PLATE_THRESHOLD, 200) + + val source = new DataSource(context, userConfig) + source.onStart(StartTime(0)) + source.onNext(Message("start")) + verify(context, times(vehicleNum)).output(any[Message]) + source.onStop() + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala new file mode 100644 index 0000000..2f83de5 --- /dev/null +++ b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.examples.transport + +import scala.concurrent.Future +import scala.util.Success + +import org.scalatest.prop.PropertyChecks +import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec} + +import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication +import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult +import org.apache.gearpump.cluster.{MasterHarness, TestUtil} + +class TransportSpec + extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness { + + override def beforeAll { + startActorSystem() + } + + override def afterAll { + shutdownActorSystem() + } + + protected override def config = TestUtil.DEFAULT_CONFIG + + property("Transport should succeed to submit application with required arguments") { + val requiredArgs = Array.empty[String] + val optionalArgs = Array( + "-source", "1", + "-inspector", "1", + "-vehicle", "100", + "-citysize", "10", + "-threshold", "60") + + val args = { + Table( + ("requiredArgs", "optionalArgs"), + (requiredArgs, optionalArgs) + ) + } + val masterReceiver = createMockMaster() + forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) => + val args = requiredArgs ++ optionalArgs + + Future { + Transport.main(masterConfig, args) + } + masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) + masterReceiver.reply(SubmitApplicationResult(Success(0))) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala new file mode 100644 index 
0000000..ba4eb2d --- /dev/null +++ b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.examples.transport.generator + +import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +class MockCitySpec extends PropSpec with PropertyChecks with Matchers { + + property("MockCity should maintain the location properly") { + val city = new MockCity(10) + val start = city.randomLocationId() + val nextLocation = city.nextLocation(start) + assert(city.getDistance(start, nextLocation) == MockCity.LENGTH_PER_BLOCK) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala 
b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala new file mode 100644 index 0000000..f0eebbf --- /dev/null +++ b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.examples.transport.generator + +import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +class PassRecordGeneratorSpec extends PropSpec with PropertyChecks with Matchers { + + property("PassRecordGenerator should generate pass record") { + val id = "test" + val city = new MockCity(10) + val generator = new PassRecordGenerator(id, city, 60) + val passrecord1 = generator.getNextPassRecord() + val passrecord2 = generator.getNextPassRecord() + assert(city.getDistance(passrecord1.locationId, passrecord2.locationId) == + MockCity.LENGTH_PER_BLOCK) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Split.java ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Split.java b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Split.java deleted file mode 100644 index 720e179..0000000 --- a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Split.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.wordcountjava; - -import io.gearpump.Message; -import io.gearpump.cluster.UserConfig; -import io.gearpump.streaming.javaapi.Task; -import io.gearpump.streaming.task.StartTime; -import io.gearpump.streaming.task.TaskContext; - -public class Split extends Task { - - public static String TEXT = "This is a good start for java! bingo! bingo! "; - - public Split(TaskContext taskContext, UserConfig userConf) { - super(taskContext, userConf); - } - - private Long now() { - return System.currentTimeMillis(); - } - - @Override - public void onStart(StartTime startTime) { - self().tell(new Message("start", now()), self()); - } - - @Override - public void onNext(Message msg) { - - // Split the TEXT to words - String[] words = TEXT.split(" "); - for (int i = 0; i < words.length; i++) { - context.output(new Message(words[i], now())); - } - self().tell(new Message("next", now()), self()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Sum.java ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Sum.java b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Sum.java deleted file mode 100644 index 28cf8cb..0000000 --- a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Sum.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.wordcountjava; - -import io.gearpump.Message; -import io.gearpump.cluster.UserConfig; -import io.gearpump.streaming.javaapi.Task; -import io.gearpump.streaming.task.StartTime; -import io.gearpump.streaming.task.TaskContext; -import org.slf4j.Logger; - -import java.util.HashMap; - -public class Sum extends Task { - - private Logger LOG = super.LOG(); - private HashMap<String, Integer> wordCount = new HashMap<String, Integer>(); - - public Sum(TaskContext taskContext, UserConfig userConf) { - super(taskContext, userConf); - } - - @Override - public void onStart(StartTime startTime) { - //skip - } - - @Override - public void onNext(Message messagePayLoad) { - String word = (String) (messagePayLoad.msg()); - Integer current = wordCount.get(word); - if (current == null) { - current = 0; - } - Integer newCount = current + 1; - wordCount.put(word, newCount); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java deleted file 
mode 100644 index 40054d3..0000000 --- a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.wordcountjava; - -import com.typesafe.config.Config; -import io.gearpump.cluster.ClusterConfig; -import io.gearpump.cluster.UserConfig; -import io.gearpump.cluster.client.ClientContext; -import io.gearpump.cluster.embedded.EmbeddedCluster; -import io.gearpump.partitioner.HashPartitioner; -import io.gearpump.partitioner.Partitioner; -import io.gearpump.streaming.javaapi.Graph; -import io.gearpump.streaming.javaapi.Processor; -import io.gearpump.streaming.javaapi.StreamApplication; - -/** Java version of WordCount with Processor Graph API */ -public class WordCount { - - public static void main(String[] args) throws InterruptedException { - main(ClusterConfig.defaultConfig(), args); - } - - public static void main(Config akkaConf, String[] args) throws InterruptedException { - - // For split task, we config to create two tasks - int splitTaskNumber = 2; - Processor split = new Processor(Split.class).withParallelism(splitTaskNumber); - - // For sum 
task, we have two summer. - int sumTaskNumber = 2; - Processor sum = new Processor(Sum.class).withParallelism(sumTaskNumber); - - // construct the graph - Graph graph = new Graph(); - graph.addVertex(split); - graph.addVertex(sum); - - Partitioner partitioner = new HashPartitioner(); - graph.addEdge(split, partitioner, sum); - - UserConfig conf = UserConfig.empty(); - StreamApplication app = new StreamApplication("wordcountJava", conf, graph); - - EmbeddedCluster localCluster = null; - - Boolean debugMode = System.getProperty("DEBUG") != null; - - if (debugMode) { - localCluster = new EmbeddedCluster(akkaConf); - localCluster.start(); - } - - ClientContext masterClient = null; - - if (localCluster != null) { - masterClient = localCluster.newClientContext(); - } else { - // create master client - // It will read the master settings under gearpump.cluster.masters - masterClient = new ClientContext(akkaConf); - } - - masterClient.submit(app); - - if (debugMode) { - Thread.sleep(30 * 1000); // sleep for 30 seconds. - } - - masterClient.close(); - - if (localCluster != null) { - localCluster.stop(); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java deleted file mode 100644 index 3aefd7f..0000000 --- a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.wordcountjava.dsl; - -import com.typesafe.config.Config; -import io.gearpump.cluster.ClusterConfig; -import io.gearpump.cluster.UserConfig; -import io.gearpump.cluster.client.ClientContext; -import io.gearpump.google.common.collect.Lists; -import io.gearpump.streaming.dsl.javaapi.JavaStream; -import io.gearpump.streaming.dsl.javaapi.JavaStreamApp; -import io.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction; -import io.gearpump.streaming.javaapi.dsl.functions.GroupByFunction; -import io.gearpump.streaming.javaapi.dsl.functions.MapFunction; -import io.gearpump.streaming.javaapi.dsl.functions.ReduceFunction; -import scala.Tuple2; - -import java.util.Iterator; -import java.util.List; - -/** Java version of WordCount with high level DSL API */ -public class WordCount { - - public static void main(String[] args) throws InterruptedException { - main(ClusterConfig.defaultConfig(), args); - } - - public static void main(Config akkaConf, String[] args) throws InterruptedException { - ClientContext context = new ClientContext(akkaConf); - JavaStreamApp app = new JavaStreamApp("JavaDSL", context, UserConfig.empty()); - List<String> source = Lists.newArrayList("This is a good start, bingo!! 
bingo!!"); - - JavaStream<String> sentence = app.source(source, 1, UserConfig.empty(), "source"); - - JavaStream<String> words = sentence.flatMap(new FlatMapFunction<String, String>() { - @Override - public Iterator<String> apply(String s) { - return Lists.newArrayList(s.split("\\s+")).iterator(); - } - }, "flatMap"); - - JavaStream<Tuple2<String, Integer>> ones = words.map(new MapFunction<String, Tuple2<String, Integer>>() { - @Override - public Tuple2<String, Integer> apply(String s) { - return new Tuple2<String, Integer>(s, 1); - } - }, "map"); - - JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new GroupByFunction<Tuple2<String, Integer>, String>() { - @Override - public String apply(Tuple2<String, Integer> tuple) { - return tuple._1(); - } - }, 1, "groupBy"); - - JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(new ReduceFunction<Tuple2<String, Integer>>() { - @Override - public Tuple2<String, Integer> apply(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) { - return new Tuple2<String, Integer>(t1._1(), t1._2() + t2._2()); - } - }, "reduce"); - - wordcount.log(); - - app.run(); - context.close(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java new file mode 100644 index 0000000..76069c1 --- /dev/null +++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.examples.wordcountjava; + +import org.apache.gearpump.Message; +import org.apache.gearpump.cluster.UserConfig; +import org.apache.gearpump.streaming.javaapi.Task; +import org.apache.gearpump.streaming.task.StartTime; +import org.apache.gearpump.streaming.task.TaskContext; + +public class Split extends Task { + + public static String TEXT = "This is a good start for java! bingo! bingo! 
"; + + public Split(TaskContext taskContext, UserConfig userConf) { + super(taskContext, userConf); + } + + private Long now() { + return System.currentTimeMillis(); + } + + @Override + public void onStart(StartTime startTime) { + self().tell(new Message("start", now()), self()); + } + + @Override + public void onNext(Message msg) { + + // Split the TEXT to words + String[] words = TEXT.split(" "); + for (int i = 0; i < words.length; i++) { + context.output(new Message(words[i], now())); + } + self().tell(new Message("next", now()), self()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java new file mode 100644 index 0000000..89c3b14 --- /dev/null +++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.examples.wordcountjava; + +import org.apache.gearpump.Message; +import org.apache.gearpump.cluster.UserConfig; +import org.apache.gearpump.streaming.javaapi.Task; +import org.apache.gearpump.streaming.task.StartTime; +import org.apache.gearpump.streaming.task.TaskContext; +import org.slf4j.Logger; + +import java.util.HashMap; + +public class Sum extends Task { + + private Logger LOG = super.LOG(); + private HashMap<String, Integer> wordCount = new HashMap<String, Integer>(); + + public Sum(TaskContext taskContext, UserConfig userConf) { + super(taskContext, userConf); + } + + @Override + public void onStart(StartTime startTime) { + //skip + } + + @Override + public void onNext(Message messagePayLoad) { + String word = (String) (messagePayLoad.msg()); + Integer current = wordCount.get(word); + if (current == null) { + current = 0; + } + Integer newCount = current + 1; + wordCount.put(word, newCount); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java new file mode 100644 index 0000000..ee44536 --- /dev/null +++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.examples.wordcountjava; + +import com.typesafe.config.Config; +import org.apache.gearpump.cluster.ClusterConfig; +import org.apache.gearpump.cluster.UserConfig; +import org.apache.gearpump.cluster.client.ClientContext; +import org.apache.gearpump.cluster.embedded.EmbeddedCluster; +import org.apache.gearpump.partitioner.HashPartitioner; +import org.apache.gearpump.partitioner.Partitioner; +import org.apache.gearpump.streaming.javaapi.Graph; +import org.apache.gearpump.streaming.javaapi.Processor; +import org.apache.gearpump.streaming.javaapi.StreamApplication; + +/** Java version of WordCount with Processor Graph API */ +public class WordCount { + + public static void main(String[] args) throws InterruptedException { + main(ClusterConfig.defaultConfig(), args); + } + + public static void main(Config akkaConf, String[] args) throws InterruptedException { + + // For split task, we config to create two tasks + int splitTaskNumber = 2; + Processor split = new Processor(Split.class).withParallelism(splitTaskNumber); + + // For sum task, we have two summer. 
+ int sumTaskNumber = 2; + Processor sum = new Processor(Sum.class).withParallelism(sumTaskNumber); + + // construct the graph + Graph graph = new Graph(); + graph.addVertex(split); + graph.addVertex(sum); + + Partitioner partitioner = new HashPartitioner(); + graph.addEdge(split, partitioner, sum); + + UserConfig conf = UserConfig.empty(); + StreamApplication app = new StreamApplication("wordcountJava", conf, graph); + + EmbeddedCluster localCluster = null; + + Boolean debugMode = System.getProperty("DEBUG") != null; + + if (debugMode) { + localCluster = new EmbeddedCluster(akkaConf); + localCluster.start(); + } + + ClientContext masterClient = null; + + if (localCluster != null) { + masterClient = localCluster.newClientContext(); + } else { + // create master client + // It will read the master settings under gearpump.cluster.masters + masterClient = new ClientContext(akkaConf); + } + + masterClient.submit(app); + + if (debugMode) { + Thread.sleep(30 * 1000); // sleep for 30 seconds. + } + + masterClient.close(); + + if (localCluster != null) { + localCluster.stop(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java new file mode 100644 index 0000000..0ecc42e --- /dev/null +++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.examples.wordcountjava.dsl; + +import com.typesafe.config.Config; +import org.apache.gearpump.cluster.ClusterConfig; +import org.apache.gearpump.cluster.UserConfig; +import org.apache.gearpump.cluster.client.ClientContext; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction; +import org.apache.gearpump.streaming.javaapi.dsl.functions.GroupByFunction; +import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction; +import org.apache.gearpump.streaming.javaapi.dsl.functions.ReduceFunction; +import scala.Tuple2; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +/** Java version of WordCount with high level DSL API */ +public class WordCount { + + public static void main(String[] args) throws InterruptedException { + main(ClusterConfig.defaultConfig(), args); + } + + public static void main(Config akkaConf, String[] args) throws InterruptedException { + ClientContext context = new ClientContext(akkaConf); + JavaStreamApp app = new JavaStreamApp("JavaDSL", context, UserConfig.empty()); + List<String> source = new 
ArrayList<>(Arrays.asList("This is a good start, bingo!! bingo!!")); + + JavaStream<String> sentence = app.source(source, 1, UserConfig.empty(), "source"); + + JavaStream<String> words = sentence.flatMap(new FlatMapFunction<String, String>() { + @Override + public Iterator<String> apply(String s) { + return new ArrayList<String>(Arrays.asList(s.split("\\s+"))).iterator(); + } + }, "flatMap"); + + JavaStream<Tuple2<String, Integer>> ones = words.map(new MapFunction<String, Tuple2<String, Integer>>() { + @Override + public Tuple2<String, Integer> apply(String s) { + return new Tuple2<String, Integer>(s, 1); + } + }, "map"); + + JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new GroupByFunction<Tuple2<String, Integer>, String>() { + @Override + public String apply(Tuple2<String, Integer> tuple) { + return tuple._1(); + } + }, 1, "groupBy"); + + JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(new ReduceFunction<Tuple2<String, Integer>>() { + @Override + public Tuple2<String, Integer> apply(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) { + return new Tuple2<String, Integer>(t1._1(), t1._2() + t2._2()); + } + }, "reduce"); + + wordcount.log(); + + app.run(); + context.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/test/scala/io/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/test/scala/io/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala b/examples/streaming/wordcount-java/src/test/scala/io/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala deleted file mode 100644 index 1a1d019..0000000 --- a/examples/streaming/wordcount-java/src/test/scala/io/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) 
under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.wordcount - -import scala.concurrent.Future -import scala.util.Success - -import org.scalatest.prop.PropertyChecks -import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} - -import io.gearpump.cluster.ClientToMaster.SubmitApplication -import io.gearpump.cluster.MasterToClient.SubmitApplicationResult -import io.gearpump.cluster.{MasterHarness, TestUtil} -import io.gearpump.streaming.examples.wordcountjava.WordCount - -class WordCountSpec - extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness { - - before { - startActorSystem() - } - - after { - shutdownActorSystem() - } - - protected override def config = TestUtil.DEFAULT_CONFIG - - property("WordCount should succeed to submit application with required arguments") { - val requiredArgs = Array.empty[String] - - val masterReceiver = createMockMaster() - - val args = requiredArgs - - Future { - WordCount.main(masterConfig, args) - } - - masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) - masterReceiver.reply(SubmitApplicationResult(Success(0))) - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala b/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala new file mode 100644 index 0000000..3736c86 --- /dev/null +++ b/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.examples.wordcountjava + +import scala.concurrent.Future +import scala.util.Success + +import org.scalatest.prop.PropertyChecks +import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} + +import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication +import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult +import org.apache.gearpump.cluster.{MasterHarness, TestUtil} +import org.apache.gearpump.streaming.examples.wordcountjava.WordCount + +class WordCountSpec + extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness { + + before { + startActorSystem() + } + + after { + shutdownActorSystem() + } + + protected override def config = TestUtil.DEFAULT_CONFIG + + property("WordCount should succeed to submit application with required arguments") { + val requiredArgs = Array.empty[String] + + val masterReceiver = createMockMaster() + + val args = requiredArgs + + Future { + WordCount.main(masterConfig, args) + } + + masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) + masterReceiver.reply(SubmitApplicationResult(Success(0))) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Split.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Split.scala b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Split.scala deleted file mode 100644 index 387bc75..0000000 --- a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Split.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.wordcount - -import java.util.concurrent.TimeUnit - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} - -class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { - import taskContext.output - - override def onStart(startTime: StartTime): Unit = { - self ! Message("start") - } - - override def onNext(msg: Message): Unit = { - Split.TEXT_TO_SPLIT.lines.foreach { line => - line.split("[\\s]+").filter(_.nonEmpty).foreach { msg => - output(new Message(msg, System.currentTimeMillis())) - } - } - - import scala.concurrent.duration._ - taskContext.scheduleOnce(Duration(100, TimeUnit.MILLISECONDS))(self ! - Message("continue", System.currentTimeMillis())) - } -} - -object Split { - val TEXT_TO_SPLIT = - """ - | Licensed to the Apache Software Foundation (ASF) under one - | or more contributor license agreements. See the NOTICE file - | distributed with this work for additional information - | regarding copyright ownership. The ASF licenses this file - | to you under the Apache License, Version 2.0 (the - | "License"); you may not use this file except in compliance - | with the License. 
You may obtain a copy of the License at - | - | http://www.apache.org/licenses/LICENSE-2.0 - | - | Unless required by applicable law or agreed to in writing, software - | distributed under the License is distributed on an "AS IS" BASIS, - | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - | See the License for the specific language governing permissions and - | limitations under the License. - """.stripMargin -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Sum.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Sum.scala b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Sum.scala deleted file mode 100644 index 6560066..0000000 --- a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Sum.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.examples.wordcount - -import java.util.concurrent.TimeUnit -import scala.collection.mutable -import scala.concurrent.duration.FiniteDuration - -import akka.actor.Cancellable - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} - -class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { - private[wordcount] val map: mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]() - - private[wordcount] var wordCount: Long = 0 - private var snapShotTime: Long = System.currentTimeMillis() - private var snapShotWordCount: Long = 0 - - private var scheduler: Cancellable = null - - override def onStart(startTime: StartTime): Unit = { - scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS), - new FiniteDuration(30, TimeUnit.SECONDS))(reportWordCount) - } - - override def onNext(msg: Message): Unit = { - if (null != msg) { - val current = map.getOrElse(msg.msg.asInstanceOf[String], 0L) - wordCount += 1 - map.put(msg.msg.asInstanceOf[String], current + 1) - } - } - - override def onStop(): Unit = { - if (scheduler != null) { - scheduler.cancel() - } - } - - def reportWordCount(): Unit = { - val current: Long = System.currentTimeMillis() - LOG.info(s"Task ${taskContext.taskId} Throughput:" + - s" ${(wordCount - snapShotWordCount, (current - snapShotTime) / 1000)} (words, second)") - snapShotWordCount = wordCount - snapShotTime = current - } -} \ No newline at end of file
