http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala new file mode 100644 index 0000000..a488c9f --- /dev/null +++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.examples.state.processor + +import scala.concurrent.Await +import scala.concurrent.duration._ + +import akka.actor.ActorSystem +import akka.testkit.TestProbe +import com.twitter.algebird.AveragedValue +import org.mockito.Mockito._ +import org.scalacheck.Gen +import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.state.api.PersistentTask +import org.apache.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig, WindowConfig} +import org.apache.gearpump.streaming.task.{ReportCheckpointClock, StartTime} +import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory + +class WindowAverageProcessorSpec extends PropSpec with PropertyChecks with Matchers { + property("WindowAverageProcessor should update state") { + + implicit val system = ActorSystem("test") + val longGen = Gen.chooseNum[Long](1, 1000) + forAll(longGen, longGen) { + (data: Long, num: Long) => + val taskContext = MockUtil.mockTaskContext + + val windowSize = num + val windowStep = num + + val conf = UserConfig.empty + .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true) + .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, num) + .withValue[CheckpointStoreFactory](PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, + new InMemoryCheckpointStoreFactory) + .withValue(WindowConfig.NAME, WindowConfig(windowSize, windowStep)) + + val windowAverage = new WindowAverageProcessor(taskContext, conf) + + val appMaster = TestProbe()(system) + when(taskContext.appMaster).thenReturn(appMaster.ref) + + windowAverage.onStart(StartTime(0L)) + appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, 0L)) + + for (i <- 0L until num) { + windowAverage.onNext(Message("" + data, i)) + windowAverage.state.get 
shouldBe Some(AveragedValue(i + 1, data)) + } + + // Next checkpoint time has not arrived yet + when(taskContext.upstreamMinClock).thenReturn(0L) + windowAverage.onNext(PersistentTask.CHECKPOINT) + appMaster.expectNoMsg(10.milliseconds) + + // Time to checkpoint + when(taskContext.upstreamMinClock).thenReturn(num) + windowAverage.onNext(PersistentTask.CHECKPOINT) + appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, num)) + } + + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } +}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/stockcrawler/README.md ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/README.md b/examples/streaming/stockcrawler/README.md index 4aea13d..b51590f 100644 --- a/examples/streaming/stockcrawler/README.md +++ b/examples/streaming/stockcrawler/README.md @@ -6,12 +6,12 @@ How to use ``` 2. Submit the stock crawler ``` - bin\gear app -jar examples\gearpump-examples-assembly-0.3.2-SNAPSHOT.jar io.gearpump.streaming.examples.stock.main.Stock + bin\gear app -jar examples\gearpump-examples-assembly-0.3.2-SNAPSHOT.jar org.apache.gearpump.streaming.examples.stock.main.Stock ``` If you are behind a proxy, you need to set the proxy address ``` - bin\gear app -jar examples\gearpump-examples-assembly-0.3.2-SNAPSHOT.jar io.gearpump.streaming.examples.stock.main.Stock -proxy host:port + bin\gear app -jar examples\gearpump-examples-assembly-0.3.2-SNAPSHOT.jar org.apache.gearpump.streaming.examples.stock.main.Stock -proxy host:port ``` 3. 
Check the UI http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/stockcrawler/src/main/resources/geardefault.conf ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/resources/geardefault.conf b/examples/streaming/stockcrawler/src/main/resources/geardefault.conf index f557d66..acee3bd 100644 --- a/examples/streaming/stockcrawler/src/main/resources/geardefault.conf +++ b/examples/streaming/stockcrawler/src/main/resources/geardefault.conf @@ -1,6 +1,6 @@ gearpump { serializers { - "io.gearpump.streaming.examples.stock.StockPrice" = "" + "org.apache.gearpump.streaming.examples.stock.StockPrice" = "" } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Analyzer.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Analyzer.scala b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Analyzer.scala deleted file mode 100644 index 1aea563..0000000 --- a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Analyzer.scala +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.stock - -import scala.collection.immutable - -import akka.actor.Actor.Receive -import org.joda.time.DateTime -import org.joda.time.format.{DateTimeFormat, DateTimeFormatter} - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.examples.stock.Analyzer.HistoricalStates -import io.gearpump.streaming.examples.stock.Price._ -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} -import io.gearpump.util.LogUtil - -/** - * Dradown analyzer - * Definition: http://en.wikipedia.org/wiki/Drawdown_(economics) - */ -class Analyzer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { - - val dateFormatter = DateTimeFormat forPattern "dd/MM/yyyy" - - private var stocksToReport = immutable.Set.empty[String] - private var stockInfos = new immutable.HashMap[String, StockPrice] - - private var currentDownwardsStates = new immutable.HashMap[String, StockPriceState] - private val historicalStates = new HistoricalStates() - private var latestTimeStamp: Long = 0L - - override def onStart(startTime: StartTime): Unit = { - LOG.info("analyzer is started") - } - - override def onNext(msg: Message): Unit = { - msg.msg match { - case stock: StockPrice => - latestTimeStamp = stock.timestamp - checkDate(stock) - stockInfos += stock.stockId -> stock - val downwardsState = updateCurrentStates(stock) - val maxDrawdown = historicalStates.updatePresentMaximal(downwardsState) - } - } - - override def receiveUnManagedMessage: Receive = { - case 
get@GetReport(stockId, date) => - var currentMax = currentDownwardsStates.get(stockId) - - val dateTime = Option(date) match { - case Some(date) => - currentMax = None - parseDate(dateFormatter, date) - case None => - new DateTime(latestTimeStamp).withTimeAtStartOfDay - } - - val historyMax = Option(dateTime).flatMap(handleHistoricalQuery(stockId, _)) - val name = stockInfos.get(stockId).map(_.name).getOrElse("") - sender ! Report(stockId, name, dateTime.toString, historyMax, currentMax) - } - - private def updateCurrentStates(stock: StockPrice) = { - var downwardsState: StockPriceState = null - if (currentDownwardsStates.contains(stock.stockId)) { - downwardsState = generateNewState(stock, currentDownwardsStates.get(stock.stockId).get) - } else { - downwardsState = StockPriceState(stock.stockId, stock, stock, stock) - } - currentDownwardsStates += stock.stockId -> downwardsState - downwardsState - } - - // Update the stock's latest state. - private def generateNewState(currentPrice: Price, oldState: StockPriceState): StockPriceState = { - if (currentPrice.price > oldState.max.price) { - StockPriceState(oldState.stockID, currentPrice, currentPrice, currentPrice) - } else { - val newState = StockPriceState(oldState.stockID, oldState.max, - Price.min(currentPrice, oldState.min), currentPrice) - newState - } - } - - private def checkDate(stock: StockPrice) = { - if (currentDownwardsStates.contains(stock.stockId)) { - val now = new DateTime(stock.timestamp) - val lastTime = new DateTime(currentDownwardsStates.get(stock.stockId).get.current.timestamp) - // New day - if (now.getDayOfYear > lastTime.getDayOfYear || now.getYear > lastTime.getYear) { - currentDownwardsStates -= stock.stockId - } - } - } - - private def parseDate(format: DateTimeFormatter, input: String): DateTime = { - format.parseDateTime(input) - } - - private def handleHistoricalQuery(stockId: String, date: DateTime) = { - val maximal = historicalStates.getHistoricalMaximal(stockId, date) - maximal - } 
-} - -object Analyzer { - - class HistoricalStates { - val LOG = LogUtil.getLogger(getClass) - val dateFormatter = DateTimeFormat forPattern "dd/MM/yyyy" - private var historicalMaxRaise = new immutable.HashMap[(String, DateTime), StockPriceState] - private var historicalMaxDrawdown = new immutable.HashMap[(String, DateTime), StockPriceState] - - def updatePresentMaximal(newState: StockPriceState): Option[StockPriceState] = { - val date = Analyzer.getDateFromTimeStamp(newState.current.timestamp) - var newMaximalState: Option[StockPriceState] = null - if (newState.max.price < Float.MinPositiveValue) { - newMaximalState = generateNewMaximal(newState, date, historicalMaxRaise) - if (newMaximalState.nonEmpty) { - historicalMaxRaise += (newState.stockID, date) -> newMaximalState.get - } - } else { - newMaximalState = generateNewMaximal(newState, date, historicalMaxDrawdown) - if (newMaximalState.nonEmpty) { - historicalMaxDrawdown += (newState.stockID, date) -> newMaximalState.get - } - } - newMaximalState - } - - def getHistoricalMaximal(stockId: String, date: DateTime): Option[StockPriceState] = { - historicalMaxDrawdown.get((stockId, date)) - } - - private def generateNewMaximal( - state: StockPriceState, - date: DateTime, - map: immutable.HashMap[(String, DateTime), StockPriceState]) - : Option[StockPriceState] = { - val maximal = map.get((state.stockID, date)) - if (maximal.nonEmpty && maximal.get.drawDown > state.drawDown) { - None - } else { - Some(state) - } - } - } - - def getDateFromTimeStamp(timestamp: Long): DateTime = { - new DateTime(timestamp).withTimeAtStartOfDay() - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Crawler.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Crawler.scala 
b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Crawler.scala deleted file mode 100644 index da5ab63..0000000 --- a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Crawler.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.stock - -import scala.concurrent.duration._ - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} - -class Crawler(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { - - import taskContext._ - - val FetchStockPrice = Message("FetchStockPrice") - - lazy val stocks = { - val stockIds = conf.getValue[Array[String]]("StockId").get - val size = if (stockIds.length % parallelism > 0) { - stockIds.length / parallelism + 1 - } else { - stockIds.length / parallelism - } - - val start = taskId.index * size - val end = (taskId.index + 1) * size - stockIds.slice(start, end) - } - - scheduleOnce(1.seconds)(self ! 
FetchStockPrice) - - val stockMarket = conf.getValue[StockMarket](classOf[StockMarket].getName).get - - override def onStart(startTime: StartTime): Unit = { - // Nothing - } - - override def onNext(msg: Message): Unit = { - stockMarket.getPrice(stocks).foreach { price => - output(new Message(price, price.timestamp)) - } - scheduleOnce(5.seconds)(self ! FetchStockPrice) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Data.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Data.scala b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Data.scala deleted file mode 100644 index 5525524..0000000 --- a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Data.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.examples.stock - -// scalastyle:off equals.hash.code case class has equals defined -case class StockPrice( - stockId: String, name: String, price: String, delta: String, pecent: String, volume: String, - money: String, timestamp: Long) { - override def hashCode: Int = stockId.hashCode -} -// scalastyle:on equals.hash.code case class has equals defined - -case class Price(price: Float, timestamp: Long) - -object Price { - - import scala.language.implicitConversions - - implicit def StockPriceToPrice(stock: StockPrice): Price = { - Price(stock.price.toFloat, stock.timestamp) - } - - def min(first: Price, second: Price): Price = { - if (first.price < second.price) { - first - } else { - second - } - } -} - -case class StockPriceState(stockID: String, max: Price, min: Price, current: Price) { - - def drawDownPeriod: Long = min.timestamp - max.timestamp - - def recoveryPeriod: Long = current.timestamp - min.timestamp - - def drawDown: Float = max.price - min.price -} - -case class GetReport(stockId: String, date: String) - -case class Report( - stockId: String, name: String, date: String, historyMax: Option[StockPriceState], - currentMax: Option[StockPriceState]) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/QueryServer.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/QueryServer.scala b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/QueryServer.scala deleted file mode 100644 index 33db128..0000000 --- a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/QueryServer.scala +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.stock - -import java.util.concurrent.TimeUnit -import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Failure, Success} - -import akka.actor.Actor._ -import akka.actor.{Actor, ActorRefFactory, Props} -import akka.io.IO -import akka.pattern.ask -import spray.can.Http -import spray.http.StatusCodes -import spray.json._ -import spray.routing.{HttpService, Route} -import upickle.default.write - -import io.gearpump.Message -import io.gearpump.cluster.MasterToAppMaster.AppMasterDataDetailRequest -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.ProcessorId -import io.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef} -import io.gearpump.streaming.appmaster.{ProcessorSummary, StreamAppMasterSummary} -import io.gearpump.streaming.examples.stock.QueryServer.WebServer -import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} - -class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { - import scala.concurrent.ExecutionContext.Implicits.global - - import taskContext.{appId, appMaster} - - var analyzer: (ProcessorId, ProcessorSummary) = null - implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS) - - override def 
onStart(startTime: StartTime): Unit = { - appMaster ! AppMasterDataDetailRequest(appId) - taskContext.actorOf(Props(new WebServer)) - } - - override def onNext(msg: Message): Unit = { - // Skip - } - - override def receiveUnManagedMessage: Receive = messageHandler - - def messageHandler: Receive = { - case detail: StreamAppMasterSummary => - analyzer = detail.processors.find { kv => - val (processorId, processor) = kv - processor.taskClass == classOf[Analyzer].getName - }.get - case getReport@GetReport(stockId, date) => - val parallism = analyzer._2.parallelism - val processorId = analyzer._1 - val analyzerTaskId = TaskId(processorId, (stockId.hashCode & Integer.MAX_VALUE) % parallism) - val requester = sender - import scala.concurrent.Future - (appMaster ? LookupTaskActorRef(analyzerTaskId)) - .asInstanceOf[Future[TaskActorRef]].flatMap { task => - - (task.task ? getReport).asInstanceOf[Future[Report]] - }.map { report => - LOG.info(s"reporting $report") - requester ! report - } - case _ => - // Ignore - } -} - -object QueryServer { - class WebServer extends Actor with HttpService { - - import context.dispatcher - implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS) - def actorRefFactory: ActorRefFactory = context - implicit val system = context.system - - IO(Http) ! Http.Bind(self, interface = "localhost", port = 8080) - - override def receive: Receive = runRoute(webServer ~ staticRoute) - - def webServer: Route = { - path("report" / Segment) { stockId => - get { - onComplete((context.parent ? 
GetReport(stockId, null)).asInstanceOf[Future[Report]]) { - case Success(report: Report) => - val json = write(report) - complete(pretty(json)) - case Failure(ex) => complete(StatusCodes.InternalServerError, - s"An error occurred: ${ex.getMessage}") - } - } - } - } - - val staticRoute = { - pathEndOrSingleSlash { - getFromResource("stock/stock.html") - } ~ - pathPrefix("css") { - get { - getFromResourceDirectory("stock/css") - } - } ~ - pathPrefix("js") { - get { - getFromResourceDirectory("stock/js") - } - } - } - - private def pretty(json: String): String = { - json.parseJson.prettyPrint - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/StockMarket.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/StockMarket.scala b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/StockMarket.scala deleted file mode 100644 index 508c282..0000000 --- a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/StockMarket.scala +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.stock - -import java.nio.charset.Charset -import scala.io.Codec - -import org.apache.commons.httpclient.methods.GetMethod -import org.apache.commons.httpclient.{HttpClient, MultiThreadedHttpConnectionManager} -import org.htmlcleaner.{HtmlCleaner, TagNode} -import org.joda.time.{DateTime, DateTimeZone} - -import io.gearpump.streaming.examples.stock.StockMarket.ServiceHour -import io.gearpump.transport.HostPort -import io.gearpump.util.LogUtil - -class StockMarket(service: ServiceHour, proxy: HostPort = null) extends Serializable { - - private def LOG = LogUtil.getLogger(getClass) - - @transient - private var connectionManager: MultiThreadedHttpConnectionManager = null - - private val eastMoneyStockPage = "http://quote.eastmoney.com/stocklist.html" - - private val stockPriceParser = - """^var\shq_str_s_([a-z0-9A-Z]+)="([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+)";$""".r - - def shutdown(): Unit = { - Option(connectionManager).map(_.shutdown()) - } - - @transient - private var _client: HttpClient = null - - private def client: HttpClient = { - _client = Option(_client).getOrElse { - val connectionManager = new MultiThreadedHttpConnectionManager() - val client = new HttpClient(connectionManager) - Option(proxy).map(host => client.getHostConfiguration().setProxy(host.host, host.port)) - client - } - _client - } - - def getPrice(stocks: Array[String]): Array[StockPrice] = { - - LOG.info(s"getPrice 1") - - val query = "http://hq.sinajs.cn/list=" + stocks.map("s_" + _).mkString(",") - if (service.inService) { - - LOG.info(s"getPrice 2") - - val get = new GetMethod(query) - client.executeMethod(get) - val current = System.currentTimeMillis() - - val output = scala.io.Source.fromInputStream(get.getResponseBodyAsStream)( - new Codec(Charset forName "GBK")).getLines().flatMap { line => - line match { - case 
stockPriceParser(stockId, name, price, delta, pecent, volume, money) => - Some(StockPrice(stockId, name, price, delta, pecent, volume, money, current)) - case _ => - None - } - }.toArray - - LOG.info(s"getPrice 3 ${output.length}") - - output - } else { - Array.empty[StockPrice] - } - } - - private val urlPattern = """^.*/([a-zA-Z0-9]+)\.html$""".r - - def getStockIdList: Array[String] = { - val cleaner = new HtmlCleaner - val props = cleaner.getProperties - - val get = new GetMethod(eastMoneyStockPage) - client.executeMethod(get) - - val root = cleaner.clean(get.getResponseBodyAsStream) - - val stockUrls = root.evaluateXPath("//div[@id='quotesearch']//li//a[@href]") - - val elements = root.getElementsByName("a", true) - - val hrefs = (0 until stockUrls.length) - .map(stockUrls(_).asInstanceOf[TagNode].getAttributeByName("href")) - .map { url => - url match { - case urlPattern(code) => code - case _ => null - } - }.toArray - hrefs - } -} - -object StockMarket { - - class ServiceHour(all: Boolean) extends Serializable { - - /** - * Morning openning: 9:30 am - 11:30 am - */ - val morningStart = GMT8(new DateTime(0, 1, 1, 9, 30)).getMillis - val morningEnd = GMT8(new DateTime(0, 1, 1, 11, 30)).getMillis - - /** - * After noon openning: 13:00 pm - 15:00 pm - */ - val afternoonStart = GMT8(new DateTime(0, 1, 1, 13, 0)).getMillis - val afternoonEnd = GMT8(new DateTime(0, 1, 1, 15, 0)).getMillis - - def inService: Boolean = { - - if (all) { - true - } else { - val now = GMT8(DateTime.now()).withDate(0, 1, 1).getMillis - if (now >= morningStart && now <= morningEnd || - now >= afternoonStart && now <= afternoonEnd) { - true - } else { - false - } - } - } - - private def GMT8(time: DateTime): DateTime = { - time.withZone(DateTimeZone.UTC).plusHours(8) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/main/Stock.scala 
---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/main/Stock.scala b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/main/Stock.scala deleted file mode 100644 index 638dc4e..0000000 --- a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/main/Stock.scala +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.examples.stock.main - -import akka.actor.ActorSystem -import org.slf4j.Logger - -import io.gearpump.cluster.UserConfig -import io.gearpump.cluster.client.ClientContext -import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} -import io.gearpump.partitioner.HashPartitioner -import io.gearpump.streaming.examples.stock.StockMarket.ServiceHour -import io.gearpump.streaming.examples.stock.{Analyzer, Crawler, QueryServer, StockMarket} -import io.gearpump.streaming.{Processor, StreamApplication} -import io.gearpump.transport.HostPort -import io.gearpump.util.Graph.Node -import io.gearpump.util.{AkkaApp, Graph, LogUtil} - -/** Tracks the China's stock market index change */ -object Stock extends AkkaApp with ArgumentsParser { - - private val LOG: Logger = LogUtil.getLogger(getClass) - - override val options: Array[(String, CLIOption[Any])] = Array( - "crawler" -> CLIOption[Int]("<how many fetcher to get data from remote>", - required = false, defaultValue = Some(10)), - "analyzer" -> CLIOption[Int]("<parallism of analyzer>", - required = false, defaultValue = Some(1)), - "proxy" -> CLIOption[String]("proxy setting host:port, for example: 127.0.0.1:8443", - required = false, defaultValue = Some(""))) - - def crawler(config: ParseResult)(implicit system: ActorSystem): StreamApplication = { - val crawler = Processor[Crawler](config.getInt("crawler")) - val analyzer = Processor[Analyzer](config.getInt("analyzer")) - val queryServer = Processor[QueryServer](1) - - val proxySetting = config.getString("proxy") - val proxy = if (proxySetting.isEmpty) { - null - } else HostPort(proxySetting) - val stockMarket = new StockMarket(new ServiceHour(true), proxy) - val stocks = stockMarket.getStockIdList - - // scalastyle:off println - Console.println(s"Successfully fetched stock id for ${stocks.length} stocks") - // scalastyle:on println - - val userConfig = UserConfig.empty.withValue("StockId", stocks) - 
.withValue[StockMarket](classOf[StockMarket].getName, stockMarket) - val partitioner = new HashPartitioner - - val p1 = crawler ~ partitioner ~> analyzer - val p2 = Node(queryServer) - val graph = Graph(p1, p2) - val app = StreamApplication("stock_direct_analyzer", graph, userConfig - ) - app - } - - override def main(akkaConf: Config, args: Array[String]): Unit = { - val config = parse(args) - val context = ClientContext(akkaConf) - - implicit val system = context.system - - val app = crawler(config) - val appId = context.submit(app) - context.close() - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Analyzer.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Analyzer.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Analyzer.scala new file mode 100644 index 0000000..f6fdff2 --- /dev/null +++ b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Analyzer.scala @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.examples.stock + +import scala.collection.immutable + +import akka.actor.Actor.Receive +import org.joda.time.DateTime +import org.joda.time.format.{DateTimeFormat, DateTimeFormatter} + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.examples.stock.Analyzer.HistoricalStates +import org.apache.gearpump.streaming.examples.stock.Price._ +import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} +import org.apache.gearpump.util.LogUtil + +/** + * Dradown analyzer + * Definition: http://en.wikipedia.org/wiki/Drawdown_(economics) + */ +class Analyzer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { + + val dateFormatter = DateTimeFormat forPattern "dd/MM/yyyy" + + private var stocksToReport = immutable.Set.empty[String] + private var stockInfos = new immutable.HashMap[String, StockPrice] + + private var currentDownwardsStates = new immutable.HashMap[String, StockPriceState] + private val historicalStates = new HistoricalStates() + private var latestTimeStamp: Long = 0L + + override def onStart(startTime: StartTime): Unit = { + LOG.info("analyzer is started") + } + + override def onNext(msg: Message): Unit = { + msg.msg match { + case stock: StockPrice => + latestTimeStamp = stock.timestamp + checkDate(stock) + stockInfos += stock.stockId -> stock + val downwardsState = updateCurrentStates(stock) + val maxDrawdown = historicalStates.updatePresentMaximal(downwardsState) + } + } + + override def receiveUnManagedMessage: Receive = { + case get@GetReport(stockId, date) => + var currentMax = currentDownwardsStates.get(stockId) + + val dateTime = Option(date) match { + case Some(date) => + currentMax = None + parseDate(dateFormatter, date) + case None => + new DateTime(latestTimeStamp).withTimeAtStartOfDay + } + + 
val historyMax = Option(dateTime).flatMap(handleHistoricalQuery(stockId, _)) + val name = stockInfos.get(stockId).map(_.name).getOrElse("") + sender ! Report(stockId, name, dateTime.toString, historyMax, currentMax) + } + + private def updateCurrentStates(stock: StockPrice) = { + var downwardsState: StockPriceState = null + if (currentDownwardsStates.contains(stock.stockId)) { + downwardsState = generateNewState(stock, currentDownwardsStates.get(stock.stockId).get) + } else { + downwardsState = StockPriceState(stock.stockId, stock, stock, stock) + } + currentDownwardsStates += stock.stockId -> downwardsState + downwardsState + } + + // Update the stock's latest state. + private def generateNewState(currentPrice: Price, oldState: StockPriceState): StockPriceState = { + if (currentPrice.price > oldState.max.price) { + StockPriceState(oldState.stockID, currentPrice, currentPrice, currentPrice) + } else { + val newState = StockPriceState(oldState.stockID, oldState.max, + Price.min(currentPrice, oldState.min), currentPrice) + newState + } + } + + private def checkDate(stock: StockPrice) = { + if (currentDownwardsStates.contains(stock.stockId)) { + val now = new DateTime(stock.timestamp) + val lastTime = new DateTime(currentDownwardsStates.get(stock.stockId).get.current.timestamp) + // New day + if (now.getDayOfYear > lastTime.getDayOfYear || now.getYear > lastTime.getYear) { + currentDownwardsStates -= stock.stockId + } + } + } + + private def parseDate(format: DateTimeFormatter, input: String): DateTime = { + format.parseDateTime(input) + } + + private def handleHistoricalQuery(stockId: String, date: DateTime) = { + val maximal = historicalStates.getHistoricalMaximal(stockId, date) + maximal + } +} + +object Analyzer { + + class HistoricalStates { + val LOG = LogUtil.getLogger(getClass) + val dateFormatter = DateTimeFormat forPattern "dd/MM/yyyy" + private var historicalMaxRaise = new immutable.HashMap[(String, DateTime), StockPriceState] + private var 
historicalMaxDrawdown = new immutable.HashMap[(String, DateTime), StockPriceState] + + def updatePresentMaximal(newState: StockPriceState): Option[StockPriceState] = { + val date = Analyzer.getDateFromTimeStamp(newState.current.timestamp) + var newMaximalState: Option[StockPriceState] = null + if (newState.max.price < Float.MinPositiveValue) { + newMaximalState = generateNewMaximal(newState, date, historicalMaxRaise) + if (newMaximalState.nonEmpty) { + historicalMaxRaise += (newState.stockID, date) -> newMaximalState.get + } + } else { + newMaximalState = generateNewMaximal(newState, date, historicalMaxDrawdown) + if (newMaximalState.nonEmpty) { + historicalMaxDrawdown += (newState.stockID, date) -> newMaximalState.get + } + } + newMaximalState + } + + def getHistoricalMaximal(stockId: String, date: DateTime): Option[StockPriceState] = { + historicalMaxDrawdown.get((stockId, date)) + } + + private def generateNewMaximal( + state: StockPriceState, + date: DateTime, + map: immutable.HashMap[(String, DateTime), StockPriceState]) + : Option[StockPriceState] = { + val maximal = map.get((state.stockID, date)) + if (maximal.nonEmpty && maximal.get.drawDown > state.drawDown) { + None + } else { + Some(state) + } + } + } + + def getDateFromTimeStamp(timestamp: Long): DateTime = { + new DateTime(timestamp).withTimeAtStartOfDay() + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Crawler.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Crawler.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Crawler.scala new file mode 100644 index 0000000..bb444dd --- /dev/null +++ b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Crawler.scala @@ 
-0,0 +1,60 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.streaming.examples.stock

import scala.concurrent.duration._

import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}

/**
 * Periodically fetches the prices of this task's share of the stock ids and emits one
 * message per quote, stamped with the quote's timestamp.
 */
class Crawler(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {

  import taskContext._

  // Self-addressed trigger: every message received by onNext starts one fetch round.
  val FetchStockPrice = Message("FetchStockPrice")

  // The contiguous slice of the "StockId" array owned by this task. The chunk size is the
  // id count divided by the parallelism, rounded up, so trailing tasks may get a short or
  // empty slice.
  lazy val stocks = {
    val allIds = conf.getValue[Array[String]]("StockId").get
    val quotient = allIds.length / parallelism
    val chunk = if (allIds.length % parallelism > 0) quotient + 1 else quotient

    val from = taskId.index * chunk
    val until = (taskId.index + 1) * chunk
    allIds.slice(from, until)
  }

  // First fetch round, one second after construction.
  scheduleOnce(1.seconds)(self ! FetchStockPrice)

  val stockMarket = conf.getValue[StockMarket](classOf[StockMarket].getName).get

  override def onStart(startTime: StartTime): Unit = {
    // Nothing
  }

  /** Runs one fetch round, then schedules the next one five seconds later. */
  override def onNext(msg: Message): Unit = {
    for (quote <- stockMarket.getPrice(stocks)) {
      output(new Message(quote, quote.timestamp))
    }
    scheduleOnce(5.seconds)(self ! FetchStockPrice)
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Data.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Data.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Data.scala new file mode 100644 index 0000000..94a85ff --- /dev/null +++ b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Data.scala @@ -0,0 +1,61 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
+ */ + +package org.apache.gearpump.streaming.examples.stock + +// scalastyle:off equals.hash.code case class has equals defined +case class StockPrice( + stockId: String, name: String, price: String, delta: String, pecent: String, volume: String, + money: String, timestamp: Long) { + override def hashCode: Int = stockId.hashCode +} +// scalastyle:on equals.hash.code case class has equals defined + +case class Price(price: Float, timestamp: Long) + +object Price { + + import scala.language.implicitConversions + + implicit def StockPriceToPrice(stock: StockPrice): Price = { + Price(stock.price.toFloat, stock.timestamp) + } + + def min(first: Price, second: Price): Price = { + if (first.price < second.price) { + first + } else { + second + } + } +} + +case class StockPriceState(stockID: String, max: Price, min: Price, current: Price) { + + def drawDownPeriod: Long = min.timestamp - max.timestamp + + def recoveryPeriod: Long = current.timestamp - min.timestamp + + def drawDown: Float = max.price - min.price +} + +case class GetReport(stockId: String, date: String) + +case class Report( + stockId: String, name: String, date: String, historyMax: Option[StockPriceState], + currentMax: Option[StockPriceState]) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/QueryServer.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/QueryServer.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/QueryServer.scala new file mode 100644 index 0000000..01ccb3e --- /dev/null +++ b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/QueryServer.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.streaming.examples.stock

import java.util.concurrent.TimeUnit
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

import akka.actor.Actor._
import akka.actor.{Actor, ActorRefFactory, Props, Status}
import akka.io.IO
import akka.pattern.ask
import spray.can.Http
import spray.http.StatusCodes
import spray.json._
import spray.routing.{HttpService, Route}
import upickle.default.write

import org.apache.gearpump.Message
import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterDataDetailRequest
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.ProcessorId
import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
import org.apache.gearpump.streaming.appmaster.{ProcessorSummary, StreamAppMasterSummary}
import org.apache.gearpump.streaming.examples.stock.QueryServer.WebServer
import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}

/**
 * Task that hosts the HTTP front end ([[QueryServer.WebServer]]) and forwards [[GetReport]]
 * queries to the [[Analyzer]] task responsible for the requested stock.
 */
class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
  import scala.concurrent.ExecutionContext.Implicits.global

  import taskContext.{appId, appMaster}

  // Processor id and summary of the Analyzer processor. Stays null until the app master has
  // answered the AppMasterDataDetailRequest sent in onStart.
  var analyzer: (ProcessorId, ProcessorSummary) = null
  implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)

  override def onStart(startTime: StartTime): Unit = {
    appMaster ! AppMasterDataDetailRequest(appId)
    taskContext.actorOf(Props(new WebServer))
  }

  override def onNext(msg: Message): Unit = {
    // Skip
  }

  override def receiveUnManagedMessage: Receive = messageHandler

  /**
   * Handles the app-master summary and report queries.
   *
   * Every [[GetReport]] gets an answer: either the [[Report]] or an `akka.actor.Status.Failure`
   * (analyzer not resolved yet, task lookup failed, analyzer did not answer in time), so the
   * asking side fails fast with the real cause instead of waiting for its own timeout.
   */
  def messageHandler: Receive = {
    case detail: StreamAppMasterSummary =>
      val analyzerClass = classOf[Analyzer].getName
      detail.processors.find { case (_, processor) => processor.taskClass == analyzerClass } match {
        case Some(found) =>
          analyzer = found
        case None =>
          LOG.warn(s"no processor running $analyzerClass found in the application summary")
      }
    case getReport@GetReport(stockId, _) =>
      // Capture the sender now; the callbacks below run outside of message processing.
      val requester = sender
      Option(analyzer) match {
        case None =>
          requester ! Status.Failure(
            new IllegalStateException("analyzer processor is not resolved yet, please retry"))
        case Some((processorId, summary)) =>
          // Same routing as the hash partitioner input: non-negative hash modulo parallelism.
          val analyzerTaskId =
            TaskId(processorId, (stockId.hashCode & Integer.MAX_VALUE) % summary.parallelism)
          (appMaster ? LookupTaskActorRef(analyzerTaskId)).mapTo[TaskActorRef]
            .flatMap(task => (task.task ? getReport).mapTo[Report])
            .onComplete {
              case Success(report) =>
                LOG.info(s"reporting $report")
                requester ! report
              case Failure(ex) =>
                LOG.warn(s"failed to fetch the report of $stockId", ex)
                requester ! Status.Failure(ex)
            }
      }
    case _ =>
    // Ignore
  }
}

object QueryServer {

  /** Spray HTTP endpoint serving `/report/<stockId>` as JSON plus the static UI resources. */
  class WebServer extends Actor with HttpService {

    import context.dispatcher
    implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
    def actorRefFactory: ActorRefFactory = context
    implicit val system = context.system

    IO(Http) ! Http.Bind(self, interface = "localhost", port = 8080)

    override def receive: Receive = runRoute(webServer ~ staticRoute)

    /** `GET /report/<stockId>`: asks the parent task for today's report of the stock. */
    def webServer: Route = {
      path("report" / Segment) { stockId =>
        get {
          // mapTo turns an unexpected reply type into a failed future, which ends up in the
          // Failure branch instead of an unmatched Success value.
          onComplete((context.parent ? GetReport(stockId, null)).mapTo[Report]) {
            case Success(report) =>
              val json = write(report)
              complete(pretty(json))
            case Failure(ex) => complete(StatusCodes.InternalServerError,
              s"An error occurred: ${ex.getMessage}")
          }
        }
      }
    }

    val staticRoute = {
      pathEndOrSingleSlash {
        getFromResource("stock/stock.html")
      } ~
      pathPrefix("css") {
        get {
          getFromResourceDirectory("stock/css")
        }
      } ~
      pathPrefix("js") {
        get {
          getFromResourceDirectory("stock/js")
        }
      }
    }

    private def pretty(json: String): String = {
      json.parseJson.prettyPrint
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/StockMarket.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/StockMarket.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/StockMarket.scala new file mode 100644 index 0000000..24e050b --- /dev/null +++ b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/StockMarket.scala @@ -0,0 +1,155 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.examples.stock + +import java.nio.charset.Charset +import scala.io.Codec + +import org.apache.commons.httpclient.methods.GetMethod +import org.apache.commons.httpclient.{HttpClient, MultiThreadedHttpConnectionManager} +import org.htmlcleaner.{HtmlCleaner, TagNode} +import org.joda.time.{DateTime, DateTimeZone} + +import org.apache.gearpump.streaming.examples.stock.StockMarket.ServiceHour +import org.apache.gearpump.transport.HostPort +import org.apache.gearpump.util.LogUtil + +class StockMarket(service: ServiceHour, proxy: HostPort = null) extends Serializable { + + private def LOG = LogUtil.getLogger(getClass) + + @transient + private var connectionManager: MultiThreadedHttpConnectionManager = null + + private val eastMoneyStockPage = "http://quote.eastmoney.com/stocklist.html" + + private val stockPriceParser = + """^var\shq_str_s_([a-z0-9A-Z]+)="([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+)";$""".r + + def shutdown(): Unit = { + Option(connectionManager).map(_.shutdown()) + } + + @transient + private var _client: HttpClient = null + + private def client: HttpClient = { + _client = Option(_client).getOrElse { + val connectionManager = new MultiThreadedHttpConnectionManager() + val client = new HttpClient(connectionManager) + Option(proxy).map(host => client.getHostConfiguration().setProxy(host.host, host.port)) + client + } + _client + } + + def getPrice(stocks: Array[String]): Array[StockPrice] = { + + LOG.info(s"getPrice 1") + + val query = "http://hq.sinajs.cn/list=" + stocks.map("s_" + _).mkString(",") + if (service.inService) { + + LOG.info(s"getPrice 2") + + val get = new GetMethod(query) + client.executeMethod(get) + val current = System.currentTimeMillis() + + val output = scala.io.Source.fromInputStream(get.getResponseBodyAsStream)( + new Codec(Charset forName "GBK")).getLines().flatMap { line => + 
line match { + case stockPriceParser(stockId, name, price, delta, pecent, volume, money) => + Some(StockPrice(stockId, name, price, delta, pecent, volume, money, current)) + case _ => + None + } + }.toArray + + LOG.info(s"getPrice 3 ${output.length}") + + output + } else { + Array.empty[StockPrice] + } + } + + private val urlPattern = """^.*/([a-zA-Z0-9]+)\.html$""".r + + def getStockIdList: Array[String] = { + val cleaner = new HtmlCleaner + val props = cleaner.getProperties + + val get = new GetMethod(eastMoneyStockPage) + client.executeMethod(get) + + val root = cleaner.clean(get.getResponseBodyAsStream) + + val stockUrls = root.evaluateXPath("//div[@id='quotesearch']//li//a[@href]") + + val elements = root.getElementsByName("a", true) + + val hrefs = (0 until stockUrls.length) + .map(stockUrls(_).asInstanceOf[TagNode].getAttributeByName("href")) + .map { url => + url match { + case urlPattern(code) => code + case _ => null + } + }.toArray + hrefs + } +} + +object StockMarket { + + class ServiceHour(all: Boolean) extends Serializable { + + /** + * Morning openning: 9:30 am - 11:30 am + */ + val morningStart = GMT8(new DateTime(0, 1, 1, 9, 30)).getMillis + val morningEnd = GMT8(new DateTime(0, 1, 1, 11, 30)).getMillis + + /** + * After noon openning: 13:00 pm - 15:00 pm + */ + val afternoonStart = GMT8(new DateTime(0, 1, 1, 13, 0)).getMillis + val afternoonEnd = GMT8(new DateTime(0, 1, 1, 15, 0)).getMillis + + def inService: Boolean = { + + if (all) { + true + } else { + val now = GMT8(DateTime.now()).withDate(0, 1, 1).getMillis + if (now >= morningStart && now <= morningEnd || + now >= afternoonStart && now <= afternoonEnd) { + true + } else { + false + } + } + } + + private def GMT8(time: DateTime): DateTime = { + time.withZone(DateTimeZone.UTC).plusHours(8) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/main/Stock.scala 
---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/main/Stock.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/main/Stock.scala new file mode 100644 index 0000000..6d17c20 --- /dev/null +++ b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/main/Stock.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.examples.stock.main + +import akka.actor.ActorSystem +import org.slf4j.Logger + +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} +import org.apache.gearpump.partitioner.HashPartitioner +import org.apache.gearpump.streaming.examples.stock.StockMarket.ServiceHour +import org.apache.gearpump.streaming.examples.stock.{Analyzer, Crawler, QueryServer, StockMarket} +import org.apache.gearpump.streaming.{Processor, StreamApplication} +import org.apache.gearpump.transport.HostPort +import org.apache.gearpump.util.Graph.Node +import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil} + +/** Tracks the China's stock market index change */ +object Stock extends AkkaApp with ArgumentsParser { + + private val LOG: Logger = LogUtil.getLogger(getClass) + + override val options: Array[(String, CLIOption[Any])] = Array( + "crawler" -> CLIOption[Int]("<how many fetcher to get data from remote>", + required = false, defaultValue = Some(10)), + "analyzer" -> CLIOption[Int]("<parallism of analyzer>", + required = false, defaultValue = Some(1)), + "proxy" -> CLIOption[String]("proxy setting host:port, for example: 127.0.0.1:8443", + required = false, defaultValue = Some(""))) + + def crawler(config: ParseResult)(implicit system: ActorSystem): StreamApplication = { + val crawler = Processor[Crawler](config.getInt("crawler")) + val analyzer = Processor[Analyzer](config.getInt("analyzer")) + val queryServer = Processor[QueryServer](1) + + val proxySetting = config.getString("proxy") + val proxy = if (proxySetting.isEmpty) { + null + } else HostPort(proxySetting) + val stockMarket = new StockMarket(new ServiceHour(true), proxy) + val stocks = stockMarket.getStockIdList + + // scalastyle:off println + Console.println(s"Successfully fetched stock id for ${stocks.length} stocks") + // scalastyle:on println + + val 
userConfig = UserConfig.empty.withValue("StockId", stocks) + .withValue[StockMarket](classOf[StockMarket].getName, stockMarket) + val partitioner = new HashPartitioner + + val p1 = crawler ~ partitioner ~> analyzer + val p2 = Node(queryServer) + val graph = Graph(p1, p2) + val app = StreamApplication("stock_direct_analyzer", graph, userConfig + ) + app + } + + override def main(akkaConf: Config, args: Array[String]): Unit = { + val config = parse(args) + val context = ClientContext(akkaConf) + + implicit val system = context.system + + val app = crawler(config) + val appId = context.submit(app) + context.close() + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/resources/geardefault.conf ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/resources/geardefault.conf b/examples/streaming/transport/src/main/resources/geardefault.conf index 165bce5..0c8f421 100644 --- a/examples/streaming/transport/src/main/resources/geardefault.conf +++ b/examples/streaming/transport/src/main/resources/geardefault.conf @@ -2,8 +2,8 @@ gearpump { serializers { ## Follow this format when adding new serializer for new message types - ## "io.gearpump.Message" = "io.gearpump.streaming.MessageSerializer" - "io.gearpump.streaming.examples.transport.PassRecord" = "" + ## "org.apache.gearpump.Message" = "org.apache.gearpump.streaming.MessageSerializer" + "org.apache.gearpump.streaming.examples.transport.PassRecord" = "" } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Data.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Data.scala 
b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Data.scala deleted file mode 100644 index 788f92a..0000000 --- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Data.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.gearpump.streaming.examples.transport - -case class LocationInfo(id: String, row: Int, column: Int) - -// scalastyle:off equals.hash.code -case class PassRecord(vehicleId: String, locationId: String, timeStamp: Long) { - override def hashCode: Int = vehicleId.hashCode -} -// scalastyle:on equals.hash.code - -case class GetTrace(vehicleId: String) - -case class VehicleTrace(records: Array[PassRecord]) - -case class OverSpeedReport(vehicleId: String, speed: String, timestamp: Long, locationId: String) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/DataSource.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/DataSource.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/DataSource.scala deleted file mode 100644 index 33d7b54..0000000 --- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/DataSource.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.examples.transport - -import scala.concurrent.duration._ - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.examples.transport.generator.{MockCity, PassRecordGenerator} -import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} - -class DataSource(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { - import taskContext.{output, parallelism, scheduleOnce, taskId} - private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get - private val vehicleNum = conf.getInt(DataSource.VEHICLE_NUM).get / parallelism - private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get - private val mockCity = new MockCity(citySize) - private val recordGenerators: Array[PassRecordGenerator] = - PassRecordGenerator.create(vehicleNum, getIdentifier(taskId), mockCity, overdriveThreshold) - - override def onStart(startTime: StartTime): Unit = { - self ! Message("start", System.currentTimeMillis()) - } - - override def onNext(msg: Message): Unit = { - recordGenerators.foreach(generator => - output(Message(generator.getNextPassRecord(), System.currentTimeMillis()))) - scheduleOnce(1.second)(self ! 
Message("continue", System.currentTimeMillis())) - } - - private def getIdentifier(taskId: TaskId): String = { - // scalastyle:off non.ascii.character.disallowed - s"沪A${taskId.processorId}${taskId.index}" - // scalastyle:on non.ascii.character.disallowed - } -} - -object DataSource { - final val VEHICLE_NUM = "vehicle.number" - final val MOCK_CITY_SIZE = "mock.city.size" -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala deleted file mode 100644 index f9dbbde..0000000 --- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.gearpump.streaming.examples.transport - -import java.util.concurrent.TimeUnit -import scala.concurrent.Future -import scala.util.{Failure, Success} - -import akka.actor.Actor._ -import akka.actor.{Actor, ActorRefFactory, Props} -import akka.io.IO -import akka.pattern.ask -import spray.can.Http -import spray.http.StatusCodes -import spray.json._ -import spray.routing.{HttpService, Route} -import upickle.default.write - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.partitioner.PartitionerDescription -import io.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef} -import io.gearpump.streaming.examples.transport.QueryServer.{GetAllRecords, WebServer} -import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} -import io.gearpump.streaming.{DAG, ProcessorDescription, ProcessorId, StreamApplication} -import io.gearpump.util.Graph - -class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { - import system.dispatcher - import taskContext.appMaster - - var inspector: (ProcessorId, ProcessorDescription) = null - implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS) - private var overSpeedRecords = List.empty[OverSpeedReport] - - override def onStart(startTime: StartTime): Unit = { - val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]]( - StreamApplication.DAG).get) - inspector = dag.processors.find { kv => - val (_, processor) = kv - processor.taskClass == classOf[VelocityInspector].getName - }.get - taskContext.actorOf(Props(new WebServer)) - } - - override def onNext(msg: Message): Unit = { - } - - override def receiveUnManagedMessage: Receive = { - case getTrace@GetTrace(vehicleId: String) => - val parallism = inspector._2.parallelism - val processorId = inspector._1 - val analyzerTaskId = TaskId(processorId, (vehicleId.hashCode & Integer.MAX_VALUE) % parallism) - val requester = sender - (appMaster ? 
LookupTaskActorRef(analyzerTaskId)) - .asInstanceOf[Future[TaskActorRef]].flatMap { task => - (task.task ? getTrace).asInstanceOf[Future[VehicleTrace]] - }.map { trace => - LOG.info(s"reporting $trace") - requester ! trace - } - case record@OverSpeedReport(vehicleId, speed, timestamp, locationId) => - LOG.info(s"vehicle $vehicleId is over speed, the speed is $speed km/h") - overSpeedRecords :+= record - case GetAllRecords => - sender ! QueryServer.OverSpeedRecords(overSpeedRecords.toArray.sortBy(_.timestamp)) - overSpeedRecords = List.empty[OverSpeedReport] - case _ => - // Ignore - } -} - -object QueryServer { - object GetAllRecords - - case class OverSpeedRecords(records: Array[OverSpeedReport]) - - class WebServer extends Actor with HttpService { - - import context.dispatcher - implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS) - def actorRefFactory: ActorRefFactory = context - implicit val system = context.system - - IO(Http) ! Http.Bind(self, interface = "0.0.0.0", port = 8080) - - override def receive: Receive = runRoute(webServer ~ staticRoute) - - def webServer: Route = { - path("trace" / Segment) { vehicleId => - get { - onComplete((context.parent ? GetTrace(vehicleId)).asInstanceOf[Future[VehicleTrace]]) { - case Success(trace: VehicleTrace) => - val json = write(trace) - complete(pretty(json)) - case Failure(ex) => complete(StatusCodes.InternalServerError, - s"An error occurred: ${ex.getMessage}") - } - } - } ~ - path("records") { - get { - onComplete((context.parent ? 
GetAllRecords).asInstanceOf[Future[OverSpeedRecords]]) { - case Success(records: OverSpeedRecords) => - val json = write(records) - complete(pretty(json)) - case Failure(ex) => complete(StatusCodes.InternalServerError, - s"An error occurred: ${ex.getMessage}") - } - } - } - } - - val staticRoute = { - pathEndOrSingleSlash { - getFromResource("transport/transport.html") - } ~ - pathPrefix("css") { - get { - getFromResourceDirectory("transport/css") - } - } ~ - pathPrefix("svg") { - get { - getFromResourceDirectory("transport/svg") - } - } ~ - pathPrefix("js") { - get { - getFromResourceDirectory("transport/js") - } - } - } - - private def pretty(json: String): String = { - json.parseJson.prettyPrint - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Transport.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Transport.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Transport.scala deleted file mode 100644 index a795277..0000000 --- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Transport.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.examples.transport - -import io.gearpump.cluster.UserConfig -import io.gearpump.cluster.client.ClientContext -import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} -import io.gearpump.partitioner.HashPartitioner -import io.gearpump.streaming.{Processor, StreamApplication} -import io.gearpump.util.Graph._ -import io.gearpump.util.{AkkaApp, Graph} - -/** A city smart transportation streaming application */ -object Transport extends AkkaApp with ArgumentsParser { - override val options: Array[(String, CLIOption[Any])] = Array( - "source" -> CLIOption[Int]("<how many task to generate data>", required = false, - defaultValue = Some(10)), - "inspector" -> CLIOption[Int]("<how many over speed inspector>", required = false, - defaultValue = Some(4)), - "vehicle" -> CLIOption[Int]("<how many vehicles's to generate>", required = false, - defaultValue = Some(1000)), - "citysize" -> CLIOption[Int]("<the blocks number of the mock city>", required = false, - defaultValue = Some(10)), - "threshold" -> CLIOption[Int]("<overdrive threshold, km/h>", required = false, - defaultValue = Some(60))) - - def application(config: ParseResult): StreamApplication = { - val sourceNum = config.getInt("source") - val inspectorNum = config.getInt("inspector") - val vehicleNum = config.getInt("vehicle") - val citysize = config.getInt("citysize") - val threshold = config.getInt("threshold") - val source = Processor[DataSource](sourceNum) - val inspector = Processor[VelocityInspector](inspectorNum) - val queryServer = 
Processor[QueryServer](1) - val partitioner = new HashPartitioner - - val userConfig = UserConfig.empty.withInt(DataSource.VEHICLE_NUM, vehicleNum). - withInt(DataSource.MOCK_CITY_SIZE, citysize). - withInt(VelocityInspector.OVER_DRIVE_THRESHOLD, threshold). - withInt(VelocityInspector.FAKE_PLATE_THRESHOLD, 200) - StreamApplication("transport", Graph(source ~ partitioner ~> inspector, - Node(queryServer)), userConfig) - } - - override def main(akkaConf: Config, args: Array[String]): Unit = { - val config = parse(args) - val context = ClientContext(akkaConf) - implicit val system = context.system - context.submit(application(config)) - context.close() - } -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/VelocityInspector.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/VelocityInspector.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/VelocityInspector.scala deleted file mode 100644 index b9be8d7..0000000 --- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/VelocityInspector.scala +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.examples.transport - -import java.util.concurrent.TimeUnit -import scala.collection.immutable.Queue -import scala.collection.mutable -import scala.concurrent.Future - -import akka.actor.Actor._ -import akka.actor.ActorRef -import akka.pattern.ask - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.partitioner.PartitionerDescription -import io.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef} -import io.gearpump.streaming.examples.transport.generator.MockCity -import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} -import io.gearpump.streaming.{DAG, ProcessorDescription, StreamApplication} -import io.gearpump.util.Graph - -class VelocityInspector(taskContext: TaskContext, conf: UserConfig) - extends Task(taskContext, conf) { - - import system.dispatcher - import taskContext.appMaster - implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS) - private val passRecords = mutable.Map.empty[String, Queue[PassRecord]] - private val fakePlateThreshold = conf.getInt(VelocityInspector.FAKE_PLATE_THRESHOLD).get - private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get - private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get - private val mockCity = new MockCity(citySize) - private var queryServerActor: ActorRef = null - - override def onStart(startTime: StartTime): Unit = { - val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]]( - StreamApplication.DAG).get) - val 
queryServer = dag.processors.find { kv => - val (_, processor) = kv - processor.taskClass == classOf[QueryServer].getName - }.get - val queryServerTaskId = TaskId(queryServer._1, 0) - (appMaster ? LookupTaskActorRef(queryServerTaskId)).asInstanceOf[Future[TaskActorRef]] - .map { task => - queryServerActor = task.task - } - } - - import io.gearpump.streaming.examples.transport.VelocityInspector._ - override def onNext(msg: Message): Unit = { - msg.msg match { - case passRecord: PassRecord => - val records = passRecords.getOrElse(passRecord.vehicleId, Queue.empty[PassRecord]) - if (records.size > 0) { - val velocity = getVelocity(passRecord, records.last) - val formatted = "%.2f".format(velocity) - if (velocity > overdriveThreshold) { - if (velocity > fakePlateThreshold) { - LOG.info(s"vehicle ${passRecord.vehicleId} maybe a fake plate, " + - s"the speed is $formatted km/h") - } - if (queryServerActor != null) { - queryServerActor ! OverSpeedReport(passRecord.vehicleId, formatted, - passRecord.timeStamp, passRecord.locationId) - } - } - } - passRecords.update(passRecord.vehicleId, records.enqueueFinite(passRecord, RECORDS_NUM)) - } - } - - override def receiveUnManagedMessage: Receive = { - case GetTrace(vehicleId) => - val records = passRecords.getOrElse(vehicleId, Queue.empty[PassRecord]) - sender ! 
VehicleTrace(records.toArray.sortBy(_.timeStamp)) - } - - private def getVelocity(passRecord: PassRecord, lastPassRecord: PassRecord): Float = { - val distanceInKm = getDistance(lastPassRecord.locationId, passRecord.locationId) - val timeInHour = (passRecord.timeStamp - lastPassRecord.timeStamp).toFloat / (1000 * 60 * 60) - distanceInKm / timeInHour - } - - private def getDistance(location1: String, location2: String): Long = { - mockCity.getDistance(location1, location2) - } -} - -object VelocityInspector { - final val OVER_DRIVE_THRESHOLD = "overdrive.threshold" - final val FAKE_PLATE_THRESHOLD = "fakeplate.threshold" - final val RECORDS_NUM = 20 - - class FiniteQueue[T](q: Queue[T]) { - def enqueueFinite[B >: T](elem: B, maxSize: Int): Queue[B] = { - var result = q.enqueue(elem) - while (result.size > maxSize) { - result = result.dequeue._2 - } - result - } - } - - import scala.language.implicitConversions - - implicit def queue2FiniteQueue[T](q: Queue[T]): FiniteQueue[T] = new FiniteQueue[T](q) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/MockCity.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/MockCity.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/MockCity.scala deleted file mode 100644 index ff78679..0000000 --- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/MockCity.scala +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.examples.transport.generator - -import scala.util.Random - -import io.gearpump.streaming.examples.transport.generator.MockCity._ - -class MockCity(size: Int) { - private val random = new Random() - private val directions = Array(UP, DOWN, LEFT, RIGHT) - - def nextLocation(currentLocationId: String): String = { - val coordinate = idToCoordinate(currentLocationId) - val direction = directions(random.nextInt(4)) - val newCoordinate = coordinate.addOffset(direction) - if (inCity(newCoordinate)) { - coordinateToId(newCoordinate) - } else { - nextLocation(currentLocationId) - } - } - - def getDistance(locationId1: String, locationId2: String): Long = { - val coordinate1 = idToCoordinate(locationId1) - val coordinate2 = idToCoordinate(locationId2) - val blocks = Math.abs(coordinate1.row - coordinate2.row) + - Math.abs(coordinate1.column - coordinate2.column) - blocks * LENGTH_PER_BLOCK - } - - def randomLocationId(): String = { - val row = random.nextInt(size) - val column = random.nextInt(size) - coordinateToId(Coordinate(row, column)) - } - - private def coordinateToId(coordinate: Coordinate): String = { - s"Id_${coordinate.row}_${coordinate.column}" - } - - private def idToCoordinate(locationId: String): Coordinate = { - val attr = locationId.split("_") - val row = attr(1).toInt - val column = attr(2).toInt - Coordinate(row, column) - } - - private def 
inCity(coordinate: Coordinate): Boolean = { - coordinate.row >= 0 && - coordinate.row < size && - coordinate.column >= 0 && - coordinate.column < size - } -} - -object MockCity { - // The length of the mock city, km - final val LENGTH_PER_BLOCK = 5 - // The minimal speed, km/h - final val MINIMAL_SPEED = 10 - - final val UP = Coordinate(0, 1) - final val DOWN = Coordinate(0, -1) - final val LEFT = Coordinate(-1, 0) - final val RIGHT = Coordinate(1, 0) - - case class Coordinate(row: Int, column: Int) { - def addOffset(coordinate: Coordinate): Coordinate = { - Coordinate(this.row + coordinate.row, this.column + coordinate.column) - } - } -}
