http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
new file mode 100644
index 0000000..a488c9f
--- /dev/null
+++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.state.processor
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import akka.actor.ActorSystem
+import akka.testkit.TestProbe
+import com.twitter.algebird.AveragedValue
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.state.api.PersistentTask
+import 
org.apache.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, 
PersistentStateConfig, WindowConfig}
+import org.apache.gearpump.streaming.task.{ReportCheckpointClock, StartTime}
+import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
+
+/**
+ * Property-based spec for WindowAverageProcessor. For each generated (data, num) pair it
+ * feeds `num` messages carrying `data` into the processor, asserts the averaged state after
+ * every message, and checks which checkpoint clocks are reported to the AppMaster probe.
+ */
+class WindowAverageProcessorSpec extends PropSpec with PropertyChecks with Matchers {
+  property("WindowAverageProcessor should update state") {
+
+    implicit val system = ActorSystem("test")
+    // Terminate the actor system in `finally`: a failing assertion inside forAll throws, and
+    // without this the system (and its threads) would outlive the failed property.
+    try {
+      val longGen = Gen.chooseNum[Long](1, 1000)
+      forAll(longGen, longGen) {
+        (data: Long, num: Long) =>
+          val taskContext = MockUtil.mockTaskContext
+
+          // Window size, window step and checkpoint interval are all set to `num`.
+          val windowSize = num
+          val windowStep = num
+
+          val conf = UserConfig.empty
+            .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true)
+            .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, num)
+            .withValue[CheckpointStoreFactory](
+              PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY,
+              new InMemoryCheckpointStoreFactory)
+            .withValue(WindowConfig.NAME, WindowConfig(windowSize, windowStep))
+
+          val windowAverage = new WindowAverageProcessor(taskContext, conf)
+
+          val appMaster = TestProbe()(system)
+          when(taskContext.appMaster).thenReturn(appMaster.ref)
+
+          windowAverage.onStart(StartTime(0L))
+          appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, 0L))
+
+          // Every message carries the same value, so after i + 1 messages the state must hold
+          // a count of i + 1 and an average equal to `data`.
+          for (i <- 0L until num) {
+            windowAverage.onNext(Message("" + data, i))
+            windowAverage.state.get shouldBe Some(AveragedValue(i + 1, data))
+          }
+
+          // The next checkpoint time has not arrived yet
+          when(taskContext.upstreamMinClock).thenReturn(0L)
+          windowAverage.onNext(PersistentTask.CHECKPOINT)
+          appMaster.expectNoMsg(10.milliseconds)
+
+          // Time to checkpoint
+          when(taskContext.upstreamMinClock).thenReturn(num)
+          windowAverage.onNext(PersistentTask.CHECKPOINT)
+          appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, num))
+      }
+    } finally {
+      system.terminate()
+      Await.result(system.whenTerminated, Duration.Inf)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/stockcrawler/README.md
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/README.md b/examples/streaming/stockcrawler/README.md
index 4aea13d..b51590f 100644
--- a/examples/streaming/stockcrawler/README.md
+++ b/examples/streaming/stockcrawler/README.md
@@ -6,12 +6,12 @@ How to use
   ```
 2. Submit the stock crawler
   ```
-  bin\gear app -jar examples\gearpump-examples-assembly-0.3.2-SNAPSHOT.jar io.gearpump.streaming.examples.stock.main.Stock
+  bin\gear app -jar examples\gearpump-examples-assembly-0.3.2-SNAPSHOT.jar org.apache.gearpump.streaming.examples.stock.main.Stock
   ```
   
   If you are behind  a proxy, you need to set the proxy address
   ```
-  bin\gear app -jar examples\gearpump-examples-assembly-0.3.2-SNAPSHOT.jar io.gearpump.streaming.examples.stock.main.Stock -proxy host:port
+  bin\gear app -jar examples\gearpump-examples-assembly-0.3.2-SNAPSHOT.jar org.apache.gearpump.streaming.examples.stock.main.Stock -proxy host:port
   ```
   
 3. Check the UI

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/stockcrawler/src/main/resources/geardefault.conf
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/resources/geardefault.conf b/examples/streaming/stockcrawler/src/main/resources/geardefault.conf
index f557d66..acee3bd 100644
--- a/examples/streaming/stockcrawler/src/main/resources/geardefault.conf
+++ b/examples/streaming/stockcrawler/src/main/resources/geardefault.conf
@@ -1,6 +1,6 @@
 gearpump {
   serializers {
-    "io.gearpump.streaming.examples.stock.StockPrice" = ""
+    "org.apache.gearpump.streaming.examples.stock.StockPrice" = ""
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Analyzer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Analyzer.scala b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Analyzer.scala
deleted file mode 100644
index 1aea563..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Analyzer.scala
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.stock
-
-import scala.collection.immutable
-
-import akka.actor.Actor.Receive
-import org.joda.time.DateTime
-import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.examples.stock.Analyzer.HistoricalStates
-import io.gearpump.streaming.examples.stock.Price._
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-import io.gearpump.util.LogUtil
-
-/**
- * Drawdown analyzer
- * Definition: http://en.wikipedia.org/wiki/Drawdown_(economics)
- *
- * Keeps, per stock id, the latest [[StockPrice]] and the current [[StockPriceState]], records
- * every new state in an [[Analyzer.HistoricalStates]], and replies to [[GetReport]] requests
- * that arrive as unmanaged messages.
- */
-class Analyzer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-
-  val dateFormatter = DateTimeFormat forPattern "dd/MM/yyyy"
-
-  private var stocksToReport = immutable.Set.empty[String]
-  private var stockInfos = new immutable.HashMap[String, StockPrice]
-
-  private var currentDownwardsStates = new immutable.HashMap[String, StockPriceState]
-  private val historicalStates = new HistoricalStates()
-  private var latestTimeStamp: Long = 0L
-
-  override def onStart(startTime: StartTime): Unit = {
-    LOG.info("analyzer is started")
-  }
-
-  /** Folds one incoming [[StockPrice]] into the current and historical states. */
-  override def onNext(msg: Message): Unit = {
-    msg.msg match {
-      case stock: StockPrice =>
-        latestTimeStamp = stock.timestamp
-        // Must run before the state update: it drops yesterday's state on a day change.
-        checkDate(stock)
-        stockInfos = stockInfos + (stock.stockId -> stock)
-        val downwardsState = updateCurrentStates(stock)
-        val maxDrawdown = historicalStates.updatePresentMaximal(downwardsState)
-    }
-  }
-
-  /**
-   * Answers [[GetReport]]. With a non-null date the report carries only the historical state
-   * for that parsed date; with a null date it uses the start of the day of the latest seen
-   * timestamp and also includes the stock's current state.
-   */
-  override def receiveUnManagedMessage: Receive = {
-    case GetReport(stockId, date) =>
-      val requestedDate = Option(date)
-      val dateTime = requestedDate
-        .map(parseDate(dateFormatter, _))
-        .getOrElse(new DateTime(latestTimeStamp).withTimeAtStartOfDay)
-      val currentMax =
-        if (requestedDate.isEmpty) currentDownwardsStates.get(stockId) else None
-
-      val historyMax = Option(dateTime).flatMap(handleHistoricalQuery(stockId, _))
-      val name = stockInfos.get(stockId).map(_.name).getOrElse("")
-      sender ! Report(stockId, name, dateTime.toString, historyMax, currentMax)
-  }
-
-  // Computes the stock's new state, stores it under the stock id and returns it.
-  private def updateCurrentStates(stock: StockPrice) = {
-    val downwardsState = currentDownwardsStates.get(stock.stockId) match {
-      case Some(previous) => generateNewState(stock, previous)
-      case None => StockPriceState(stock.stockId, stock, stock, stock)
-    }
-    currentDownwardsStates = currentDownwardsStates + (stock.stockId -> downwardsState)
-    downwardsState
-  }
-
-  // Update the stock's latest state: a price above the old max restarts the state at that
-  // price; otherwise the max is kept and the min is lowered if needed.
-  private def generateNewState(currentPrice: Price, oldState: StockPriceState): StockPriceState = {
-    val exceedsOldMax = currentPrice.price > oldState.max.price
-    if (exceedsOldMax) {
-      StockPriceState(oldState.stockID, currentPrice, currentPrice, currentPrice)
-    } else {
-      StockPriceState(oldState.stockID, oldState.max,
-        Price.min(currentPrice, oldState.min), currentPrice)
-    }
-  }
-
-  // Forgets the stock's current state when the new price falls on a later day than the
-  // last recorded one.
-  private def checkDate(stock: StockPrice) = {
-    currentDownwardsStates.get(stock.stockId).foreach { state =>
-      val now = new DateTime(stock.timestamp)
-      val lastTime = new DateTime(state.current.timestamp)
-      val isNewDay = now.getDayOfYear > lastTime.getDayOfYear || now.getYear > lastTime.getYear
-      if (isNewDay) {
-        currentDownwardsStates -= stock.stockId
-      }
-    }
-  }
-
-  private def parseDate(format: DateTimeFormatter, input: String): DateTime =
-    format.parseDateTime(input)
-
-  private def handleHistoricalQuery(stockId: String, date: DateTime) =
-    historicalStates.getHistoricalMaximal(stockId, date)
-}
-
-object Analyzer {
-
-  /**
-   * Per-(stock id, day) record of the retained [[StockPriceState]]s. Two maps are kept;
-   * which one a new state goes to is decided by comparing its max price against
-   * Float.MinPositiveValue. Lookups only read the drawdown map.
-   */
-  class HistoricalStates {
-    val LOG = LogUtil.getLogger(getClass)
-    val dateFormatter = DateTimeFormat forPattern "dd/MM/yyyy"
-    private var historicalMaxRaise = new immutable.HashMap[(String, DateTime), StockPriceState]
-    private var historicalMaxDrawdown = new immutable.HashMap[(String, DateTime), StockPriceState]
-
-    /**
-     * Offers `newState` as the new maximal state for its stock and day.
-     *
-     * @return Some(newState) when it was stored, None when the stored state has a strictly
-     *         larger drawDown
-     */
-    def updatePresentMaximal(newState: StockPriceState): Option[StockPriceState] = {
-      val date = Analyzer.getDateFromTimeStamp(newState.current.timestamp)
-      val key = (newState.stockID, date)
-      if (newState.max.price < Float.MinPositiveValue) {
-        val accepted = generateNewMaximal(newState, date, historicalMaxRaise)
-        accepted.foreach { state =>
-          historicalMaxRaise = historicalMaxRaise + (key -> state)
-        }
-        accepted
-      } else {
-        val accepted = generateNewMaximal(newState, date, historicalMaxDrawdown)
-        accepted.foreach { state =>
-          historicalMaxDrawdown = historicalMaxDrawdown + (key -> state)
-        }
-        accepted
-      }
-    }
-
-    /** Returns the state stored in the drawdown map for the given stock and day, if any. */
-    def getHistoricalMaximal(stockId: String, date: DateTime): Option[StockPriceState] =
-      historicalMaxDrawdown.get((stockId, date))
-
-    // The candidate wins unless the stored state has a strictly larger drawDown.
-    private def generateNewMaximal(
-        state: StockPriceState,
-        date: DateTime,
-        map: immutable.HashMap[(String, DateTime), StockPriceState])
-      : Option[StockPriceState] = {
-      map.get((state.stockID, date)) match {
-        case Some(stored) if stored.drawDown > state.drawDown => None
-        case _ => Some(state)
-      }
-    }
-  }
-
-  /** Truncates a millisecond timestamp to the start of its day. */
-  def getDateFromTimeStamp(timestamp: Long): DateTime =
-    new DateTime(timestamp).withTimeAtStartOfDay()
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Crawler.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Crawler.scala b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Crawler.scala
deleted file mode 100644
index da5ab63..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Crawler.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.stock
-
-import scala.concurrent.duration._
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-
-/**
- * Task that periodically asks the configured [[StockMarket]] for the prices of its share of
- * the stock ids and emits each price as a message stamped with the price's timestamp.
- */
-class Crawler(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-
-  import taskContext._
-
-  val FetchStockPrice = Message("FetchStockPrice")
-
-  // This task's slice of the "StockId" array: slices are ceil(length / parallelism) long
-  // and are selected by the task index.
-  lazy val stocks = {
-    val stockIds = conf.getValue[Array[String]]("StockId").get
-    val quotient = stockIds.length / parallelism
-    val size = if (stockIds.length % parallelism == 0) quotient else quotient + 1
-
-    val start = taskId.index * size
-    stockIds.slice(start, start + size)
-  }
-
-  // Kick off the first fetch one second after construction.
-  scheduleOnce(1.seconds)(self ! FetchStockPrice)
-
-  val stockMarket = conf.getValue[StockMarket](classOf[StockMarket].getName).get
-
-  override def onStart(startTime: StartTime): Unit = {
-    // Nothing
-  }
-
-  /** Fetches and emits the current prices, then schedules the next fetch in five seconds. */
-  override def onNext(msg: Message): Unit = {
-    for (price <- stockMarket.getPrice(stocks)) {
-      output(new Message(price, price.timestamp))
-    }
-    scheduleOnce(5.seconds)(self ! FetchStockPrice)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Data.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Data.scala b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Data.scala
deleted file mode 100644
index 5525524..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Data.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.stock
-
-// scalastyle:off equals.hash.code  case class has equals defined
-/**
- * One quote for a stock. Every field except `timestamp` is carried as a string; `price` is
- * parsed to a Float only when the quote is converted to a [[Price]].
- */
-case class StockPrice(
-    stockId: String,
-    name: String,
-    price: String,
-    delta: String,
-    pecent: String,
-    volume: String,
-    money: String,
-    timestamp: Long) {
-  // Hash on the stock id only; equality is still the generated case-class equals.
-  override def hashCode: Int = {
-    stockId.hashCode
-  }
-}
-// scalastyle:on equals.hash.code  case class has equals defined
-
-/** A numeric price observed at `timestamp`. */
-case class Price(price: Float, timestamp: Long)
-
-object Price {
-
-  import scala.language.implicitConversions
-
-  /** Implicit view of a quote as a [[Price]]; throws if `stock.price` is not a valid Float. */
-  implicit def StockPriceToPrice(stock: StockPrice): Price =
-    Price(price = stock.price.toFloat, timestamp = stock.timestamp)
-
-  /** Returns `first` only when it is strictly cheaper; on a tie `second` is returned. */
-  def min(first: Price, second: Price): Price = {
-    val firstIsCheaper = first.price < second.price
-    if (firstIsCheaper) first else second
-  }
-}
-
-/** Max, min and current price of one stock, with the durations and drop derived from them. */
-case class StockPriceState(stockID: String, max: Price, min: Price, current: Price) {
-
-  /** Time from the max price to the min price. */
-  def drawDownPeriod: Long = {
-    min.timestamp - max.timestamp
-  }
-
-  /** Time from the min price to the current price. */
-  def recoveryPeriod: Long = {
-    current.timestamp - min.timestamp
-  }
-
-  /** Price drop from max to min. */
-  def drawDown: Float = {
-    max.price - min.price
-  }
-}
-
-/** Request for a report on `stockId`; `date` may be null. */
-case class GetReport(stockId: String, date: String)
-
-/** Reply to [[GetReport]]. */
-case class Report(
-    stockId: String,
-    name: String,
-    date: String,
-    historyMax: Option[StockPriceState],
-    currentMax: Option[StockPriceState])

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/QueryServer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/QueryServer.scala b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/QueryServer.scala
deleted file mode 100644
index 33db128..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/QueryServer.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.stock
-
-import java.util.concurrent.TimeUnit
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Failure, Success}
-
-import akka.actor.Actor._
-import akka.actor.{Actor, ActorRefFactory, Props}
-import akka.io.IO
-import akka.pattern.ask
-import spray.can.Http
-import spray.http.StatusCodes
-import spray.json._
-import spray.routing.{HttpService, Route}
-import upickle.default.write
-
-import io.gearpump.Message
-import io.gearpump.cluster.MasterToAppMaster.AppMasterDataDetailRequest
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.ProcessorId
-import io.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, 
TaskActorRef}
-import io.gearpump.streaming.appmaster.{ProcessorSummary, 
StreamAppMasterSummary}
-import io.gearpump.streaming.examples.stock.QueryServer.WebServer
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-
-/**
- * Task that starts the embedded [[QueryServer.WebServer]] and forwards [[GetReport]]
- * requests to the Analyzer task responsible for the requested stock id.
- */
-class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-  import scala.concurrent.ExecutionContext.Implicits.global
-
-  import taskContext.{appId, appMaster}
-
-  // (processor id, summary) of the Analyzer processor; null until the AppMaster has replied.
-  var analyzer: (ProcessorId, ProcessorSummary) = null
-  implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
-
-  /** Requests the application detail from the AppMaster and starts the web server actor. */
-  override def onStart(startTime: StartTime): Unit = {
-    appMaster ! AppMasterDataDetailRequest(appId)
-    taskContext.actorOf(Props(new WebServer))
-  }
-
-  override def onNext(msg: Message): Unit = {
-    // Skip
-  }
-
-  override def receiveUnManagedMessage: Receive = messageHandler
-
-  def messageHandler: Receive = {
-    case detail: StreamAppMasterSummary =>
-      // Remember the processor whose task class is Analyzer.
-      val analyzerClass = classOf[Analyzer].getName
-      analyzer = detail.processors.find {
-        case (_, processor) => processor.taskClass == analyzerClass
-      }.get
-    case getReport@GetReport(stockId, date) =>
-      val (processorId, summary) = analyzer
-      // Non-negative hash of the stock id, modulo the Analyzer parallelism, picks the task.
-      val taskIndex = (stockId.hashCode & Integer.MAX_VALUE) % summary.parallelism
-      val analyzerTaskId = TaskId(processorId, taskIndex)
-      // Capture the sender now; it must not be read from inside the future callbacks.
-      val requester = sender
-      val lookup =
-        (appMaster ? LookupTaskActorRef(analyzerTaskId)).asInstanceOf[Future[TaskActorRef]]
-      val pendingReport = lookup.flatMap { ref =>
-        (ref.task ? getReport).asInstanceOf[Future[Report]]
-      }
-      pendingReport.map { report =>
-        LOG.info(s"reporting $report")
-        requester ! report
-      }
-    case _ =>
-    // Ignore
-  }
-}
-
-object QueryServer {
-  /**
-   * Actor that serves the stock report over HTTP. On construction it asks the spray-can
-   * HTTP extension to bind it to localhost:8080, and it handles requests with the combined
-   * `webServer` and `staticRoute` routes.
-   */
-  class WebServer extends Actor with HttpService {
-
-    import context.dispatcher
-    // Timeout applied to the ask (`?`) in `webServer`.
-    implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
-    def actorRefFactory: ActorRefFactory = context
-    implicit val system = context.system
-
-    // Runs in the constructor: the listener is bound as soon as the actor is created.
-    IO(Http) ! Http.Bind(self, interface = "localhost", port = 8080)
-
-    override def receive: Receive = runRoute(webServer ~ staticRoute)
-
-    /**
-     * GET report/<stockId>: asks the parent actor for a report with a null date and
-     * completes with the pretty-printed JSON of the reply, or with a 500 status and the
-     * exception message when the ask fails.
-     * NOTE(review): the parent is presumably the QueryServer task that created this actor -
-     * confirm.
-     */
-    def webServer: Route = {
-      path("report" / Segment) { stockId =>
-        get {
-          onComplete((context.parent ? GetReport(stockId, 
null)).asInstanceOf[Future[Report]]) {
-            case Success(report: Report) =>
-              val json = write(report)
-              complete(pretty(json))
-            case Failure(ex) => complete(StatusCodes.InternalServerError,
-              s"An error occurred: ${ex.getMessage}")
-          }
-        }
-      }
-    }
-
-    // Static content from classpath resources: the root path serves stock/stock.html, and
-    // the css/ and js/ prefixes serve the stock/css and stock/js resource directories.
-    val staticRoute = {
-      pathEndOrSingleSlash {
-        getFromResource("stock/stock.html")
-      } ~
-      pathPrefix("css") {
-        get {
-          getFromResourceDirectory("stock/css")
-        }
-      } ~
-      pathPrefix("js") {
-        get {
-          getFromResourceDirectory("stock/js")
-        }
-      }
-    }
-
-    // Re-parses the JSON string and renders it in pretty-printed form.
-    private def pretty(json: String): String = {
-      json.parseJson.prettyPrint
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/StockMarket.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/StockMarket.scala b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/StockMarket.scala
deleted file mode 100644
index 508c282..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/StockMarket.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.stock
-
-import java.nio.charset.Charset
-import scala.io.Codec
-
-import org.apache.commons.httpclient.methods.GetMethod
-import org.apache.commons.httpclient.{HttpClient, 
MultiThreadedHttpConnectionManager}
-import org.htmlcleaner.{HtmlCleaner, TagNode}
-import org.joda.time.{DateTime, DateTimeZone}
-
-import io.gearpump.streaming.examples.stock.StockMarket.ServiceHour
-import io.gearpump.transport.HostPort
-import io.gearpump.util.LogUtil
-
-/**
- * HTTP access to the two stock data sites used by the crawler example: one page listing
- * stock ids and one endpoint returning current prices.
- *
- * The instance is Serializable; the HTTP client and its connection manager are @transient
- * and are created lazily on first use.
- *
- * @param service decides whether prices are fetched at the current time
- * @param proxy HTTP proxy to use; null means a direct connection
- */
-class StockMarket(service: ServiceHour, proxy: HostPort = null) extends Serializable {
-
-  // A def, so no logger instance is stored as a field of this Serializable class.
-  private def LOG = LogUtil.getLogger(getClass)
-
-  @transient
-  private var connectionManager: MultiThreadedHttpConnectionManager = null
-
-  private val eastMoneyStockPage = "http://quote.eastmoney.com/stocklist.html"
-
-  // One price line: var hq_str_s_<id>="<name>,<price>,<delta>,<percent>,<volume>,<money>";
-  private val stockPriceParser =
-    """^var\shq_str_s_([a-z0-9A-Z]+)="([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+)";$""".r
-
-  /**
-   * Shuts down the connection manager, if one was created. Both references are cleared so
-   * that a later request builds a fresh client instead of reusing a shut-down manager.
-   */
-  def shutdown(): Unit = {
-    Option(connectionManager).foreach(_.shutdown())
-    connectionManager = null
-    _client = null
-  }
-
-  @transient
-  private var _client: HttpClient = null
-
-  // Lazily creates the shared client. The manager is stored in the field (not a local) so
-  // that shutdown() can reach it.
-  private def client: HttpClient = {
-    if (_client == null) {
-      connectionManager = new MultiThreadedHttpConnectionManager()
-      val created = new HttpClient(connectionManager)
-      Option(proxy).foreach(p => created.getHostConfiguration().setProxy(p.host, p.port))
-      _client = created
-    }
-    _client
-  }
-
-  /**
-   * Fetches the current prices of the given stock ids.
-   *
-   * @param stocks stock ids; each is sent with an "s_" prefix
-   * @return one StockPrice per response line that matches the expected format, all stamped
-   *         with the time the response arrived; empty when `service.inService` is false
-   */
-  def getPrice(stocks: Array[String]): Array[StockPrice] = {
-
-    LOG.info(s"getPrice 1")
-
-    val query = "http://hq.sinajs.cn/list=" + stocks.map("s_" + _).mkString(",")
-    if (service.inService) {
-
-      LOG.info(s"getPrice 2")
-
-      val get = new GetMethod(query)
-      try {
-        client.executeMethod(get)
-        val current = System.currentTimeMillis()
-
-        // The body is decoded as GBK and fully read (toArray) before the connection is
-        // released in `finally`.
-        val output = scala.io.Source.fromInputStream(get.getResponseBodyAsStream)(
-          new Codec(Charset forName "GBK")).getLines().collect {
-          case stockPriceParser(stockId, name, price, delta, pecent, volume, money) =>
-            StockPrice(stockId, name, price, delta, pecent, volume, money, current)
-        }.toArray
-
-        LOG.info(s"getPrice 3 ${output.length}")
-
-        output
-      } finally {
-        // Hand the pooled connection back even when the request or the parsing fails.
-        get.releaseConnection()
-      }
-    } else {
-      Array.empty[StockPrice]
-    }
-  }
-
-  private val urlPattern = """^.*/([a-zA-Z0-9]+)\.html$""".r
-
-  /**
-   * Downloads the stock list page and extracts a stock id from each link under the
-   * `quotesearch` div.
-   *
-   * @return one entry per link, in page order; the entry is null when the link's href does
-   *         not end in "/<id>.html"
-   */
-  def getStockIdList: Array[String] = {
-    val get = new GetMethod(eastMoneyStockPage)
-    try {
-      client.executeMethod(get)
-
-      val root = new HtmlCleaner().clean(get.getResponseBodyAsStream)
-      val stockUrls = root.evaluateXPath("//div[@id='quotesearch']//li//a[@href]")
-
-      stockUrls
-        .map(_.asInstanceOf[TagNode].getAttributeByName("href"))
-        .map {
-          case urlPattern(code) => code
-          case _ => null
-        }
-    } finally {
-      get.releaseConnection()
-    }
-  }
-}
-
-object StockMarket {
-
-  /**
-   * Decides whether the market is open. Opening hours are wall-clock times at a fixed
-   * GMT+8 offset, so the result does not depend on the JVM's default time zone.
-   *
-   * @param all when true, `inService` is always true
-   */
-  class ServiceHour(all: Boolean) extends Serializable {
-
-    // A def rather than a field, so the serialized form still holds only the four longs.
-    private def marketZone: DateTimeZone = DateTimeZone.forOffsetHours(8)
-
-    // Millis of hour:minute GMT+8 on the reference date 0000-01-01 used for comparisons.
-    private def marketTime(hour: Int, minute: Int): Long =
-      new DateTime(0, 1, 1, hour, minute, marketZone).getMillis
-
-    /**
-     * Morning opening: 9:30 am - 11:30 am
-     */
-    val morningStart: Long = marketTime(9, 30)
-    val morningEnd: Long = marketTime(11, 30)
-
-    /**
-     * Afternoon opening: 13:00 - 15:00
-     */
-    val afternoonStart: Long = marketTime(13, 0)
-    val afternoonEnd: Long = marketTime(15, 0)
-
-    /** True when `all` is set or the current GMT+8 time lies in an opening window (inclusive). */
-    def inService: Boolean = {
-      all || {
-        // Move "now" onto the reference date so that only the time of day is compared.
-        val now = DateTime.now(marketZone).withDate(0, 1, 1).getMillis
-        val inMorning = now >= morningStart && now <= morningEnd
-        val inAfternoon = now >= afternoonStart && now <= afternoonEnd
-        inMorning || inAfternoon
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/main/Stock.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/main/Stock.scala b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/main/Stock.scala
deleted file mode 100644
index 638dc4e..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/main/Stock.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.stock.main
-
-import akka.actor.ActorSystem
-import org.slf4j.Logger
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import io.gearpump.partitioner.HashPartitioner
-import io.gearpump.streaming.examples.stock.StockMarket.ServiceHour
-import io.gearpump.streaming.examples.stock.{Analyzer, Crawler, QueryServer, 
StockMarket}
-import io.gearpump.streaming.{Processor, StreamApplication}
-import io.gearpump.transport.HostPort
-import io.gearpump.util.Graph.Node
-import io.gearpump.util.{AkkaApp, Graph, LogUtil}
-
-/** Tracks the China's stock market index change */
-object Stock extends AkkaApp with ArgumentsParser {
-
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "crawler" -> CLIOption[Int]("<how many fetcher to get data from remote>",
-      required = false, defaultValue = Some(10)),
-    "analyzer" -> CLIOption[Int]("<parallism of analyzer>",
-      required = false, defaultValue = Some(1)),
-    "proxy" -> CLIOption[String]("proxy setting host:port, for example: 
127.0.0.1:8443",
-      required = false, defaultValue = Some("")))
-
-  def crawler(config: ParseResult)(implicit system: ActorSystem): 
StreamApplication = {
-    val crawler = Processor[Crawler](config.getInt("crawler"))
-    val analyzer = Processor[Analyzer](config.getInt("analyzer"))
-    val queryServer = Processor[QueryServer](1)
-
-    val proxySetting = config.getString("proxy")
-    val proxy = if (proxySetting.isEmpty) {
-      null
-    } else HostPort(proxySetting)
-    val stockMarket = new StockMarket(new ServiceHour(true), proxy)
-    val stocks = stockMarket.getStockIdList
-
-    // scalastyle:off println
-    Console.println(s"Successfully fetched stock id for ${stocks.length} 
stocks")
-    // scalastyle:on println
-
-    val userConfig = UserConfig.empty.withValue("StockId", stocks)
-      .withValue[StockMarket](classOf[StockMarket].getName, stockMarket)
-    val partitioner = new HashPartitioner
-
-    val p1 = crawler ~ partitioner ~> analyzer
-    val p2 = Node(queryServer)
-    val graph = Graph(p1, p2)
-    val app = StreamApplication("stock_direct_analyzer", graph, userConfig
-    )
-    app
-  }
-
-  override def main(akkaConf: Config, args: Array[String]): Unit = {
-    val config = parse(args)
-    val context = ClientContext(akkaConf)
-
-    implicit val system = context.system
-
-    val app = crawler(config)
-    val appId = context.submit(app)
-    context.close()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Analyzer.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Analyzer.scala
 
b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Analyzer.scala
new file mode 100644
index 0000000..f6fdff2
--- /dev/null
+++ 
b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Analyzer.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.stock
+
+import scala.collection.immutable
+
+import akka.actor.Actor.Receive
+import org.joda.time.DateTime
+import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.examples.stock.Analyzer.HistoricalStates
+import org.apache.gearpump.streaming.examples.stock.Price._
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.util.LogUtil
+
+/**
+ * Drawdown analyzer
+ * Definition: http://en.wikipedia.org/wiki/Drawdown_(economics)
+ */
+class Analyzer(taskContext: TaskContext, conf: UserConfig) extends 
Task(taskContext, conf) {
+
+  val dateFormatter = DateTimeFormat forPattern "dd/MM/yyyy"
+
+  private var stocksToReport = immutable.Set.empty[String]
+  private var stockInfos = new immutable.HashMap[String, StockPrice]
+
+  private var currentDownwardsStates = new immutable.HashMap[String, 
StockPriceState]
+  private val historicalStates = new HistoricalStates()
+  private var latestTimeStamp: Long = 0L
+
+  override def onStart(startTime: StartTime): Unit = {
+    LOG.info("analyzer is started")
+  }
+
+  override def onNext(msg: Message): Unit = {
+    msg.msg match {
+      case stock: StockPrice =>
+        latestTimeStamp = stock.timestamp
+        checkDate(stock)
+        stockInfos += stock.stockId -> stock
+        val downwardsState = updateCurrentStates(stock)
+        val maxDrawdown = historicalStates.updatePresentMaximal(downwardsState)
+    }
+  }
+
+  override def receiveUnManagedMessage: Receive = {
+    case get@GetReport(stockId, date) =>
+      var currentMax = currentDownwardsStates.get(stockId)
+
+      val dateTime = Option(date) match {
+        case Some(date) =>
+          currentMax = None
+          parseDate(dateFormatter, date)
+        case None =>
+          new DateTime(latestTimeStamp).withTimeAtStartOfDay
+      }
+
+      val historyMax = Option(dateTime).flatMap(handleHistoricalQuery(stockId, 
_))
+      val name = stockInfos.get(stockId).map(_.name).getOrElse("")
+      sender ! Report(stockId, name, dateTime.toString, historyMax, currentMax)
+  }
+
+  private def updateCurrentStates(stock: StockPrice) = {
+    var downwardsState: StockPriceState = null
+    if (currentDownwardsStates.contains(stock.stockId)) {
+      downwardsState = generateNewState(stock, 
currentDownwardsStates.get(stock.stockId).get)
+    } else {
+      downwardsState = StockPriceState(stock.stockId, stock, stock, stock)
+    }
+    currentDownwardsStates += stock.stockId -> downwardsState
+    downwardsState
+  }
+
+  // Update the stock's latest state.
+  private def generateNewState(currentPrice: Price, oldState: 
StockPriceState): StockPriceState = {
+    if (currentPrice.price > oldState.max.price) {
+      StockPriceState(oldState.stockID, currentPrice, currentPrice, 
currentPrice)
+    } else {
+      val newState = StockPriceState(oldState.stockID, oldState.max,
+        Price.min(currentPrice, oldState.min), currentPrice)
+      newState
+    }
+  }
+
+  private def checkDate(stock: StockPrice) = {
+    if (currentDownwardsStates.contains(stock.stockId)) {
+      val now = new DateTime(stock.timestamp)
+      val lastTime = new 
DateTime(currentDownwardsStates.get(stock.stockId).get.current.timestamp)
+      // New day
+      if (now.getDayOfYear > lastTime.getDayOfYear || now.getYear > 
lastTime.getYear) {
+        currentDownwardsStates -= stock.stockId
+      }
+    }
+  }
+
+  private def parseDate(format: DateTimeFormatter, input: String): DateTime = {
+    format.parseDateTime(input)
+  }
+
+  private def handleHistoricalQuery(stockId: String, date: DateTime) = {
+    val maximal = historicalStates.getHistoricalMaximal(stockId, date)
+    maximal
+  }
+}
+
+object Analyzer {
+
+  class HistoricalStates {
+    val LOG = LogUtil.getLogger(getClass)
+    val dateFormatter = DateTimeFormat forPattern "dd/MM/yyyy"
+    private var historicalMaxRaise = new immutable.HashMap[(String, DateTime), 
StockPriceState]
+    private var historicalMaxDrawdown = new immutable.HashMap[(String, 
DateTime), StockPriceState]
+
+    def updatePresentMaximal(newState: StockPriceState): 
Option[StockPriceState] = {
+      val date = Analyzer.getDateFromTimeStamp(newState.current.timestamp)
+      var newMaximalState: Option[StockPriceState] = null
+      if (newState.max.price < Float.MinPositiveValue) {
+        newMaximalState = generateNewMaximal(newState, date, 
historicalMaxRaise)
+        if (newMaximalState.nonEmpty) {
+          historicalMaxRaise += (newState.stockID, date) -> newMaximalState.get
+        }
+      } else {
+        newMaximalState = generateNewMaximal(newState, date, 
historicalMaxDrawdown)
+        if (newMaximalState.nonEmpty) {
+          historicalMaxDrawdown += (newState.stockID, date) -> 
newMaximalState.get
+        }
+      }
+      newMaximalState
+    }
+
+    def getHistoricalMaximal(stockId: String, date: DateTime): 
Option[StockPriceState] = {
+      historicalMaxDrawdown.get((stockId, date))
+    }
+
+    private def generateNewMaximal(
+        state: StockPriceState,
+        date: DateTime,
+        map: immutable.HashMap[(String, DateTime), StockPriceState])
+      : Option[StockPriceState] = {
+      val maximal = map.get((state.stockID, date))
+      if (maximal.nonEmpty && maximal.get.drawDown > state.drawDown) {
+        None
+      } else {
+        Some(state)
+      }
+    }
+  }
+
+  def getDateFromTimeStamp(timestamp: Long): DateTime = {
+    new DateTime(timestamp).withTimeAtStartOfDay()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Crawler.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Crawler.scala
 
b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Crawler.scala
new file mode 100644
index 0000000..bb444dd
--- /dev/null
+++ 
b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Crawler.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.stock
+
+import scala.concurrent.duration._
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
+class Crawler(taskContext: TaskContext, conf: UserConfig) extends 
Task(taskContext, conf) {
+
+  import taskContext._
+
+  val FetchStockPrice = Message("FetchStockPrice")
+
+  lazy val stocks = {
+    val stockIds = conf.getValue[Array[String]]("StockId").get
+    val size = if (stockIds.length % parallelism > 0) {
+      stockIds.length / parallelism + 1
+    } else {
+      stockIds.length / parallelism
+    }
+
+    val start = taskId.index * size
+    val end = (taskId.index + 1) * size
+    stockIds.slice(start, end)
+  }
+
+  scheduleOnce(1.seconds)(self ! FetchStockPrice)
+
+  val stockMarket = 
conf.getValue[StockMarket](classOf[StockMarket].getName).get
+
+  override def onStart(startTime: StartTime): Unit = {
+    // Nothing
+  }
+
+  override def onNext(msg: Message): Unit = {
+    stockMarket.getPrice(stocks).foreach { price =>
+      output(new Message(price, price.timestamp))
+    }
+    scheduleOnce(5.seconds)(self ! FetchStockPrice)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Data.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Data.scala
 
b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Data.scala
new file mode 100644
index 0000000..94a85ff
--- /dev/null
+++ 
b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Data.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.stock
+
+// scalastyle:off equals.hash.code  case class has equals defined
+case class StockPrice(
+    stockId: String, name: String, price: String, delta: String, pecent: 
String, volume: String,
+    money: String, timestamp: Long) {
+  override def hashCode: Int = stockId.hashCode
+}
+// scalastyle:on equals.hash.code  case class has equals defined
+
+case class Price(price: Float, timestamp: Long)
+
+object Price {
+
+  import scala.language.implicitConversions
+
+  implicit def StockPriceToPrice(stock: StockPrice): Price = {
+    Price(stock.price.toFloat, stock.timestamp)
+  }
+
+  def min(first: Price, second: Price): Price = {
+    if (first.price < second.price) {
+      first
+    } else {
+      second
+    }
+  }
+}
+
+case class StockPriceState(stockID: String, max: Price, min: Price, current: 
Price) {
+
+  def drawDownPeriod: Long = min.timestamp - max.timestamp
+
+  def recoveryPeriod: Long = current.timestamp - min.timestamp
+
+  def drawDown: Float = max.price - min.price
+}
+
+case class GetReport(stockId: String, date: String)
+
+case class Report(
+    stockId: String, name: String, date: String, historyMax: 
Option[StockPriceState],
+    currentMax: Option[StockPriceState])

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/QueryServer.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/QueryServer.scala
 
b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/QueryServer.scala
new file mode 100644
index 0000000..01ccb3e
--- /dev/null
+++ 
b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/QueryServer.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.stock
+
+import java.util.concurrent.TimeUnit
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+import akka.actor.Actor._
+import akka.actor.{Actor, ActorRefFactory, Props}
+import akka.io.IO
+import akka.pattern.ask
+import spray.can.Http
+import spray.http.StatusCodes
+import spray.json._
+import spray.routing.{HttpService, Route}
+import upickle.default.write
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterDataDetailRequest
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.ProcessorId
+import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, 
TaskActorRef}
+import org.apache.gearpump.streaming.appmaster.{ProcessorSummary, 
StreamAppMasterSummary}
+import org.apache.gearpump.streaming.examples.stock.QueryServer.WebServer
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, 
TaskId}
+
+class QueryServer(taskContext: TaskContext, conf: UserConfig) extends 
Task(taskContext, conf) {
+  import scala.concurrent.ExecutionContext.Implicits.global
+
+  import taskContext.{appId, appMaster}
+
+  var analyzer: (ProcessorId, ProcessorSummary) = null
+  implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
+
+  override def onStart(startTime: StartTime): Unit = {
+    appMaster ! AppMasterDataDetailRequest(appId)
+    taskContext.actorOf(Props(new WebServer))
+  }
+
+  override def onNext(msg: Message): Unit = {
+    // Skip
+  }
+
+  override def receiveUnManagedMessage: Receive = messageHandler
+
+  def messageHandler: Receive = {
+    case detail: StreamAppMasterSummary =>
+      analyzer = detail.processors.find { kv =>
+        val (processorId, processor) = kv
+        processor.taskClass == classOf[Analyzer].getName
+      }.get
+    case getReport@GetReport(stockId, date) =>
+      val parallism = analyzer._2.parallelism
+      val processorId = analyzer._1
+      val analyzerTaskId = TaskId(processorId, (stockId.hashCode & 
Integer.MAX_VALUE) % parallism)
+      val requester = sender
+      import scala.concurrent.Future
+      (appMaster ? LookupTaskActorRef(analyzerTaskId))
+        .asInstanceOf[Future[TaskActorRef]].flatMap { task =>
+
+        (task.task ? getReport).asInstanceOf[Future[Report]]
+      }.map { report =>
+        LOG.info(s"reporting $report")
+        requester ! report
+      }
+    case _ =>
+    // Ignore
+  }
+}
+
+object QueryServer {
+  class WebServer extends Actor with HttpService {
+
+    import context.dispatcher
+    implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
+    def actorRefFactory: ActorRefFactory = context
+    implicit val system = context.system
+
+    IO(Http) ! Http.Bind(self, interface = "localhost", port = 8080)
+
+    override def receive: Receive = runRoute(webServer ~ staticRoute)
+
+    def webServer: Route = {
+      path("report" / Segment) { stockId =>
+        get {
+          onComplete((context.parent ? GetReport(stockId, 
null)).asInstanceOf[Future[Report]]) {
+            case Success(report: Report) =>
+              val json = write(report)
+              complete(pretty(json))
+            case Failure(ex) => complete(StatusCodes.InternalServerError,
+              s"An error occurred: ${ex.getMessage}")
+          }
+        }
+      }
+    }
+
+    val staticRoute = {
+      pathEndOrSingleSlash {
+        getFromResource("stock/stock.html")
+      } ~
+      pathPrefix("css") {
+        get {
+          getFromResourceDirectory("stock/css")
+        }
+      } ~
+      pathPrefix("js") {
+        get {
+          getFromResourceDirectory("stock/js")
+        }
+      }
+    }
+
+    private def pretty(json: String): String = {
+      json.parseJson.prettyPrint
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/StockMarket.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/StockMarket.scala
 
b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/StockMarket.scala
new file mode 100644
index 0000000..24e050b
--- /dev/null
+++ 
b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/StockMarket.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.stock
+
+import java.nio.charset.Charset
+import scala.io.Codec
+
+import org.apache.commons.httpclient.methods.GetMethod
+import org.apache.commons.httpclient.{HttpClient, 
MultiThreadedHttpConnectionManager}
+import org.htmlcleaner.{HtmlCleaner, TagNode}
+import org.joda.time.{DateTime, DateTimeZone}
+
+import org.apache.gearpump.streaming.examples.stock.StockMarket.ServiceHour
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.LogUtil
+
+class StockMarket(service: ServiceHour, proxy: HostPort = null) extends 
Serializable {
+
+  private def LOG = LogUtil.getLogger(getClass)
+
+  @transient
+  private var connectionManager: MultiThreadedHttpConnectionManager = null
+
+  private val eastMoneyStockPage = "http://quote.eastmoney.com/stocklist.html";
+
+  private val stockPriceParser =
+    
"""^var\shq_str_s_([a-z0-9A-Z]+)="([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+)";$""".r
+
+  def shutdown(): Unit = {
+    Option(connectionManager).map(_.shutdown())
+  }
+
+  @transient
+  private var _client: HttpClient = null
+
+  private def client: HttpClient = {
+    _client = Option(_client).getOrElse {
+      val connectionManager = new MultiThreadedHttpConnectionManager()
+      val client = new HttpClient(connectionManager)
+      Option(proxy).map(host => 
client.getHostConfiguration().setProxy(host.host, host.port))
+      client
+    }
+    _client
+  }
+
+  def getPrice(stocks: Array[String]): Array[StockPrice] = {
+
+    LOG.info(s"getPrice 1")
+
+    val query = "http://hq.sinajs.cn/list="; + stocks.map("s_" + 
_).mkString(",")
+    if (service.inService) {
+
+      LOG.info(s"getPrice 2")
+
+      val get = new GetMethod(query)
+      client.executeMethod(get)
+      val current = System.currentTimeMillis()
+
+      val output = 
scala.io.Source.fromInputStream(get.getResponseBodyAsStream)(
+        new Codec(Charset forName "GBK")).getLines().flatMap { line =>
+        line match {
+          case stockPriceParser(stockId, name, price, delta, pecent, volume, 
money) =>
+            Some(StockPrice(stockId, name, price, delta, pecent, volume, 
money, current))
+          case _ =>
+            None
+        }
+      }.toArray
+
+      LOG.info(s"getPrice 3 ${output.length}")
+
+      output
+    } else {
+      Array.empty[StockPrice]
+    }
+  }
+
+  private val urlPattern = """^.*/([a-zA-Z0-9]+)\.html$""".r
+
+  def getStockIdList: Array[String] = {
+    val cleaner = new HtmlCleaner
+    val props = cleaner.getProperties
+
+    val get = new GetMethod(eastMoneyStockPage)
+    client.executeMethod(get)
+
+    val root = cleaner.clean(get.getResponseBodyAsStream)
+
+    val stockUrls = 
root.evaluateXPath("//div[@id='quotesearch']//li//a[@href]")
+
+    val elements = root.getElementsByName("a", true)
+
+    val hrefs = (0 until stockUrls.length)
+      .map(stockUrls(_).asInstanceOf[TagNode].getAttributeByName("href"))
+      .map { url =>
+        url match {
+          case urlPattern(code) => code
+          case _ => null
+        }
+      }.toArray
+    hrefs
+  }
+}
+
+object StockMarket {
+
+  class ServiceHour(all: Boolean) extends Serializable {
+
+    /**
+     * Morning opening: 9:30 am - 11:30 am
+     */
+    val morningStart = GMT8(new DateTime(0, 1, 1, 9, 30)).getMillis
+    val morningEnd = GMT8(new DateTime(0, 1, 1, 11, 30)).getMillis
+
+    /**
+     * Afternoon opening: 1:00 pm - 3:00 pm
+     */
+    val afternoonStart = GMT8(new DateTime(0, 1, 1, 13, 0)).getMillis
+    val afternoonEnd = GMT8(new DateTime(0, 1, 1, 15, 0)).getMillis
+
+    def inService: Boolean = {
+
+      if (all) {
+        true
+      } else {
+        val now = GMT8(DateTime.now()).withDate(0, 1, 1).getMillis
+        if (now >= morningStart && now <= morningEnd ||
+          now >= afternoonStart && now <= afternoonEnd) {
+          true
+        } else {
+          false
+        }
+      }
+    }
+
+    private def GMT8(time: DateTime): DateTime = {
+      time.withZone(DateTimeZone.UTC).plusHours(8)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/main/Stock.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/main/Stock.scala
 
b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/main/Stock.scala
new file mode 100644
index 0000000..6d17c20
--- /dev/null
+++ 
b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/main/Stock.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.stock.main
+
+import akka.actor.ActorSystem
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, 
ParseResult}
+import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.examples.stock.StockMarket.ServiceHour
+import org.apache.gearpump.streaming.examples.stock.{Analyzer, Crawler, 
QueryServer, StockMarket}
+import org.apache.gearpump.streaming.{Processor, StreamApplication}
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.Graph.Node
+import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil}
+
+/** Tracks changes in China's stock market index */
+object Stock extends AkkaApp with ArgumentsParser {
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "crawler" -> CLIOption[Int]("<how many fetcher to get data from remote>",
+      required = false, defaultValue = Some(10)),
+    "analyzer" -> CLIOption[Int]("<parallism of analyzer>",
+      required = false, defaultValue = Some(1)),
+    "proxy" -> CLIOption[String]("proxy setting host:port, for example: 
127.0.0.1:8443",
+      required = false, defaultValue = Some("")))
+
+  def crawler(config: ParseResult)(implicit system: ActorSystem): 
StreamApplication = {
+    val crawler = Processor[Crawler](config.getInt("crawler"))
+    val analyzer = Processor[Analyzer](config.getInt("analyzer"))
+    val queryServer = Processor[QueryServer](1)
+
+    val proxySetting = config.getString("proxy")
+    val proxy = if (proxySetting.isEmpty) {
+      null
+    } else HostPort(proxySetting)
+    val stockMarket = new StockMarket(new ServiceHour(true), proxy)
+    val stocks = stockMarket.getStockIdList
+
+    // scalastyle:off println
+    Console.println(s"Successfully fetched stock id for ${stocks.length} 
stocks")
+    // scalastyle:on println
+
+    val userConfig = UserConfig.empty.withValue("StockId", stocks)
+      .withValue[StockMarket](classOf[StockMarket].getName, stockMarket)
+    val partitioner = new HashPartitioner
+
+    val p1 = crawler ~ partitioner ~> analyzer
+    val p2 = Node(queryServer)
+    val graph = Graph(p1, p2)
+    val app = StreamApplication("stock_direct_analyzer", graph, userConfig
+    )
+    app
+  }
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    val context = ClientContext(akkaConf)
+
+    implicit val system = context.system
+
+    val app = crawler(config)
+    val appId = context.submit(app)
+    context.close()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/resources/geardefault.conf
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/geardefault.conf 
b/examples/streaming/transport/src/main/resources/geardefault.conf
index 165bce5..0c8f421 100644
--- a/examples/streaming/transport/src/main/resources/geardefault.conf
+++ b/examples/streaming/transport/src/main/resources/geardefault.conf
@@ -2,8 +2,8 @@ gearpump {
 
   serializers {
     ## Follow this format when adding new serializer for new message types
-    ##    "io.gearpump.Message" = "io.gearpump.streaming.MessageSerializer"
-    "io.gearpump.streaming.examples.transport.PassRecord" = ""
+    ##    "org.apache.gearpump.Message" = "org.apache.gearpump.streaming.MessageSerializer"
+    "org.apache.gearpump.streaming.examples.transport.PassRecord" = ""
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Data.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Data.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Data.scala
deleted file mode 100644
index 788f92a..0000000
--- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Data.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.transport
-
-case class LocationInfo(id: String, row: Int, column: Int)
-
-// scalastyle:off equals.hash.code
-case class PassRecord(vehicleId: String, locationId: String, timeStamp: Long) {
-  override def hashCode: Int = vehicleId.hashCode
-}
-// scalastyle:on equals.hash.code
-
-case class GetTrace(vehicleId: String)
-
-case class VehicleTrace(records: Array[PassRecord])
-
-case class OverSpeedReport(vehicleId: String, speed: String, timestamp: Long, locationId: String)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/DataSource.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/DataSource.scala
deleted file mode 100644
index 33d7b54..0000000
--- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/DataSource.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.transport
-
-import scala.concurrent.duration._
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.examples.transport.generator.{MockCity, PassRecordGenerator}
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-
-class DataSource(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-  import taskContext.{output, parallelism, scheduleOnce, taskId}
-  private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get
-  private val vehicleNum = conf.getInt(DataSource.VEHICLE_NUM).get / parallelism
-  private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get
-  private val mockCity = new MockCity(citySize)
-  private val recordGenerators: Array[PassRecordGenerator] =
-    PassRecordGenerator.create(vehicleNum, getIdentifier(taskId), mockCity, overdriveThreshold)
-
-  override def onStart(startTime: StartTime): Unit = {
-    self ! Message("start", System.currentTimeMillis())
-  }
-
-  override def onNext(msg: Message): Unit = {
-    recordGenerators.foreach(generator =>
-      output(Message(generator.getNextPassRecord(), System.currentTimeMillis())))
-    scheduleOnce(1.second)(self ! Message("continue", System.currentTimeMillis()))
-  }
-
-  private def getIdentifier(taskId: TaskId): String = {
-    // scalastyle:off non.ascii.character.disallowed
-    s"沪A${taskId.processorId}${taskId.index}"
-    // scalastyle:on non.ascii.character.disallowed
-  }
-}
-
-object DataSource {
-  final val VEHICLE_NUM = "vehicle.number"
-  final val MOCK_CITY_SIZE = "mock.city.size"
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala
deleted file mode 100644
index f9dbbde..0000000
--- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.transport
-
-import java.util.concurrent.TimeUnit
-import scala.concurrent.Future
-import scala.util.{Failure, Success}
-
-import akka.actor.Actor._
-import akka.actor.{Actor, ActorRefFactory, Props}
-import akka.io.IO
-import akka.pattern.ask
-import spray.can.Http
-import spray.http.StatusCodes
-import spray.json._
-import spray.routing.{HttpService, Route}
-import upickle.default.write
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.partitioner.PartitionerDescription
-import io.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
-import io.gearpump.streaming.examples.transport.QueryServer.{GetAllRecords, WebServer}
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-import io.gearpump.streaming.{DAG, ProcessorDescription, ProcessorId, StreamApplication}
-import io.gearpump.util.Graph
-
-class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-  import system.dispatcher
-  import taskContext.appMaster
-
-  var inspector: (ProcessorId, ProcessorDescription) = null
-  implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
-  private var overSpeedRecords = List.empty[OverSpeedReport]
-
-  override def onStart(startTime: StartTime): Unit = {
-    val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]](
-      StreamApplication.DAG).get)
-    inspector = dag.processors.find { kv =>
-      val (_, processor) = kv
-      processor.taskClass == classOf[VelocityInspector].getName
-    }.get
-    taskContext.actorOf(Props(new WebServer))
-  }
-
-  override def onNext(msg: Message): Unit = {
-  }
-
-  override def receiveUnManagedMessage: Receive = {
-    case getTrace@GetTrace(vehicleId: String) =>
-      val parallism = inspector._2.parallelism
-      val processorId = inspector._1
-      val analyzerTaskId = TaskId(processorId, (vehicleId.hashCode & Integer.MAX_VALUE) % parallism)
-      val requester = sender
-      (appMaster ? LookupTaskActorRef(analyzerTaskId))
-        .asInstanceOf[Future[TaskActorRef]].flatMap { task =>
-        (task.task ? getTrace).asInstanceOf[Future[VehicleTrace]]
-      }.map { trace =>
-        LOG.info(s"reporting $trace")
-        requester ! trace
-      }
-    case record@OverSpeedReport(vehicleId, speed, timestamp, locationId) =>
-      LOG.info(s"vehicle $vehicleId is over speed, the speed is $speed km/h")
-      overSpeedRecords :+= record
-    case GetAllRecords =>
-      sender ! QueryServer.OverSpeedRecords(overSpeedRecords.toArray.sortBy(_.timestamp))
-      overSpeedRecords = List.empty[OverSpeedReport]
-    case _ =>
-    // Ignore
-  }
-}
-
-object QueryServer {
-  object GetAllRecords
-
-  case class OverSpeedRecords(records: Array[OverSpeedReport])
-
-  class WebServer extends Actor with HttpService {
-
-    import context.dispatcher
-    implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
-    def actorRefFactory: ActorRefFactory = context
-    implicit val system = context.system
-
-    IO(Http) ! Http.Bind(self, interface = "0.0.0.0", port = 8080)
-
-    override def receive: Receive = runRoute(webServer ~ staticRoute)
-
-    def webServer: Route = {
-      path("trace" / Segment) { vehicleId =>
-        get {
-          onComplete((context.parent ? GetTrace(vehicleId)).asInstanceOf[Future[VehicleTrace]]) {
-            case Success(trace: VehicleTrace) =>
-              val json = write(trace)
-              complete(pretty(json))
-            case Failure(ex) => complete(StatusCodes.InternalServerError,
-              s"An error occurred: ${ex.getMessage}")
-          }
-        }
-      } ~
-      path("records") {
-        get {
-          onComplete((context.parent ? GetAllRecords).asInstanceOf[Future[OverSpeedRecords]]) {
-            case Success(records: OverSpeedRecords) =>
-              val json = write(records)
-              complete(pretty(json))
-            case Failure(ex) => complete(StatusCodes.InternalServerError,
-              s"An error occurred: ${ex.getMessage}")
-          }
-        }
-      }
-    }
-
-    val staticRoute = {
-      pathEndOrSingleSlash {
-        getFromResource("transport/transport.html")
-      } ~
-        pathPrefix("css") {
-          get {
-            getFromResourceDirectory("transport/css")
-          }
-        } ~
-        pathPrefix("svg") {
-          get {
-            getFromResourceDirectory("transport/svg")
-          }
-        } ~
-        pathPrefix("js") {
-          get {
-            getFromResourceDirectory("transport/js")
-          }
-        }
-    }
-
-    private def pretty(json: String): String = {
-      json.parseJson.prettyPrint
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Transport.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Transport.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Transport.scala
deleted file mode 100644
index a795277..0000000
--- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Transport.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.transport
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import io.gearpump.partitioner.HashPartitioner
-import io.gearpump.streaming.{Processor, StreamApplication}
-import io.gearpump.util.Graph._
-import io.gearpump.util.{AkkaApp, Graph}
-
-/** A city smart transportation streaming application */
-object Transport extends AkkaApp with ArgumentsParser {
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "source" -> CLIOption[Int]("<how many task to generate data>", required = false,
-      defaultValue = Some(10)),
-    "inspector" -> CLIOption[Int]("<how many over speed inspector>", required = false,
-      defaultValue = Some(4)),
-    "vehicle" -> CLIOption[Int]("<how many vehicles's to generate>", required = false,
-      defaultValue = Some(1000)),
-    "citysize" -> CLIOption[Int]("<the blocks number of the mock city>", required = false,
-      defaultValue = Some(10)),
-    "threshold" -> CLIOption[Int]("<overdrive threshold, km/h>", required = false,
-      defaultValue = Some(60)))
-
-  def application(config: ParseResult): StreamApplication = {
-    val sourceNum = config.getInt("source")
-    val inspectorNum = config.getInt("inspector")
-    val vehicleNum = config.getInt("vehicle")
-    val citysize = config.getInt("citysize")
-    val threshold = config.getInt("threshold")
-    val source = Processor[DataSource](sourceNum)
-    val inspector = Processor[VelocityInspector](inspectorNum)
-    val queryServer = Processor[QueryServer](1)
-    val partitioner = new HashPartitioner
-
-    val userConfig = UserConfig.empty.withInt(DataSource.VEHICLE_NUM, vehicleNum).
-      withInt(DataSource.MOCK_CITY_SIZE, citysize).
-      withInt(VelocityInspector.OVER_DRIVE_THRESHOLD, threshold).
-      withInt(VelocityInspector.FAKE_PLATE_THRESHOLD, 200)
-    StreamApplication("transport", Graph(source ~ partitioner ~> inspector,
-      Node(queryServer)), userConfig)
-  }
-
-  override def main(akkaConf: Config, args: Array[String]): Unit = {
-    val config = parse(args)
-    val context = ClientContext(akkaConf)
-    implicit val system = context.system
-    context.submit(application(config))
-    context.close()
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/VelocityInspector.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/VelocityInspector.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/VelocityInspector.scala
deleted file mode 100644
index b9be8d7..0000000
--- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/VelocityInspector.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.transport
-
-import java.util.concurrent.TimeUnit
-import scala.collection.immutable.Queue
-import scala.collection.mutable
-import scala.concurrent.Future
-
-import akka.actor.Actor._
-import akka.actor.ActorRef
-import akka.pattern.ask
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.partitioner.PartitionerDescription
-import io.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
-import io.gearpump.streaming.examples.transport.generator.MockCity
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-import io.gearpump.streaming.{DAG, ProcessorDescription, StreamApplication}
-import io.gearpump.util.Graph
-
-class VelocityInspector(taskContext: TaskContext, conf: UserConfig)
-  extends Task(taskContext, conf) {
-
-  import system.dispatcher
-  import taskContext.appMaster
-  implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
-  private val passRecords = mutable.Map.empty[String, Queue[PassRecord]]
-  private val fakePlateThreshold = conf.getInt(VelocityInspector.FAKE_PLATE_THRESHOLD).get
-  private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get
-  private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get
-  private val mockCity = new MockCity(citySize)
-  private var queryServerActor: ActorRef = null
-
-  override def onStart(startTime: StartTime): Unit = {
-    val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]](
-      StreamApplication.DAG).get)
-    val queryServer = dag.processors.find { kv =>
-      val (_, processor) = kv
-      processor.taskClass == classOf[QueryServer].getName
-    }.get
-    val queryServerTaskId = TaskId(queryServer._1, 0)
-    (appMaster ? LookupTaskActorRef(queryServerTaskId)).asInstanceOf[Future[TaskActorRef]]
-      .map { task =>
-        queryServerActor = task.task
-      }
-  }
-
-  import io.gearpump.streaming.examples.transport.VelocityInspector._
-  override def onNext(msg: Message): Unit = {
-    msg.msg match {
-      case passRecord: PassRecord =>
-        val records = passRecords.getOrElse(passRecord.vehicleId, Queue.empty[PassRecord])
-        if (records.size > 0) {
-          val velocity = getVelocity(passRecord, records.last)
-          val formatted = "%.2f".format(velocity)
-          if (velocity > overdriveThreshold) {
-            if (velocity > fakePlateThreshold) {
-              LOG.info(s"vehicle ${passRecord.vehicleId} maybe a fake plate, " +
-                s"the speed is $formatted km/h")
-            }
-            if (queryServerActor != null) {
-              queryServerActor ! OverSpeedReport(passRecord.vehicleId, formatted,
-                passRecord.timeStamp, passRecord.locationId)
-            }
-          }
-        }
-        passRecords.update(passRecord.vehicleId, records.enqueueFinite(passRecord, RECORDS_NUM))
-    }
-  }
-
-  override def receiveUnManagedMessage: Receive = {
-    case GetTrace(vehicleId) =>
-      val records = passRecords.getOrElse(vehicleId, Queue.empty[PassRecord])
-      sender ! VehicleTrace(records.toArray.sortBy(_.timeStamp))
-  }
-
-  private def getVelocity(passRecord: PassRecord, lastPassRecord: PassRecord): Float = {
-    val distanceInKm = getDistance(lastPassRecord.locationId, passRecord.locationId)
-    val timeInHour = (passRecord.timeStamp - lastPassRecord.timeStamp).toFloat / (1000 * 60 * 60)
-    distanceInKm / timeInHour
-  }
-
-  private def getDistance(location1: String, location2: String): Long = {
-    mockCity.getDistance(location1, location2)
-  }
-}
-
-object VelocityInspector {
-  final val OVER_DRIVE_THRESHOLD = "overdrive.threshold"
-  final val FAKE_PLATE_THRESHOLD = "fakeplate.threshold"
-  final val RECORDS_NUM = 20
-
-  class FiniteQueue[T](q: Queue[T]) {
-    def enqueueFinite[B >: T](elem: B, maxSize: Int): Queue[B] = {
-      var result = q.enqueue(elem)
-      while (result.size > maxSize) {
-        result = result.dequeue._2
-      }
-      result
-    }
-  }
-
-  import scala.language.implicitConversions
-
-  implicit def queue2FiniteQueue[T](q: Queue[T]): FiniteQueue[T] = new FiniteQueue[T](q)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/MockCity.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/MockCity.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/MockCity.scala
deleted file mode 100644
index ff78679..0000000
--- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/MockCity.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.transport.generator
-
-import scala.util.Random
-
-import io.gearpump.streaming.examples.transport.generator.MockCity._
-
-class MockCity(size: Int) {
-  private val random = new Random()
-  private val directions = Array(UP, DOWN, LEFT, RIGHT)
-
-  def nextLocation(currentLocationId: String): String = {
-    val coordinate = idToCoordinate(currentLocationId)
-    val direction = directions(random.nextInt(4))
-    val newCoordinate = coordinate.addOffset(direction)
-    if (inCity(newCoordinate)) {
-      coordinateToId(newCoordinate)
-    } else {
-      nextLocation(currentLocationId)
-    }
-  }
-
-  def getDistance(locationId1: String, locationId2: String): Long = {
-    val coordinate1 = idToCoordinate(locationId1)
-    val coordinate2 = idToCoordinate(locationId2)
-    val blocks = Math.abs(coordinate1.row - coordinate2.row) +
-      Math.abs(coordinate1.column - coordinate2.column)
-    blocks * LENGTH_PER_BLOCK
-  }
-
-  def randomLocationId(): String = {
-    val row = random.nextInt(size)
-    val column = random.nextInt(size)
-    coordinateToId(Coordinate(row, column))
-  }
-
-  private def coordinateToId(coordinate: Coordinate): String = {
-    s"Id_${coordinate.row}_${coordinate.column}"
-  }
-
-  private def idToCoordinate(locationId: String): Coordinate = {
-    val attr = locationId.split("_")
-    val row = attr(1).toInt
-    val column = attr(2).toInt
-    Coordinate(row, column)
-  }
-
-  private def inCity(coordinate: Coordinate): Boolean = {
-    coordinate.row >= 0 &&
-      coordinate.row < size &&
-      coordinate.column >= 0 &&
-      coordinate.column < size
-  }
-}
-
-object MockCity {
-  // The length of the mock city, km
-  final val LENGTH_PER_BLOCK = 5
-  // The minimal speed, km/h
-  final val MINIMAL_SPEED = 10
-
-  final val UP = Coordinate(0, 1)
-  final val DOWN = Coordinate(0, -1)
-  final val LEFT = Coordinate(-1, 0)
-  final val RIGHT = Coordinate(1, 0)
-
-  case class Coordinate(row: Int, column: Int) {
-    def addOffset(coordinate: Coordinate): Coordinate = {
-      Coordinate(this.row + coordinate.row, this.column + coordinate.column)
-    }
-  }
-}

Reply via email to