http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/main/Stock.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/main/Stock.scala b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/main/Stock.scala index 91ba36e..638dc4e 100644 --- a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/main/Stock.scala +++ b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/main/Stock.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,51 +16,60 @@ * limitations under the License. */ - package io.gearpump.streaming.examples.stock.main import akka.actor.ActorSystem -import io.gearpump.streaming.{StreamApplication, Processor} -import io.gearpump.streaming.examples.stock.{Crawler, QueryServer, Analyzer, StockMarket} +import org.slf4j.Logger + import io.gearpump.cluster.UserConfig import io.gearpump.cluster.client.ClientContext import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} import io.gearpump.partitioner.HashPartitioner -import StockMarket.ServiceHour +import io.gearpump.streaming.examples.stock.StockMarket.ServiceHour +import io.gearpump.streaming.examples.stock.{Analyzer, Crawler, QueryServer, StockMarket} +import io.gearpump.streaming.{Processor, StreamApplication} import io.gearpump.transport.HostPort import io.gearpump.util.Graph.Node import io.gearpump.util.{AkkaApp, Graph, LogUtil} -import org.slf4j.Logger +/** Tracks the China's stock market index change */ object Stock extends AkkaApp with ArgumentsParser { private val LOG: Logger = LogUtil.getLogger(getClass) override val options: Array[(String, CLIOption[Any])] = Array( - "crawler"-> CLIOption[Int]("<how many fetcher to get data from remote>", required = false, defaultValue = Some(10)), - "analyzer"-> CLIOption[Int]("<parallism of analyzer>", required = false, defaultValue = Some(1)), - "proxy" -> CLIOption[String]("proxy setting host:port, for example: 127.0.0.1:8443", required = false, defaultValue = Some(""))) + "crawler" -> CLIOption[Int]("<how many fetcher to get data from remote>", + required = false, defaultValue = Some(10)), + "analyzer" -> CLIOption[Int]("<parallism of analyzer>", + required = false, defaultValue = Some(1)), + "proxy" -> CLIOption[String]("proxy setting host:port, for example: 127.0.0.1:8443", + required = false, defaultValue = Some(""))) - def crawler(config: ParseResult)(implicit system: ActorSystem) : StreamApplication = { + def crawler(config: ParseResult)(implicit system: ActorSystem): StreamApplication = { val crawler = Processor[Crawler](config.getInt("crawler")) val analyzer = Processor[Analyzer](config.getInt("analyzer")) val queryServer = Processor[QueryServer](1) val proxySetting = config.getString("proxy") - val proxy = if (proxySetting.isEmpty) {null } else HostPort(proxySetting) + val proxy = if (proxySetting.isEmpty) { + null + } else HostPort(proxySetting) val stockMarket = new StockMarket(new ServiceHour(true), proxy) val stocks = stockMarket.getStockIdList + // scalastyle:off println Console.println(s"Successfully fetched stock id for ${stocks.length} stocks") + // scalastyle:on println - val userConfig = UserConfig.empty.withValue("StockId", stocks).withValue[StockMarket](classOf[StockMarket].getName, stockMarket) + val userConfig = UserConfig.empty.withValue("StockId", stocks) + .withValue[StockMarket](classOf[StockMarket].getName, stockMarket) val partitioner = new HashPartitioner val p1 = crawler ~ partitioner ~> analyzer val p2 = Node(queryServer) val graph = Graph(p1, p2) val app = StreamApplication("stock_direct_analyzer", graph, userConfig - ) + ) app }
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/README.md ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/README.md b/examples/streaming/transport/README.md new file mode 100644 index 0000000..fc9bdfe --- /dev/null +++ b/examples/streaming/transport/README.md @@ -0,0 +1,3 @@ +What is this? +============= +A smart transportation example which simulate a city with millions of cars. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/resources/transport/css/custom.css ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/resources/transport/css/custom.css b/examples/streaming/transport/src/main/resources/transport/css/custom.css index 5b19ac7..f324b6a 100644 --- a/examples/streaming/transport/src/main/resources/transport/css/custom.css +++ b/examples/streaming/transport/src/main/resources/transport/css/custom.css @@ -1,16 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + .ui-datepicker { font-size: 11px; - } +} + .sidebar-label { - font-size:15px; + font-size: 15px; font-family: calibri, Arial, Helvetica, sans-serif; } - + .help { - font-size:12px; + font-size: 12px; font-family: calibri, Arial, Helvetica, sans-serif; } - + div.splitter { margin: 12px 0px 7px 0px; clear: both; @@ -18,13 +37,13 @@ div.splitter { } input.sidebar { - width:165px + width: 165px } select.sidebar { - width:198px + width: 198px } - + table.dataintable { font-family: calibri, Arial, Helvetica, sans-serif; font-size: 15px; @@ -48,49 +67,49 @@ table.dataintable td { border: 1px solid #AAA; } -#search{ - width:100px; - height:25px; - position:relative; - left:0px; - top:5px; +#search { + width: 100px; + height: 25px; + position: relative; + left: 0px; + top: 5px; } -#mytable{ - width:100%; - height:300; - float:left; +#mytable { + width: 100%; + height: 300; + float: left; } -#mychart{ - height:400px; - width:100%; +#mychart { + height: 400px; + width: 100%; } -#Menu{ - height:100%; - width:245px; - float:left; +#Menu { + height: 100%; + width: 245px; + float: left; } -#header{ - height:115px; +#header { + height: 115px; background-image: url(header.png); } -#body{ - height:100%; - width:100%; +#body { + height: 100%; + width: 100%; background-image: url(body.png); - background-size:100% 100%; + background-size: 100% 100%; } -#footer{ - color:white; - height:70px; - line-height:70px; - text-align:middle; - clear:both; - text-align:center; +#footer { + color: white; + height: 70px; + line-height: 70px; + text-align: middle; + clear: both; + text-align: center; background-image: url(foot.png); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/resources/transport/js/transport.js ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/resources/transport/js/transport.js b/examples/streaming/transport/src/main/resources/transport/js/transport.js index 35d9ea2..eef0fe9 100644 --- a/examples/streaming/transport/src/main/resources/transport/js/transport.js +++ b/examples/streaming/transport/src/main/resources/transport/js/transport.js @@ -1,162 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + var myChart = echarts.init(document.getElementById("mychart")) echarts.util.mapData.params.params.football = { - getGeoJson: function (callback) { - $.ajax({ - url: "../svg/beijing.svg", - dataType: 'xml', - success: function(xml) { - callback(xml) - } - }); - } + getGeoJson: function (callback) { + $.ajax({ + url: "../svg/beijing.svg", + dataType: 'xml', + success: function (xml) { + callback(xml) + } + }); + } } function updateRecords(tableId) { - $.getJSON( "records", function( json ) { - var tableStr = "<table class=\"dataintable\" style=\"margin-left: 5px;\">"; - tableStr += "<tr><th>Over Speed Vehicle ID</th><th>Speed</th><th>Location</th><th>Time</th></tr>"; - var records = json.records; - for(var i = 0; i < Math.min(records.length, 20); i++) { - var record = records[i]; - var vehicleId = record.vehicleId; - var location = record.locationId.split("_"); - var speed = record.speed; - var row = location[1]; - var column = location[2]; - var time = new Date(Number(record.timestamp)).toLocaleTimeString().replace(/^\D*/,''); - tableStr += "<tr><td>" + vehicleId + "</td>"; - tableStr += "<td>" + speed + "km/h </td>" - tableStr += "<td>(" + row + ", "+ column + ")</td>"; - tableStr += "<td>" + time + "</td></tr>"; - } - if(records.length < 20) { - for(var i = records.length; i < 20; i++) { + $.getJSON("records", function (json) { + var tableStr = "<table class=\"dataintable\" style=\"margin-left: 5px;\">"; + tableStr += "<tr><th>Over Speed Vehicle ID</th><th>Speed</th><th>Location</th><th>Time</th></tr>"; + var records = json.records; + for (var i = 0; i < Math.min(records.length, 20); i++) { + var record = records[i]; + var vehicleId = record.vehicleId; + var location = record.locationId.split("_"); + var speed = record.speed; + var row = location[1]; + var column = location[2]; + var time = new Date(Number(record.timestamp)).toLocaleTimeString().replace(/^\D*/, ''); + tableStr += "<tr><td>" + vehicleId + "</td>"; + tableStr += "<td>" + speed + "km/h </td>" + tableStr += "<td>(" + row + ", " + column + ")</td>"; + tableStr += "<td>" + time + "</td></tr>"; + } + if (records.length < 20) { + for (var i = records.length; i < 20; i++) { tableStr += "<tr><td></td>"; tableStr += "<td> </td>" tableStr += "<td> </td>"; tableStr += "<td> </td></tr>"; - } - } - tableStr += "</table>" - document.getElementById(tableId).innerHTML = tableStr; - } - ) + } + } + tableStr += "</table>" + document.getElementById(tableId).innerHTML = tableStr; + } + ) } function initChart(chartid, vehicleId) { - // åºäºåå¤å¥½çdomï¼åå§åechartså¾è¡¨ - $.getJSON( "trace/" + vehicleId, function( json ) { - // 为echarts对象å è½½æ°æ® - var records = json.records; - var timeLine = new Array(records.length); - var markPoints = new Array(records.length); - var options_ = new Array(records.length - 2); - for(var i = 0; i < records.length; i++) { - var record = records[i]; - var vehicleId = record.vehicleId; - var location = record.locationId.split("_"); - var row = location[1]; - var column = location[2]; - var time = new Date(Number(record.timeStamp)).toLocaleTimeString().replace(/^\D*/,''); - timeLine[i] = time; - var currentPonit = {name: "", value: i, geoCoord:[row * 90, column * 90]}; - markPoints[i] = currentPonit; + // åºäºåå¤å¥½çdomï¼åå§åechartså¾è¡¨ + $.getJSON("trace/" + vehicleId, function (json) { + // 为echarts对象å è½½æ°æ® + var records = json.records; + var timeLine = new Array(records.length); + var markPoints = new Array(records.length); + var options_ = new Array(records.length - 2); + for (var i = 0; i < records.length; i++) { + var record = records[i]; + var vehicleId = record.vehicleId; + var location = record.locationId.split("_"); + var row = location[1]; + var column = location[2]; + var time = new Date(Number(record.timeStamp)).toLocaleTimeString().replace(/^\D*/, ''); + timeLine[i] = time; + var currentPonit = {name: "", value: i, geoCoord: [row * 90, column * 90]}; + markPoints[i] = currentPonit; + } + options_[0] = + { + title: { + text: 'Vehicle trace' + }, + tooltip: { + trigger: 'item' + }, + toolbox: { + show: false, + feature: { + mark: {show: true}, + dataView: {show: true, readOnly: false}, + magicType: {show: true, type: ['line', 'bar']}, + restore: {show: true}, + saveAsImage: {show: true} } - options_[0] = - { - title : { - text: 'Vehicle trace' - }, - tooltip : { - trigger: 'item' - }, - toolbox: { - show : false, - feature : { - mark : {show: true}, - dataView : {show: true, readOnly: false}, - magicType : {show: true, type: ['line', 'bar']}, - restore : {show: true}, - saveAsImage : {show: true} + }, + series: [ + { + name: 'Vehicle trace', + type: 'map', + mapType: 'football', + mapLocation: { + y: 30, + height: 430 + }, + itemStyle: { + normal: {label: {show: false}}, + emphasis: {label: {show: false}} + }, + data: [ + {name: 'City', hoverable: false, itemStyle: {normal: {label: {show: false}}}} + ], + markPoint: { + symbol: 'circle', + symbolSize: 8, + itemStyle: { + normal: { + borderWidth: 1, + color: 'blue', + lineStyle: { + type: 'solid' + } } }, - series : [ - { - name: 'Vehicle trace', - type: 'map', - mapType: 'football', - mapLocation:{ - y: 30, - height: 430 - }, - itemStyle:{ - normal:{label:{show:false}}, - emphasis:{label:{show:false}} - }, - data:[ - {name: 'City', hoverable: false, itemStyle:{normal:{label:{show:false}}}} - ], - markPoint : { - symbol:'circle', - symbolSize : 8, - itemStyle : { - normal: { - borderWidth:1, - color: 'blue', - lineStyle: { - type: 'solid' - } - } - }, - data: [markPoints[0]] - }, - markLine : { - smooth:true, - effect : { - show: true, - scaleSize: 1.5, - period: 1.5, - color: '#fff' - }, - itemStyle : { - normal: { - borderWidth:2, - color: 'red', - lineStyle: { - type: 'solid' - } - } - }, - data: [] + data: [markPoints[0]] + }, + markLine: { + smooth: true, + effect: { + show: true, + scaleSize: 1.5, + period: 1.5, + color: '#fff' + }, + itemStyle: { + normal: { + borderWidth: 2, + color: 'red', + lineStyle: { + type: 'solid' } } - ] + }, + data: [] } - for(var i = 1; i < markPoints.length; i++){ - options_[i] = + } + ] + } + for (var i = 1; i < markPoints.length; i++) { + options_[i] = + { + series: [ { - series: [ - { - markPoint : { - data: [markPoints[i]] - }, - markLine : { - data: [] - } + markPoint: { + data: [markPoints[i]] + }, + markLine: { + data: [] } - ] } - } - var option = { - timeline : { - type: 'number', - playInterval:500, - autoPlay:true, - data: timeLine - }, - options: options_ - }; - myChart.setOption(option); - }); + ] + } + } + var option = { + timeline: { + type: 'number', + playInterval: 500, + autoPlay: true, + data: timeLine + }, + options: options_ + }; + myChart.setOption(option); + }); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/resources/transport/transport.html ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/resources/transport/transport.html b/examples/streaming/transport/src/main/resources/transport/transport.html index fc17707..baee931 100644 --- a/examples/streaming/transport/src/main/resources/transport/transport.html +++ b/examples/streaming/transport/src/main/resources/transport/transport.html @@ -1,67 +1,88 @@ <!DOCTYPE html> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + <html> <head> - <meta charset="utf-8"> - <link rel=stylesheet type=text/css href="css/custom.css"> - <script src="http://echarts.baidu.com/build/source/echarts-all.js"></script> - <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script> + <meta charset="utf-8"> + <link rel=stylesheet type=text/css href="css/custom.css"> + <script src="http://echarts.baidu.com/build/source/echarts-all.js"></script> + <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script> </head> <body style="background-color:#F2F2F2"> <div id="container" style="width:882px; height:450px;margin-left:auto;margin-right:auto;"> - <div style="height:0px"></div> - <div id="header"> - <div style="font-weight:600;position:relative;left:50px;top:50px;font-family: calibri, Arial, Helvetica, sans-serif;font-size:29px;color:white"> - Big Data Transport Monitoring Demo - </div> + <div style="height:0px"></div> + <div id="header"> + <div + style="font-weight:600;position:relative;left:50px;top:50px;font-family: calibri, Arial, Helvetica, sans-serif;font-size:29px;color:white"> + Big Data Transport Monitoring Demo </div> - <div id="body"> - <div id="Menu"> - <div style="position:relative;margin-left:30px; margin-right:20px;margin-top:20px;"> - <!-- form to post to accompany to get accompanying cars --> + </div> + <div id="body"> + <div id="Menu"> + <div style="position:relative;margin-left:30px; margin-right:20px;margin-top:20px;"> + <!-- form to post to accompany to get accompanying cars --> - <table style="width:100%"> - <tr> - <td class="sidebar-label">Vehicle Id:</td> - </tr> - <tr> - <td class="sidebar-label"></td> - </tr> - <tr> - <td style="vertical-align:top;"> - <input id="vehicleId" class="sidebar" type="text" name="vehicleId"/> - </td> - </tr> - </table> - <div class="splitter"></div> - <div> - <button id="search" onclick="search_onclick()">Search</button> - </div> - </div> + <table style="width:100%"> + <tr> + <td class="sidebar-label">Vehicle Id:</td> + </tr> + <tr> + <td class="sidebar-label"></td> + </tr> + <tr> + <td style="vertical-align:top;"> + <input id="vehicleId" class="sidebar" type="text" name="vehicleId"/> + </td> + </tr> + </table> + <div class="splitter"></div> + <div> + <button id="search" onclick="search_onclick()">Search</button> </div> - <div id="content" style="height:100%;width:585px;float:left;position:relative;left:20px;overflow:scroll;"> - <div style="height:50px;position:relative;top:15px;vertical-align:middle;font-weight:300;font-family: calibri, Arial, Helvetica, sans-serif;font-size:22px;color:black"> - Vehicle Trace: - </div> - <div style="height:7px;background-color:#92BDF2;"></div> + </div> + </div> + <div id="content" + style="height:100%;width:585px;float:left;position:relative;left:20px;overflow:scroll;"> + <div + style="height:50px;position:relative;top:15px;vertical-align:middle;font-weight:300;font-family: calibri, Arial, Helvetica, sans-serif;font-size:22px;color:black"> + Vehicle Trace: + </div> + <div style="height:7px;background-color:#92BDF2;"></div> - <div id="mychart"></div> + <div id="mychart"></div> - <div id="mytable"></div> - </div> + <div id="mytable"></div> </div> - <div id="footer"> - Big Data Team @ Intel - </div> - <script src="js/transport.js"></script> - <script type="text/javascript"> - function search_onclick(){ - var vehicleId = document.getElementById('vehicleId').value - initChart("mychart", vehicleId) - } - setInterval(updateRecords, 1000, "mytable") - </script> + </div> + <div id="footer"> + Big Data Team @ Intel + </div> + <script src="js/transport.js"></script> + <script type="text/javascript"> + function search_onclick() { + var vehicleId = document.getElementById('vehicleId').value + initChart("mychart", vehicleId) + } + setInterval(updateRecords, 1000, "mytable") + </script> </div> </body> </html> http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Data.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Data.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Data.scala index 7d3f35a..788f92a 100644 --- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Data.scala +++ b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Data.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,9 +19,11 @@ package io.gearpump.streaming.examples.transport case class LocationInfo(id: String, row: Int, column: Int) +// scalastyle:off equals.hash.code case class PassRecord(vehicleId: String, locationId: String, timeStamp: Long) { override def hashCode: Int = vehicleId.hashCode } +// scalastyle:on equals.hash.code case class GetTrace(vehicleId: String) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/DataSource.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/DataSource.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/DataSource.scala index 1c280f1..33d7b54 100644 --- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/DataSource.scala +++ b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/DataSource.scala @@ -1,55 +1,56 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.examples.transport - -import io.gearpump.streaming.examples.transport.generator.{MockCity, PassRecordGenerator} -import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import scala.concurrent.duration._ - -class DataSource(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf){ - import taskContext.{output, parallelism, taskId, scheduleOnce} - - import system.dispatcher - private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get - private val vehicleNum = conf.getInt(DataSource.VEHICLE_NUM).get / parallelism - private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get - private val mockCity = new MockCity(citySize) - private val recordGenerators: Array[PassRecordGenerator] = - PassRecordGenerator.create(vehicleNum, getIdentifier(taskId), mockCity, overdriveThreshold) - - override def onStart(startTime: StartTime): Unit = { - self ! Message("start", System.currentTimeMillis()) - } - - override def onNext(msg: Message): Unit = { - recordGenerators.foreach(generator => - output(Message(generator.getNextPassRecord(), System.currentTimeMillis()))) - scheduleOnce(1 second)(self ! Message("continue", System.currentTimeMillis())) - } - - private def getIdentifier(taskId: TaskId): String = { - s"沪A${taskId.processorId}${taskId.index}" - } -} - -object DataSource { - final val VEHICLE_NUM = "vehicle.number" - final val MOCK_CITY_SIZE = "mock.city.size" -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.gearpump.streaming.examples.transport + +import scala.concurrent.duration._ + +import io.gearpump.Message +import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.examples.transport.generator.{MockCity, PassRecordGenerator} +import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} + +class DataSource(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { + import taskContext.{output, parallelism, scheduleOnce, taskId} + private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get + private val vehicleNum = conf.getInt(DataSource.VEHICLE_NUM).get / parallelism + private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get + private val mockCity = new MockCity(citySize) + private val recordGenerators: Array[PassRecordGenerator] = + PassRecordGenerator.create(vehicleNum, getIdentifier(taskId), mockCity, overdriveThreshold) + + override def onStart(startTime: StartTime): Unit = { + self ! Message("start", System.currentTimeMillis()) + } + + override def onNext(msg: Message): Unit = { + recordGenerators.foreach(generator => + output(Message(generator.getNextPassRecord(), System.currentTimeMillis()))) + scheduleOnce(1.second)(self ! Message("continue", System.currentTimeMillis())) + } + + private def getIdentifier(taskId: TaskId): String = { + // scalastyle:off non.ascii.character.disallowed + s"沪A${taskId.processorId}${taskId.index}" + // scalastyle:on non.ascii.character.disallowed + } +} + +object DataSource { + final val VEHICLE_NUM = "vehicle.number" + final val MOCK_CITY_SIZE = "mock.city.size" +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala index 5f91883..f9dbbde 100644 --- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala +++ b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,30 +18,29 @@ package io.gearpump.streaming.examples.transport import java.util.concurrent.TimeUnit +import scala.concurrent.Future +import scala.util.{Failure, Success} import akka.actor.Actor._ -import akka.actor.{Actor, Props} +import akka.actor.{Actor, ActorRefFactory, Props} import akka.io.IO import akka.pattern.ask +import spray.can.Http +import spray.http.StatusCodes +import spray.json._ +import spray.routing.{HttpService, Route} +import upickle.default.write + import io.gearpump.Message import io.gearpump.cluster.UserConfig import io.gearpump.partitioner.PartitionerDescription -import io.gearpump.streaming.appmaster.AppMaster import io.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef} import io.gearpump.streaming.examples.transport.QueryServer.{GetAllRecords, WebServer} import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} import io.gearpump.streaming.{DAG, ProcessorDescription, ProcessorId, StreamApplication} import io.gearpump.util.Graph -import spray.can.Http -import spray.http.StatusCodes -import spray.json._ -import spray.routing.HttpService -import upickle.default.write - -import scala.concurrent.Future -import scala.util.{Failure, Success} -class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf){ +class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { import system.dispatcher import taskContext.appMaster @@ -50,7 +49,8 @@ class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskC private var overSpeedRecords = List.empty[OverSpeedReport] override def onStart(startTime: StartTime): Unit = { - val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get) + val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]]( + StreamApplication.DAG).get) inspector = dag.processors.find { kv => val (_, processor) = kv processor.taskClass == classOf[VelocityInspector].getName @@ -62,7 +62,7 @@ class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskC } override def receiveUnManagedMessage: Receive = { - case getTrace @ GetTrace(vehicleId: String) => + case getTrace@GetTrace(vehicleId: String) => val parallism = inspector._2.parallelism val processorId = inspector._1 val analyzerTaskId = TaskId(processorId, (vehicleId.hashCode & Integer.MAX_VALUE) % parallism) @@ -74,14 +74,14 @@ class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskC LOG.info(s"reporting $trace") requester ! trace } - case record@ OverSpeedReport(vehicleId, speed, timestamp, locationId) => + case record@OverSpeedReport(vehicleId, speed, timestamp, locationId) => LOG.info(s"vehicle $vehicleId is over speed, the speed is $speed km/h") overSpeedRecords :+= record case GetAllRecords => sender ! QueryServer.OverSpeedRecords(overSpeedRecords.toArray.sortBy(_.timestamp)) overSpeedRecords = List.empty[OverSpeedReport] case _ => - //ignore + // Ignore } } @@ -94,21 +94,22 @@ object QueryServer { import context.dispatcher implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS) - def actorRefFactory = context + def actorRefFactory: ActorRefFactory = context implicit val system = context.system IO(Http) ! Http.Bind(self, interface = "0.0.0.0", port = 8080) override def receive: Receive = runRoute(webServer ~ staticRoute) - def webServer = { - path("trace" / PathElement) { vehicleId => + def webServer: Route = { + path("trace" / Segment) { vehicleId => get { onComplete((context.parent ? GetTrace(vehicleId)).asInstanceOf[Future[VehicleTrace]]) { case Success(trace: VehicleTrace) => val json = write(trace) complete(pretty(json)) - case Failure(ex) => complete(StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}") + case Failure(ex) => complete(StatusCodes.InternalServerError, + s"An error occurred: ${ex.getMessage}") } } } ~ @@ -118,7 +119,8 @@ object QueryServer { case Success(records: OverSpeedRecords) => val json = write(records) complete(pretty(json)) - case Failure(ex) => complete(StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}") + case Failure(ex) => complete(StatusCodes.InternalServerError, + s"An error occurred: ${ex.getMessage}") } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Transport.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Transport.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Transport.scala index fb3b7c1..a795277 100644 --- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Transport.scala +++ b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Transport.scala @@ -1,62 +1,69 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.examples.transport - -import io.gearpump.streaming.{StreamApplication, Processor} -import io.gearpump.cluster.UserConfig -import io.gearpump.cluster.client.ClientContext -import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} -import io.gearpump.partitioner.HashPartitioner -import io.gearpump.util.Graph._ -import io.gearpump.util.{AkkaApp, Graph} - -object Transport extends AkkaApp with ArgumentsParser { - override val options: Array[(String, CLIOption[Any])] = Array( - "source"-> CLIOption[Int]("<how many task to generate data>", required = false, defaultValue = Some(10)), - "inspector"-> CLIOption[Int]("<how many over speed inspector>", required = false, defaultValue = Some(4)), - "vehicle"-> CLIOption[Int]("<how many vehicles's to generate>", required = false, defaultValue = Some(1000)), - "citysize"-> CLIOption[Int]("<the blocks number of the mock city>", required = false, defaultValue = Some(10)), - "threshold"-> CLIOption[Int]("<overdrive threshold, km/h>", required = false, defaultValue = Some(60))) - - def application(config: ParseResult): StreamApplication = { - val sourceNum = config.getInt("source") - val inspectorNum = config.getInt("inspector") - val vehicleNum = config.getInt("vehicle") - val citysize = config.getInt("citysize") - val threshold = config.getInt("threshold") - val source = Processor[DataSource](sourceNum) - val inspector = Processor[VelocityInspector](inspectorNum) - val queryServer = Processor[QueryServer](1) - val partitioner = new HashPartitioner - - val userConfig = UserConfig.empty.withInt(DataSource.VEHICLE_NUM, vehicleNum). - withInt(DataSource.MOCK_CITY_SIZE, citysize). - withInt(VelocityInspector.OVER_DRIVE_THRESHOLD, threshold). - withInt(VelocityInspector.FAKE_PLATE_THRESHOLD, 200) - StreamApplication("transport", Graph(source ~ partitioner ~> inspector, Node(queryServer)), userConfig) - } - - override def main(akkaConf: Config, args: Array[String]): Unit = { - val config = parse(args) - val context = ClientContext(akkaConf) - implicit val system = context.system - context.submit(application(config)) - context.close() - } -} - +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.gearpump.streaming.examples.transport + +import io.gearpump.cluster.UserConfig +import io.gearpump.cluster.client.ClientContext +import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} +import io.gearpump.partitioner.HashPartitioner +import io.gearpump.streaming.{Processor, StreamApplication} +import io.gearpump.util.Graph._ +import io.gearpump.util.{AkkaApp, Graph} + +/** A city smart transportation streaming application */ +object Transport extends AkkaApp with ArgumentsParser { + override val options: Array[(String, CLIOption[Any])] = Array( + "source" -> CLIOption[Int]("<how many task to generate data>", required = false, + defaultValue = Some(10)), + "inspector" -> CLIOption[Int]("<how many over speed inspector>", required = false, + defaultValue = Some(4)), + "vehicle" -> CLIOption[Int]("<how many vehicles's to generate>", required = false, + defaultValue = Some(1000)), + "citysize" -> CLIOption[Int]("<the blocks number of the mock city>", required = false, + defaultValue = Some(10)), + "threshold" -> CLIOption[Int]("<overdrive threshold, km/h>", required = false, + defaultValue = Some(60))) + + def application(config: ParseResult): StreamApplication = { + val sourceNum = config.getInt("source") + val inspectorNum = config.getInt("inspector") + val vehicleNum = config.getInt("vehicle") + val citysize = config.getInt("citysize") + val threshold = config.getInt("threshold") + val source = Processor[DataSource](sourceNum) + val inspector = Processor[VelocityInspector](inspectorNum) + val queryServer = Processor[QueryServer](1) + val partitioner = new HashPartitioner + + val userConfig = UserConfig.empty.withInt(DataSource.VEHICLE_NUM, vehicleNum). + withInt(DataSource.MOCK_CITY_SIZE, citysize). + withInt(VelocityInspector.OVER_DRIVE_THRESHOLD, threshold). + withInt(VelocityInspector.FAKE_PLATE_THRESHOLD, 200) + StreamApplication("transport", Graph(source ~ partitioner ~> inspector, + Node(queryServer)), userConfig) + } + + override def main(akkaConf: Config, args: Array[String]): Unit = { + val config = parse(args) + val context = ClientContext(akkaConf) + implicit val system = context.system + context.submit(application(config)) + context.close() + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/VelocityInspector.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/VelocityInspector.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/VelocityInspector.scala index 5265d45..b9be8d7 100644 --- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/VelocityInspector.scala +++ b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/VelocityInspector.scala @@ -1,116 +1,123 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.examples.transport - -import java.util.concurrent.TimeUnit - -import akka.actor.Actor._ -import akka.actor.ActorRef -import akka.pattern.ask -import io.gearpump.streaming.{StreamApplication, ProcessorDescription, DAG} -import io.gearpump.streaming.appmaster.AppMaster -import io.gearpump.streaming.examples.transport.generator.MockCity -import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.partitioner.PartitionerDescription -import AppMaster.{TaskActorRef, LookupTaskActorRef} -import io.gearpump.util.Graph - -import scala.collection.immutable.Queue -import scala.collection.mutable -import scala.concurrent.Future - -class VelocityInspector(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { - import taskContext.appMaster - import system.dispatcher - implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS) - private val passRecords = mutable.Map.empty[String, Queue[PassRecord]] - private val fakePlateThreshold = conf.getInt(VelocityInspector.FAKE_PLATE_THRESHOLD).get - private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get - private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get - private val mockCity = new MockCity(citySize) - private var queryServerActor: ActorRef = null - - override def onStart(startTime: StartTime): Unit = { - val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get) - val queryServer = dag.processors.find { kv => - val (_, processor) = kv - processor.taskClass == classOf[QueryServer].getName - }.get - val queryServerTaskId = TaskId(queryServer._1, 0) - (appMaster ? LookupTaskActorRef(queryServerTaskId)).asInstanceOf[Future[TaskActorRef]].map {task => - queryServerActor = task.task - } - } - - import VelocityInspector._ - override def onNext(msg: Message): Unit = { - msg.msg match { - case passRecord: PassRecord => - val records = passRecords.getOrElse(passRecord.vehicleId, Queue.empty[PassRecord]) - if(records.size > 0) { - val velocity = getVelocity(passRecord, records.last) - val formatted = "%.2f".format(velocity) - if(velocity > overdriveThreshold) { - if(velocity > fakePlateThreshold) { - LOG.info(s"vehicle ${passRecord.vehicleId} maybe a fake plate, the speed is $formatted km/h") - } - if(queryServerActor != null) { - queryServerActor ! OverSpeedReport(passRecord.vehicleId, formatted, passRecord.timeStamp, passRecord.locationId) - } - } - } - passRecords.update(passRecord.vehicleId, records.enqueueFinite(passRecord, RECORDS_NUM)) - } - } - - override def receiveUnManagedMessage: Receive = { - case GetTrace(vehicleId) => - val records = passRecords.getOrElse(vehicleId, Queue.empty[PassRecord]) - sender ! VehicleTrace(records.toArray.sortBy(_.timeStamp)) - } - - private def getVelocity(passRecord: PassRecord, lastPassRecord: PassRecord): Float = { - val distanceInKm = getDistance(lastPassRecord.locationId, passRecord.locationId) - val timeInHour = (passRecord.timeStamp - lastPassRecord.timeStamp).toFloat / (1000 * 60 * 60) - distanceInKm / timeInHour - } - - private def getDistance(location1: String, location2: String): Long = { - mockCity.getDistance(location1, location2) - } -} - -object VelocityInspector{ - final val OVER_DRIVE_THRESHOLD = "overdrive.threshold" - final val FAKE_PLATE_THRESHOLD = "fakeplate.threshold" - final val RECORDS_NUM = 20 - - class FiniteQueue[T](q: Queue[T]) { - def enqueueFinite[B >: T](elem: B, maxSize: Int): Queue[B] = { - var result = q.enqueue(elem) - while (result.size > maxSize) { - result = result.dequeue._2 - } - result - } - } - - implicit def queue2FiniteQueue[T](q: Queue[T]): FiniteQueue[T] = new FiniteQueue[T](q) +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.gearpump.streaming.examples.transport + +import java.util.concurrent.TimeUnit +import scala.collection.immutable.Queue +import scala.collection.mutable +import scala.concurrent.Future + +import akka.actor.Actor._ +import akka.actor.ActorRef +import akka.pattern.ask + +import io.gearpump.Message +import io.gearpump.cluster.UserConfig +import io.gearpump.partitioner.PartitionerDescription +import io.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef} +import io.gearpump.streaming.examples.transport.generator.MockCity +import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} +import io.gearpump.streaming.{DAG, ProcessorDescription, StreamApplication} +import io.gearpump.util.Graph + +class VelocityInspector(taskContext: TaskContext, conf: UserConfig) + extends Task(taskContext, conf) { + + import system.dispatcher + import taskContext.appMaster + implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS) + private val passRecords = mutable.Map.empty[String, Queue[PassRecord]] + private val fakePlateThreshold = conf.getInt(VelocityInspector.FAKE_PLATE_THRESHOLD).get + private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get + private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get + private val mockCity = new MockCity(citySize) + private var queryServerActor: ActorRef = null + + override def onStart(startTime: StartTime): Unit = { + val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]]( + StreamApplication.DAG).get) + val queryServer = dag.processors.find { kv => + val (_, processor) = kv + processor.taskClass == classOf[QueryServer].getName + }.get + val queryServerTaskId = TaskId(queryServer._1, 0) + (appMaster ? LookupTaskActorRef(queryServerTaskId)).asInstanceOf[Future[TaskActorRef]] + .map { task => + queryServerActor = task.task + } + } + + import io.gearpump.streaming.examples.transport.VelocityInspector._ + override def onNext(msg: Message): Unit = { + msg.msg match { + case passRecord: PassRecord => + val records = passRecords.getOrElse(passRecord.vehicleId, Queue.empty[PassRecord]) + if (records.size > 0) { + val velocity = getVelocity(passRecord, records.last) + val formatted = "%.2f".format(velocity) + if (velocity > overdriveThreshold) { + if (velocity > fakePlateThreshold) { + LOG.info(s"vehicle ${passRecord.vehicleId} maybe a fake plate, " + + s"the speed is $formatted km/h") + } + if (queryServerActor != null) { + queryServerActor ! OverSpeedReport(passRecord.vehicleId, formatted, + passRecord.timeStamp, passRecord.locationId) + } + } + } + passRecords.update(passRecord.vehicleId, records.enqueueFinite(passRecord, RECORDS_NUM)) + } + } + + override def receiveUnManagedMessage: Receive = { + case GetTrace(vehicleId) => + val records = passRecords.getOrElse(vehicleId, Queue.empty[PassRecord]) + sender ! VehicleTrace(records.toArray.sortBy(_.timeStamp)) + } + + private def getVelocity(passRecord: PassRecord, lastPassRecord: PassRecord): Float = { + val distanceInKm = getDistance(lastPassRecord.locationId, passRecord.locationId) + val timeInHour = (passRecord.timeStamp - lastPassRecord.timeStamp).toFloat / (1000 * 60 * 60) + distanceInKm / timeInHour + } + + private def getDistance(location1: String, location2: String): Long = { + mockCity.getDistance(location1, location2) + } +} + +object VelocityInspector { + final val OVER_DRIVE_THRESHOLD = "overdrive.threshold" + final val FAKE_PLATE_THRESHOLD = "fakeplate.threshold" + final val RECORDS_NUM = 20 + + class FiniteQueue[T](q: Queue[T]) { + def enqueueFinite[B >: T](elem: B, maxSize: Int): Queue[B] = { + var result = q.enqueue(elem) + while (result.size > maxSize) { + result = result.dequeue._2 + } + result + } + } + + import scala.language.implicitConversions + + implicit def queue2FiniteQueue[T](q: Queue[T]): FiniteQueue[T] = new FiniteQueue[T](q) } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/MockCity.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/MockCity.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/MockCity.scala index d2786ef..ff78679 100644 --- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/MockCity.scala +++ b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/MockCity.scala @@ -1,86 +1,88 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.examples.transport.generator - -import scala.util.Random -import MockCity._ - -class MockCity(size: Int) { - private val random = new Random() - private val directions = Array(UP, DOWN, LEFT, RIGHT) - - def nextLocation(currentLocationId: String): String = { - val coordinate = idToCoordinate(currentLocationId) - val direction = directions(random.nextInt(4)) - val newCoordinate = coordinate.addOffset(direction) - if(inCity(newCoordinate)) { - coordinateToId(newCoordinate) - } else { - nextLocation(currentLocationId) - } - } - - def getDistance(locationId1: String, locationId2: String): Long = { - val coordinate1 = idToCoordinate(locationId1) - val coordinate2 = idToCoordinate(locationId2) - val blocks = Math.abs(coordinate1.row - coordinate2.row) + Math.abs(coordinate1.column - coordinate2.column) - blocks * LENGTH_PER_BLOCK - } - - def randomLocationId(): String = { - val row = random.nextInt(size) - val column = random.nextInt(size) - coordinateToId(Coordinate(row, column)) - } - - private def coordinateToId(coordinate: Coordinate): String = { - s"Id_${coordinate.row}_${coordinate.column}" - } - - private def idToCoordinate(locationId: String): Coordinate = { - val attr = locationId.split("_") - val row = attr(1).toInt - val column =attr(2).toInt - Coordinate(row, column) - } - - private def inCity(coordinate: Coordinate): Boolean = { - coordinate.row >= 0 && - coordinate.row < size && - coordinate.column >= 0 && - coordinate.column < size - } -} - -object MockCity{ - //The length of the mock city, km - final val LENGTH_PER_BLOCK = 5 - //The minimal speed, km/h - final val MINIMAL_SPEED = 10 - - final val UP = Coordinate(0, 1) - final val DOWN = Coordinate(0, -1) - final val LEFT = Coordinate(-1, 0) - final val RIGHT = Coordinate(1, 0) - - case class Coordinate(row: Int, column: Int) { - def addOffset(coordinate: Coordinate): Coordinate = { - Coordinate(this.row + coordinate.row, this.column + coordinate.column) - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.gearpump.streaming.examples.transport.generator + +import scala.util.Random + +import io.gearpump.streaming.examples.transport.generator.MockCity._ + +class MockCity(size: Int) { + private val random = new Random() + private val directions = Array(UP, DOWN, LEFT, RIGHT) + + def nextLocation(currentLocationId: String): String = { + val coordinate = idToCoordinate(currentLocationId) + val direction = directions(random.nextInt(4)) + val newCoordinate = coordinate.addOffset(direction) + if (inCity(newCoordinate)) { + coordinateToId(newCoordinate) + } else { + nextLocation(currentLocationId) + } + } + + def getDistance(locationId1: String, locationId2: String): Long = { + val coordinate1 = idToCoordinate(locationId1) + val coordinate2 = idToCoordinate(locationId2) + val blocks = Math.abs(coordinate1.row - coordinate2.row) + + Math.abs(coordinate1.column - coordinate2.column) + blocks * LENGTH_PER_BLOCK + } + + def randomLocationId(): String = { + val row = random.nextInt(size) + val column = random.nextInt(size) + coordinateToId(Coordinate(row, column)) + } + + private def coordinateToId(coordinate: Coordinate): String = { + s"Id_${coordinate.row}_${coordinate.column}" + } + + private def idToCoordinate(locationId: String): Coordinate = { + val attr = locationId.split("_") + val row = attr(1).toInt + val column = attr(2).toInt + Coordinate(row, column) + } + + private def inCity(coordinate: Coordinate): Boolean = { + coordinate.row >= 0 && + coordinate.row < size && + coordinate.column >= 0 && + coordinate.column < size + } +} + +object MockCity { + // The length of the mock city, km + final val LENGTH_PER_BLOCK = 5 + // The minimal speed, km/h + final val MINIMAL_SPEED = 10 + + final val UP = Coordinate(0, 1) + final val DOWN = Coordinate(0, -1) + final val LEFT = Coordinate(-1, 0) + final val RIGHT = Coordinate(1, 0) + + case class Coordinate(row: Int, column: Int) { + def addOffset(coordinate: Coordinate): Coordinate = { + Coordinate(this.row + coordinate.row, this.column + coordinate.column) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala index 7a930d9..ee06b25 100644 --- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala +++ b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala @@ -1,68 +1,69 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.examples.transport.generator - -import io.gearpump.streaming.examples.transport.PassRecord -import io.gearpump.util.LogUtil - -import scala.util.Random - -class PassRecordGenerator(vehicleId: String, city: MockCity, overdriveThreshold: Int) { - private val LOG = LogUtil.getLogger(getClass) - LOG.info(s"Generate pass record for vehicle $vehicleId") - private var timeStamp = System.currentTimeMillis() - - private var locationId = city.randomLocationId() - private val random = new Random() - private val fakePlate = random.nextInt(1000) < 1000 * PassRecordGenerator.FAKE_PLATE_RATE - private val (randomMin, randomRange) = { - val lowerBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / overdriveThreshold.toFloat - val upperBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / MockCity.MINIMAL_SPEED.toFloat - val overdrive = (upperBound - lowerBound) * PassRecordGenerator.OVERDRIVE_RATE - val randomMin = Math.max(lowerBound - overdrive, PassRecordGenerator.TWOMINUTES) - val randomRange = upperBound - randomMin - (randomMin.toInt, randomRange.toInt) - } - - def getNextPassRecord(): PassRecord = { - locationId = if(fakePlate) { - city.randomLocationId() - } else { - city.nextLocation(locationId) - } - timeStamp += (random.nextInt(randomRange) + randomMin) - PassRecord(vehicleId, locationId, timeStamp) - } -} - -object PassRecordGenerator { - final val FAKE_PLATE_RATE = 0.01F - final val OVERDRIVE_RATE = 0.05F - final val TWOMINUTES = 2 * 60 * 1000 - - def create(generatorNum: Int, prefix: String, city: MockCity, overdriveThreshold: Int): Array[PassRecordGenerator] = { - var result = Map.empty[String, PassRecordGenerator] - val digitsNum = (Math.log10(generatorNum) + 1).toInt - for(i <- 1 to generatorNum) { - val vehicleId = prefix + s"%0${digitsNum}d".format(i) - val generator = new PassRecordGenerator(vehicleId, city, overdriveThreshold) - result += vehicleId -> generator - } - result.values.toArray - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.gearpump.streaming.examples.transport.generator + +import scala.util.Random + +import io.gearpump.streaming.examples.transport.PassRecord +import io.gearpump.util.LogUtil + +class PassRecordGenerator(vehicleId: String, city: MockCity, overdriveThreshold: Int) { + private val LOG = LogUtil.getLogger(getClass) + LOG.info(s"Generate pass record for vehicle $vehicleId") + private var timeStamp = System.currentTimeMillis() + + private var locationId = city.randomLocationId() + private val random = new Random() + private val fakePlate = random.nextInt(1000) < 1000 * PassRecordGenerator.FAKE_PLATE_RATE + private val (randomMin, randomRange) = { + val lowerBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / overdriveThreshold.toFloat + val upperBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / MockCity.MINIMAL_SPEED.toFloat + val overdrive = (upperBound - lowerBound) * PassRecordGenerator.OVERDRIVE_RATE + val randomMin = Math.max(lowerBound - overdrive, PassRecordGenerator.TWOMINUTES) + val randomRange = upperBound - randomMin + (randomMin.toInt, randomRange.toInt) + } + + def getNextPassRecord(): PassRecord = { + locationId = if (fakePlate) { + city.randomLocationId() + } else { + city.nextLocation(locationId) + } + timeStamp += (random.nextInt(randomRange) + randomMin) + PassRecord(vehicleId, locationId, timeStamp) + } +} + +object PassRecordGenerator { + final val FAKE_PLATE_RATE = 0.01F + final val OVERDRIVE_RATE = 0.05F + final val TWOMINUTES = 2 * 60 * 1000 + + def create(generatorNum: Int, prefix: String, city: MockCity, overdriveThreshold: Int) + : Array[PassRecordGenerator] = { + var result = Map.empty[String, PassRecordGenerator] + val digitsNum = (Math.log10(generatorNum) + 1).toInt + for (i <- 1 to generatorNum) { + val vehicleId = prefix + s"%0${digitsNum}d".format(i) + val generator = new PassRecordGenerator(vehicleId, city, overdriveThreshold) + result += vehicleId -> generator + } + result.values.toArray + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/DataSourceSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/DataSourceSpec.scala b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/DataSourceSpec.scala index adf44c0..75f9d60 100644 --- a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/DataSourceSpec.scala +++ b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/DataSourceSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,15 +17,14 @@ */ package io.gearpump.streaming.examples.transport -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.examples.transport.{VelocityInspector, DataSource} -import io.gearpump.streaming.task.StartTime -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.examples.transport.VelocityInspector import org.mockito.Matchers._ import org.mockito.Mockito._ -import org.scalatest.{Matchers, FlatSpec} +import org.scalatest.{FlatSpec, Matchers} + +import io.gearpump.Message +import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.MockUtil +import io.gearpump.streaming.task.StartTime class DataSourceSpec extends FlatSpec with Matchers { it should "create the pass record" in { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/TransportSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/TransportSpec.scala b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/TransportSpec.scala index 94b89e1..b61fd43 100644 --- a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/TransportSpec.scala +++ b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/TransportSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,19 +17,19 @@ */ package io.gearpump.streaming.examples.transport -import io.gearpump.streaming.examples.transport.Transport -import io.gearpump.cluster.ClientToMaster.SubmitApplication -import io.gearpump.cluster.MasterToClient.SubmitApplicationResult -import io.gearpump.cluster.{TestUtil, MasterHarness} -import io.gearpump.util.Util -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, Matchers, PropSpec} +import scala.concurrent.Future +import scala.util.Success + import org.scalatest.prop.PropertyChecks +import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec} -import scala.util.Success +import io.gearpump.cluster.ClientToMaster.SubmitApplication +import io.gearpump.cluster.MasterToClient.SubmitApplicationResult +import io.gearpump.cluster.{MasterHarness, TestUtil} -import scala.concurrent.Future +class TransportSpec + extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness { -class TransportSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness { override def beforeAll { startActorSystem() } @@ -38,9 +38,9 @@ class TransportSpec extends PropSpec with PropertyChecks with Matchers with Befo shutdownActorSystem() } - override def config = TestUtil.DEFAULT_CONFIG + protected override def config = TestUtil.DEFAULT_CONFIG - property("Transport should succeed to submit application with required arguments"){ + property("Transport should succeed to submit application with required arguments") { val requiredArgs = Array.empty[String] val optionalArgs = Array( "-source", "1", @@ -59,10 +59,11 @@ class TransportSpec extends PropSpec with PropertyChecks with Matchers with Befo forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) => val args = requiredArgs ++ optionalArgs - Future {Transport.main(masterConfig, args)} + Future { + Transport.main(masterConfig, args) + } masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) masterReceiver.reply(SubmitApplicationResult(Success(0))) } } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/MockCitySpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/MockCitySpec.scala b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/MockCitySpec.scala index 2f1495d..e91d91c 100644 --- a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/MockCitySpec.scala +++ b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/MockCitySpec.scala @@ -1,32 +1,31 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.examples.transport.generator - -import io.gearpump.streaming.examples.transport.generator.MockCity -import org.scalatest.{Matchers, PropSpec} -import org.scalatest.prop.PropertyChecks - -class MockCitySpec extends PropSpec with PropertyChecks with Matchers{ - - property("MockCity should maintain the location properly") { - val city = new MockCity(10) - val start = city.randomLocationId() - val nextLocation = city.nextLocation(start) - assert(city.getDistance(start, nextLocation) == MockCity.LENGTH_PER_BLOCK) - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.gearpump.streaming.examples.transport.generator + +import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +class MockCitySpec extends PropSpec with PropertyChecks with Matchers { + + property("MockCity should maintain the location properly") { + val city = new MockCity(10) + val start = city.randomLocationId() + val nextLocation = city.nextLocation(start) + assert(city.getDistance(start, nextLocation) == MockCity.LENGTH_PER_BLOCK) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala index d1ce32d..1c1901e 100644 --- a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala +++ b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,19 +17,18 @@ */ package io.gearpump.streaming.examples.transport.generator -import io.gearpump.streaming.examples.transport.generator.{MockCity, PassRecordGenerator} -import io.gearpump.streaming.examples.transport.generator.MockCity -import org.scalatest.{Matchers, PropSpec} import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} -class PassRecordGeneratorSpec extends PropSpec with PropertyChecks with Matchers{ +class PassRecordGeneratorSpec extends PropSpec with PropertyChecks with Matchers { - property("PassRecordGenerator should generate pass record"){ + property("PassRecordGenerator should generate pass record") { val id = "test" val city = new MockCity(10) val generator = new PassRecordGenerator(id, city, 60) val passrecord1 = generator.getNextPassRecord() val passrecord2 = generator.getNextPassRecord() - assert(city.getDistance(passrecord1.locationId, passrecord2.locationId) == MockCity.LENGTH_PER_BLOCK) + assert(city.getDistance(passrecord1.locationId, passrecord2.locationId) == + MockCity.LENGTH_PER_BLOCK) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Split.java ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Split.java b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Split.java index 64a0f8b..720e179 100644 --- a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Split.java +++ b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Split.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Sum.java ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Sum.java b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Sum.java index bb45651..28cf8cb 100644 --- a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Sum.java +++ b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Sum.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -27,7 +27,7 @@ import org.slf4j.Logger; import java.util.HashMap; -public class Sum extends Task { +public class Sum extends Task { private Logger LOG = super.LOG(); private HashMap<String, Integer> wordCount = new HashMap<String, Integer>(); @@ -43,7 +43,7 @@ public class Sum extends Task { @Override public void onNext(Message messagePayLoad) { - String word = (String)(messagePayLoad.msg()); + String word = (String) (messagePayLoad.msg()); Integer current = wordCount.get(word); if (current == null) { current = 0; http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java index 9e1e7d5..40054d3 100644 --- a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java +++ b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -29,6 +29,7 @@ import io.gearpump.streaming.javaapi.Graph; import io.gearpump.streaming.javaapi.Processor; import io.gearpump.streaming.javaapi.StreamApplication; +/** Java version of WordCount with Processor Graph API */ public class WordCount { public static void main(String[] args) throws InterruptedException { @@ -53,11 +54,9 @@ public class WordCount { Partitioner partitioner = new HashPartitioner(); graph.addEdge(split, partitioner, sum); - UserConfig conf = UserConfig.empty(); StreamApplication app = new StreamApplication("wordcountJava", conf, graph); - EmbeddedCluster localCluster = null; Boolean debugMode = System.getProperty("DEBUG") != null; http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java index 6857017..3aefd7f 100644 --- a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java +++ b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -34,6 +34,7 @@ import scala.Tuple2; import java.util.Iterator; import java.util.List; +/** Java version of WordCount with high level DSL API */ public class WordCount { public static void main(String[] args) throws InterruptedException { @@ -61,14 +62,14 @@ public class WordCount { } }, "map"); - JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new GroupByFunction<Tuple2<String,Integer>, String>() { + JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new GroupByFunction<Tuple2<String, Integer>, String>() { @Override public String apply(Tuple2<String, Integer> tuple) { return tuple._1(); } }, 1, "groupBy"); - JavaStream<Tuple2<String, Integer>> wordcount =groupedOnes.reduce(new ReduceFunction<Tuple2<String, Integer>>() { + JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(new ReduceFunction<Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> apply(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) { return new Tuple2<String, Integer>(t1._1(), t1._2() + t2._2()); @@ -80,7 +81,4 @@ public class WordCount { app.run(); context.close(); } - - - }
