http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
 
b/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
index 22c36ef..de1054f 100644
--- 
a/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
+++ 
b/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,44 +19,47 @@
 package io.gearpump.streaming.examples.sol
 
 import java.util.concurrent.TimeUnit
+import scala.concurrent.duration.FiniteDuration
 
 import akka.actor.Cancellable
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
 import io.gearpump.Message
 import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
 
-import scala.concurrent.duration.FiniteDuration
-
-class SOLStreamProcessor(taskContext : TaskContext, conf: UserConfig) extends 
Task(taskContext, conf) {
+class SOLStreamProcessor(taskContext: TaskContext, conf: UserConfig)
+  extends Task(taskContext, conf) {
   import taskContext.output
 
   val taskConf = taskContext
 
-  private var msgCount : Long = 0
-  private var scheduler : Cancellable = null
-  private var snapShotWordCount : Long = 0
-  private var snapShotTime : Long = 0
+  private var msgCount: Long = 0
+  private var scheduler: Cancellable = null
+  private var snapShotWordCount: Long = 0
+  private var snapShotTime: Long = 0
 
-  override def onStart(startTime : StartTime) : Unit = {
+  override def onStart(startTime: StartTime): Unit = {
     scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
       new FiniteDuration(5, TimeUnit.SECONDS))(reportWordCount())
     snapShotTime = System.currentTimeMillis()
   }
 
-  override def onNext(msg : Message) : Unit = {
+  override def onNext(msg: Message): Unit = {
     output(msg)
     msgCount = msgCount + 1
   }
 
-  override def onStop() : Unit = {
+  override def onStop(): Unit = {
     if (scheduler != null) {
       scheduler.cancel()
     }
   }
 
-  def reportWordCount() : Unit = {
-    val current : Long = System.currentTimeMillis()
-    LOG.info(s"Task ${taskConf.taskId} Throughput: ${(msgCount - 
snapShotWordCount, (current - snapShotTime) / 1000)} (words, second)")
+  def reportWordCount(): Unit = {
+    val current: Long = System.currentTimeMillis()
+    LOG.info(s"Task ${taskConf.taskId} " +
+      s"Throughput: ${(msgCount - snapShotWordCount, (current - snapShotTime) 
/ 1000)} " +
+      s"(words, second)")
     snapShotWordCount = msgCount
     snapShotTime = current
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProducer.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProducer.scala
 
b/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProducer.scala
index 7f285f2..5c0f3be 100644
--- 
a/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProducer.scala
+++ 
b/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProducer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,22 +20,23 @@ package io.gearpump.streaming.examples.sol
 
 import java.util.Random
 
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
 import io.gearpump.Message
 import io.gearpump.cluster.UserConfig
 import io.gearpump.streaming.examples.sol.SOLStreamProducer._
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
 
-class SOLStreamProducer(taskContext : TaskContext, conf: UserConfig) extends 
Task(taskContext, conf) {
+class SOLStreamProducer(taskContext: TaskContext, conf: UserConfig)
+  extends Task(taskContext, conf) {
 
   import taskContext.output
 
   private val sizeInBytes = conf.getInt(SOLStreamProducer.BYTES_PER_MESSAGE)
-      .getOrElse(DEFAULT_MESSAGE_SIZE)
-  private var messages : Array[String] = null
-  private var rand : Random = null
-  private var messageCount : Long = 0
+    .getOrElse(DEFAULT_MESSAGE_SIZE)
+  private var messages: Array[String] = null
+  private var rand: Random = null
+  private var messageCount: Long = 0
 
-  override def onStart(startTime : StartTime) : Unit = {
+  override def onStart(startTime: StartTime): Unit = {
     prepareRandomMessage
     self ! Start
   }
@@ -47,16 +48,16 @@ class SOLStreamProducer(taskContext : TaskContext, conf: 
UserConfig) extends Tas
 
     0.until(differentMessages).map { index =>
       val sb = new StringBuilder(sizeInBytes)
-      //Even though java encodes strings in UCS2, the serialized version sent 
by the tuples
+      // Even though java encodes strings in UCS2, the serialized version sent 
by the tuples
       // is UTF8, so it should be a single byte
-      0.until(sizeInBytes).foldLeft(sb){(sb, j) =>
-        sb.append(rand.nextInt(9));
+      0.until(sizeInBytes).foldLeft(sb) { (sb, j) =>
+        sb.append(rand.nextInt(9))
       }
-      messages(index) = sb.toString();
+      messages(index) = sb.toString()
     }
   }
 
-  override def onNext(msg : Message) : Unit = {
+  override def onNext(msg: Message): Unit = {
     val message = messages(rand.nextInt(messages.length))
     output(new Message(message, System.currentTimeMillis()))
     messageCount = messageCount + 1L
@@ -64,13 +65,14 @@ class SOLStreamProducer(taskContext : TaskContext, conf: 
UserConfig) extends Tas
   }
 
   // messageSourceMinClock represent the min clock of the message source
-  private def messageSourceMinClock : Message = {
+  private def messageSourceMinClock: Message = {
     Message("tick", System.currentTimeMillis())
   }
 }
 
 object SOLStreamProducer {
-  val DEFAULT_MESSAGE_SIZE = 100 // bytes
+  val DEFAULT_MESSAGE_SIZE = 100
+  // DEFAULT_MESSAGE_SIZE above is measured in bytes
   val BYTES_PER_MESSAGE = "bytesPerMessage"
   val Start = Message("start")
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLSpec.scala
 
b/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLSpec.scala
index 1f3dbbd..6e266d0 100644
--- 
a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLSpec.scala
+++ 
b/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,16 +18,19 @@
 
 package io.gearpump.streaming.examples.sol
 
-import io.gearpump.cluster.ClientToMaster.SubmitApplication
-import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import io.gearpump.cluster.{MasterHarness, TestUtil}
+import scala.concurrent.Future
+import scala.util.Success
+
+import com.typesafe.config.Config
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec}
 
-import scala.concurrent.Future
-import scala.util.Success
+import io.gearpump.cluster.ClientToMaster.SubmitApplication
+import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import io.gearpump.cluster.{MasterHarness, TestUtil}
 
-class SOLSpec extends PropSpec with PropertyChecks with Matchers with 
BeforeAndAfterAll with MasterHarness {
+class SOLSpec
+  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll 
with MasterHarness {
   override def beforeAll {
     startActorSystem()
   }
@@ -36,7 +39,7 @@ class SOLSpec extends PropSpec with PropertyChecks with 
Matchers with BeforeAndA
     shutdownActorSystem()
   }
 
-  override def config = TestUtil.DEFAULT_CONFIG
+  override def config: Config = TestUtil.DEFAULT_CONFIG
 
   property("SOL should succeed to submit application with required arguments") 
{
     val requiredArgs = Array.empty[String]
@@ -56,11 +59,12 @@ class SOLSpec extends PropSpec with PropertyChecks with 
Matchers with BeforeAndA
     forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) 
=>
       val args = requiredArgs ++ optionalArgs
 
-      Future {SOL.main(masterConfig, args)}
+      Future {
+        SOL.main(masterConfig, args)
+      }
 
       masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
       masterReceiver.reply(SubmitApplicationResult(Success(0)))
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
 
b/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
index acc7b8f..f5035b4 100644
--- 
a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
+++ 
b/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,15 +17,14 @@
  */
 package io.gearpump.streaming.examples.sol
 
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.task.StartTime
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.{FlatSpec, Matchers}
 
-import scala.language.postfixOps
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.MockUtil
+import io.gearpump.streaming.task.StartTime
 
 class SOLStreamProcessorSpec extends FlatSpec with Matchers {
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
 
b/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
index 4ff7d12..4bac30c 100644
--- 
a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
+++ 
b/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,14 +17,15 @@
  */
 package io.gearpump.streaming.examples.sol
 
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.task.StartTime
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
 import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalatest.{Matchers, WordSpec}
 
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.MockUtil
+import io.gearpump.streaming.task.StartTime
+
 class SOLStreamProducerSpec extends WordSpec with Matchers {
 
   "SOLStreamProducer" should {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala
 
b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala
index e9ceb8b..3c6cde6 100644
--- 
a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala
+++ 
b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,22 +19,24 @@
 package io.gearpump.streaming.examples.state
 
 import akka.actor.ActorSystem
-import io.gearpump.streaming.kafka.{KafkaSink, KafkaSource, 
KafkaStorageFactory}
-import io.gearpump.streaming.sink.DataSinkProcessor
-import io.gearpump.streaming.source.DataSourceProcessor
-import io.gearpump.streaming.{StreamApplication, Processor}
-import io.gearpump.streaming.examples.state.processor.CountProcessor
-import io.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory
-import io.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation
-import io.gearpump.streaming.state.impl.PersistentStateConfig
+import org.apache.hadoop.conf.Configuration
+
 import io.gearpump.cluster.UserConfig
 import io.gearpump.cluster.client.ClientContext
 import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
 import io.gearpump.partitioner.HashPartitioner
+import io.gearpump.streaming.examples.state.processor.CountProcessor
+import io.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory
+import io.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation
+import io.gearpump.streaming.kafka.{KafkaSink, KafkaSource, 
KafkaStorageFactory}
+import io.gearpump.streaming.sink.DataSinkProcessor
+import io.gearpump.streaming.source.DataSourceProcessor
+import io.gearpump.streaming.state.impl.PersistentStateConfig
+import io.gearpump.streaming.{Processor, StreamApplication}
 import io.gearpump.util.Graph.Node
 import io.gearpump.util.{AkkaApp, Graph}
-import org.apache.hadoop.conf.Configuration
 
+/** Example application that counts messages with exactly-once semantics. */
 object MessageCountApp extends AkkaApp with ArgumentsParser {
   val SOURCE_TASK = "sourceTask"
   val COUNT_TASK = "countTask"
@@ -46,21 +48,25 @@ object MessageCountApp extends AkkaApp with ArgumentsParser 
{
   val DEFAULT_FS = "defaultFS"
 
   override val options: Array[(String, CLIOption[Any])] = Array(
-    SOURCE_TASK -> CLIOption[Int]("<how many kafka source tasks>", required = 
false, defaultValue = Some(1)),
+    SOURCE_TASK -> CLIOption[Int]("<how many kafka source tasks>", required = 
false,
+    defaultValue = Some(1)),
     COUNT_TASK -> CLIOption("<how many count tasks>", required = false, 
defaultValue = Some(1)),
-    SINK_TASK -> CLIOption[Int]("<how many kafka sink tasks>", required = 
false, defaultValue = Some(1)),
+    SINK_TASK -> CLIOption[Int]("<how many kafka sink tasks>", required = 
false,
+    defaultValue = Some(1)),
     SOURCE_TOPIC -> CLIOption[String]("<kafka source topic>", required = true),
     SINK_TOPIC -> CLIOption[String]("<kafka sink topic>", required = true),
-    ZOOKEEPER_CONNECT -> CLIOption[String]("<Zookeeper connect string, e.g. 
localhost:2181/kafka>", required = true),
+    ZOOKEEPER_CONNECT -> CLIOption[String]("<Zookeeper connect string, e.g. 
localhost:2181/kafka>",
+    required = true),
     BROKER_LIST -> CLIOption[String]("<Kafka broker list, e.g. 
localhost:9092>", required = true),
-    DEFAULT_FS -> CLIOption[String]("<name of the default file system, e.g. 
hdfs://localhost:9000>", required = true)
+    DEFAULT_FS -> CLIOption[String]("<name of the default file system, e.g. 
hdfs://localhost:9000>",
+      required = true)
   )
 
-  def application(config: ParseResult)(implicit system: ActorSystem) : 
StreamApplication = {
+  def application(config: ParseResult)(implicit system: ActorSystem): 
StreamApplication = {
     val hadoopConfig = new Configuration
     hadoopConfig.set("fs.defaultFS", config.getString(DEFAULT_FS))
     val checkpointStoreFactory = new 
HadoopCheckpointStoreFactory("MessageCount", hadoopConfig,
-      // rotate on 1KB
+      // Rotates on 1KB
       new FileSizeRotation(1000))
     val taskConfig = UserConfig.empty
       .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true)
@@ -77,7 +83,8 @@ object MessageCountApp extends AkkaApp with ArgumentsParser {
     val kafkaSink = new KafkaSink(config.getString(SINK_TOPIC), brokerList)
     val sinkProcessor = DataSinkProcessor(kafkaSink, config.getInt(SINK_TASK))
     val partitioner = new HashPartitioner()
-    val graph = Graph(sourceProcessor ~ partitioner ~> countProcessor ~ 
partitioner ~> sinkProcessor)
+    val graph = Graph(sourceProcessor ~ partitioner
+      ~> countProcessor ~ partitioner ~> sinkProcessor)
     val app = StreamApplication("MessageCount", graph, UserConfig.empty)
     app
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/WindowAverageApp.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/WindowAverageApp.scala
 
b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/WindowAverageApp.scala
index 7489808..6f3bc79 100644
--- 
a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/WindowAverageApp.scala
+++ 
b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/WindowAverageApp.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,40 +19,45 @@
 package io.gearpump.streaming.examples.state
 
 import akka.actor.ActorSystem
-import io.gearpump.streaming.{StreamApplication, Processor}
-import io.gearpump.streaming.examples.state.processor.{WindowAverageProcessor, 
NumberGeneratorProcessor}
-import io.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory
-import io.gearpump.streaming.state.impl.{WindowConfig, PersistentStateConfig}
+import org.apache.hadoop.conf.Configuration
+
 import io.gearpump.cluster.UserConfig
 import io.gearpump.cluster.client.ClientContext
 import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
 import io.gearpump.partitioner.HashPartitioner
+import 
io.gearpump.streaming.examples.state.processor.{NumberGeneratorProcessor, 
WindowAverageProcessor}
+import io.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory
+import io.gearpump.streaming.state.impl.{PersistentStateConfig, WindowConfig}
+import io.gearpump.streaming.{Processor, StreamApplication}
 import io.gearpump.util.Graph.Node
 import io.gearpump.util.{AkkaApp, Graph}
-import org.apache.hadoop.conf.Configuration
 
+/** Example application that computes sliding-window averages with exactly-once semantics. */
 object WindowAverageApp extends AkkaApp with ArgumentsParser {
 
   override val options: Array[(String, CLIOption[Any])] = Array(
     "gen" -> CLIOption("<how many gen tasks>", required = false, defaultValue 
= Some(1)),
     "window" -> CLIOption("<how mange window tasks", required = false, 
defaultValue = Some(1)),
-    "window_size" -> CLIOption("<window size in milliseconds>", required = 
false , defaultValue = Some(5000)),
-    "window_step" -> CLIOption("<window step in milliseconds>", required = 
false , defaultValue = Some(5000))
+    "window_size" -> CLIOption("<window size in milliseconds>", required = 
false,
+      defaultValue = Some(5000)),
+    "window_step" -> CLIOption("<window step in milliseconds>", required = 
false,
+      defaultValue = Some(5000))
   )
 
-  def application(config: ParseResult)(implicit system: ActorSystem) : 
StreamApplication = {
+  def application(config: ParseResult)(implicit system: ActorSystem): 
StreamApplication = {
     val windowSize = config.getInt("window_size")
     val windowStep = config.getInt("window_step")
     val checkpointStoreFactory = new 
HadoopCheckpointStoreFactory("MessageCount", new Configuration)
-    val taskConfig = UserConfig.empty
-        .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true)
-        .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, 1000L)
-        .withValue(PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, 
checkpointStoreFactory)
-        .withValue(WindowConfig.NAME, WindowConfig(windowSize, windowStep))
+    val taskConfig = UserConfig.empty.
+      withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true)
+      .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, 1000L)
+      .withValue(PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, 
checkpointStoreFactory)
+      .withValue(WindowConfig.NAME, WindowConfig(windowSize, windowStep))
     val gen = Processor[NumberGeneratorProcessor](config.getInt("gen"))
     val count = Processor[WindowAverageProcessor](config.getInt("window"), 
taskConf = taskConfig)
     val partitioner = new HashPartitioner()
-    val app = StreamApplication("WindowAverage", Graph(gen ~ partitioner ~> 
count), UserConfig.empty)
+    val app = StreamApplication("WindowAverage", Graph(gen ~ partitioner ~> 
count),
+      UserConfig.empty)
     app
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala
 
b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala
index 04cea69..6610b91 100644
--- 
a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala
+++ 
b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,13 +18,13 @@
 
 package io.gearpump.streaming.examples.state.processor
 
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
 import io.gearpump.streaming.monoid.AlgebirdMonoid
 import io.gearpump.streaming.serializer.ChillSerializer
-import io.gearpump.streaming.state.api.{PersistentTask, PersistentState}
+import io.gearpump.streaming.state.api.{PersistentState, PersistentTask}
 import io.gearpump.streaming.state.impl.NonWindowState
 import io.gearpump.streaming.task.TaskContext
-import io.gearpump.cluster.UserConfig
-import io.gearpump.Message
 
 class CountProcessor(taskContext: TaskContext, conf: UserConfig)
   extends PersistentTask[Int](taskContext, conf) {
@@ -39,4 +39,3 @@ class CountProcessor(taskContext: TaskContext, conf: 
UserConfig)
   }
 }
 
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
 
b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
index 0dae825..fa9854f 100644
--- 
a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
+++ 
b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,11 +18,12 @@
 
 package io.gearpump.streaming.examples.state.processor
 
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
 import io.gearpump.Message
 import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
 
-class NumberGeneratorProcessor(taskContext : TaskContext, conf: UserConfig) 
extends Task(taskContext, conf) {
+class NumberGeneratorProcessor(taskContext: TaskContext, conf: UserConfig)
+  extends Task(taskContext, conf) {
   import taskContext.output
 
   private var num = 0L

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
 
b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
index 82d6243..e7ac9b3 100644
--- 
a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
+++ 
b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,24 +18,25 @@
 
 package io.gearpump.streaming.examples.state.processor
 
+import scala.collection.immutable.TreeMap
+
 import com.twitter.algebird.{AveragedGroup, AveragedValue}
+import org.slf4j.Logger
+
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
 import io.gearpump.streaming.monoid.AlgebirdGroup
 import io.gearpump.streaming.serializer.ChillSerializer
-import io.gearpump.streaming.state.api.{PersistentTask, PersistentState}
-import io.gearpump.streaming.state.impl.{WindowConfig, WindowState, Interval, 
Window}
+import io.gearpump.streaming.state.api.{PersistentState, PersistentTask}
+import io.gearpump.streaming.state.impl.{Interval, Window, WindowConfig, 
WindowState}
 import io.gearpump.streaming.task.TaskContext
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
 import io.gearpump.util.LogUtil
-import org.slf4j.Logger
-
-import scala.collection.immutable.TreeMap
 
 object WindowAverageProcessor {
   val LOG: Logger = LogUtil.getLogger(classOf[WindowAverageProcessor])
 }
 
-class WindowAverageProcessor(taskContext : TaskContext, conf: UserConfig)
+class WindowAverageProcessor(taskContext: TaskContext, conf: UserConfig)
   extends PersistentTask[AveragedValue](taskContext, conf) {
 
   override def persistentState: PersistentState[AveragedValue] = {
@@ -46,7 +47,7 @@ class WindowAverageProcessor(taskContext : TaskContext, conf: 
UserConfig)
   }
 
   override def processMessage(state: PersistentState[AveragedValue],
-                              message: Message): Unit = {
+      message: Message): Unit = {
     val value = AveragedValue(message.msg.asInstanceOf[String].toLong)
     state.update(message.timestamp, value)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala
 
b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala
index 8053f57..040c343 100644
--- 
a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala
+++ 
b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,18 +18,19 @@
 
 package io.gearpump.streaming.examples.state
 
-import io.gearpump.streaming.examples.state.MessageCountApp._
+import scala.concurrent.Future
+import scala.util.Success
+
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
+
 import io.gearpump.cluster.ClientToMaster.SubmitApplication
 import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
 import io.gearpump.cluster.{MasterHarness, TestUtil}
-import org.scalatest.{Matchers, BeforeAndAfter, PropSpec}
-import org.scalatest.prop.PropertyChecks
-
-
-import scala.concurrent.Future
-import scala.util.Success
+import io.gearpump.streaming.examples.state.MessageCountApp._
 
-class MessageCountAppSpec extends PropSpec with PropertyChecks with Matchers 
with BeforeAndAfter with MasterHarness {
+class MessageCountAppSpec
+  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with 
MasterHarness {
 
   before {
     startActorSystem()
@@ -39,7 +40,7 @@ class MessageCountAppSpec extends PropSpec with 
PropertyChecks with Matchers wit
     shutdownActorSystem()
   }
 
-  override def config = TestUtil.DEFAULT_CONFIG
+  protected override def config = TestUtil.DEFAULT_CONFIG
 
   property("MessageCount should succeed to submit application with required 
arguments") {
     val requiredArgs = Array(
@@ -68,7 +69,9 @@ class MessageCountAppSpec extends PropSpec with 
PropertyChecks with Matchers wit
     val masterReceiver = createMockMaster()
     forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) 
=>
       val args = requiredArgs ++ optionalArgs
-      Future {MessageCountApp.main(masterConfig, args)}
+      Future {
+        MessageCountApp.main(masterConfig, args)
+      }
       masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
       masterReceiver.reply(SubmitApplicationResult(Success(0)))
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/WindowAverageAppSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/WindowAverageAppSpec.scala
 
b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/WindowAverageAppSpec.scala
index 998dd1d..7c1c798 100644
--- 
a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/WindowAverageAppSpec.scala
+++ 
b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/WindowAverageAppSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,16 +18,20 @@
 
 package io.gearpump.streaming.examples.state
 
+import scala.concurrent.Future
+import scala.util.Success
+
+import com.typesafe.config.Config
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
+
 import io.gearpump.cluster.ClientToMaster.SubmitApplication
 import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
 import io.gearpump.cluster.{MasterHarness, TestUtil}
-import org.scalatest.{Matchers, BeforeAndAfter, PropSpec}
-import org.scalatest.prop.PropertyChecks
 
-import scala.concurrent.Future
-import scala.util.Success
+class WindowAverageAppSpec
+  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with 
MasterHarness {
 
-class WindowAverageAppSpec extends PropSpec with PropertyChecks with Matchers 
with BeforeAndAfter with MasterHarness {
   before {
     startActorSystem()
   }
@@ -36,7 +40,7 @@ class WindowAverageAppSpec extends PropSpec with 
PropertyChecks with Matchers wi
     shutdownActorSystem()
   }
 
-  override def config = TestUtil.DEFAULT_CONFIG
+  override def config: Config = TestUtil.DEFAULT_CONFIG
 
   property("WindowAverage should succeed to submit application with required 
arguments") {
     val requiredArgs = Array.empty[String]
@@ -61,7 +65,9 @@ class WindowAverageAppSpec extends PropSpec with 
PropertyChecks with Matchers wi
     forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) 
=>
       val args = requiredArgs ++ optionalArgs
 
-      Future {WindowAverageApp.main(masterConfig, args)}
+      Future {
+        WindowAverageApp.main(masterConfig, args)
+      }
 
       masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
       masterReceiver.reply(SubmitApplicationResult(Success(0)))

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
 
b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
index cbc9cce..a69116b 100644
--- 
a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
+++ 
b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,23 +18,23 @@
 
 package io.gearpump.streaming.examples.state.processor
 
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
 import akka.actor.ActorSystem
 import akka.testkit.TestProbe
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.state.api.PersistentTask
-import io.gearpump.streaming.state.impl.PersistentStateConfig
-import io.gearpump.streaming.task.ReportCheckpointClock
-import io.gearpump.streaming.transaction.api.CheckpointStoreFactory
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.state.impl.InMemoryCheckpointStoreFactory
-import io.gearpump.streaming.task.StartTime
 import org.mockito.Mockito._
 import org.scalacheck.Gen
-import org.scalatest.{Matchers, PropSpec}
 import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
 
-import scala.concurrent.duration._
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.MockUtil
+import io.gearpump.streaming.state.api.PersistentTask
+import io.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, 
PersistentStateConfig}
+import io.gearpump.streaming.task.{ReportCheckpointClock, StartTime}
+import io.gearpump.streaming.transaction.api.CheckpointStoreFactory
 
 class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers {
 
@@ -51,7 +51,8 @@ class CountProcessorSpec extends PropSpec with PropertyChecks 
with Matchers {
         val conf = UserConfig.empty
           .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true)
           .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, num)
-          
.withValue[CheckpointStoreFactory](PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY,
 new InMemoryCheckpointStoreFactory)
+          
.withValue[CheckpointStoreFactory](PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY,
+            new InMemoryCheckpointStoreFactory)
 
         val count = new CountProcessor(taskContext, conf)
 
@@ -61,25 +62,23 @@ class CountProcessorSpec extends PropSpec with 
PropertyChecks with Matchers {
         count.onStart(StartTime(0L))
         appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, 0L))
 
-
         for (i <- 0L to num) {
           count.onNext(Message("", i))
           count.state.get shouldBe Some(i + 1)
         }
-        // next checkpoint time is at num
-        // not yet
+        // Next checkpoint time has not arrived yet
         when(taskContext.upstreamMinClock).thenReturn(0L)
         count.onNext(PersistentTask.CHECKPOINT)
-        appMaster.expectNoMsg(10 milliseconds)
+        appMaster.expectNoMsg(10.milliseconds)
 
-        // time to checkpoint
+        // Time to checkpoint
         when(taskContext.upstreamMinClock).thenReturn(num)
         count.onNext(PersistentTask.CHECKPOINT)
-        // only the state before checkpoint time is checkpointed
+        // Only the state before checkpoint time is checkpointed
         appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, num))
     }
 
-    system.shutdown()
-    system.awaitTermination()
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
 
b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
index 8105c32..18a49ac 100644
--- 
a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
+++ 
b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,16 +18,20 @@
 
 package io.gearpump.streaming.examples.state.processor
 
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 import akka.actor.ActorSystem
 import akka.testkit.TestProbe
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.task.StartTime
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import org.mockito.{Matchers => MockitoMatchers}
 import org.mockito.Mockito._
+import org.mockito.{Matchers => MockitoMatchers}
 import org.scalatest.{Matchers, WordSpec}
 
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.MockUtil
+import io.gearpump.streaming.task.StartTime
+
 class NumberGeneratorProcessorSpec extends WordSpec with Matchers {
   "NumberGeneratorProcessor" should {
     "send random numbers" in {
@@ -38,7 +42,7 @@ class NumberGeneratorProcessorSpec extends WordSpec with 
Matchers {
 
       val mockTaskActor = TestProbe()
 
-      //mock self ActorRef
+      // Mock self ActorRef
       when(taskContext.self).thenReturn(mockTaskActor.ref)
 
       val conf = UserConfig.empty
@@ -46,13 +50,12 @@ class NumberGeneratorProcessorSpec extends WordSpec with 
Matchers {
       genNum.onStart(StartTime(0))
       mockTaskActor.expectMsgType[Message]
 
-
       genNum.onNext(Message("next"))
       verify(taskContext).output(MockitoMatchers.any[Message])
-      //mockTaskActor.expectMsgType[Message]
+      // mockTaskActor.expectMsgType[Message]
 
-      system.shutdown()
-      system.awaitTermination()
+      system.terminate()
+      Await.result(system.whenTerminated, Duration.Inf)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
 
b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
index 03bc8fb..a9c52aa 100644
--- 
a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
+++ 
b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,25 +18,24 @@
 
 package io.gearpump.streaming.examples.state.processor
 
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
 import akka.actor.ActorSystem
 import akka.testkit.TestProbe
 import com.twitter.algebird.AveragedValue
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.state.api.PersistentTask
-import io.gearpump.streaming.state.impl.{WindowConfig, PersistentStateConfig}
-import io.gearpump.streaming.task.ReportCheckpointClock
-import io.gearpump.streaming.transaction.api.CheckpointStoreFactory
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.state.impl.InMemoryCheckpointStoreFactory
-import io.gearpump.streaming.task.StartTime
 import org.mockito.Mockito._
 import org.scalacheck.Gen
-import org.scalatest.{Matchers, PropSpec}
 import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
 
-import scala.concurrent.duration._
-
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.MockUtil
+import io.gearpump.streaming.state.api.PersistentTask
+import io.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, 
PersistentStateConfig, WindowConfig}
+import io.gearpump.streaming.task.{ReportCheckpointClock, StartTime}
+import io.gearpump.streaming.transaction.api.CheckpointStoreFactory
 
 class WindowAverageProcessorSpec extends PropSpec with PropertyChecks with 
Matchers {
   property("WindowAverageProcessor should update state") {
@@ -51,10 +50,11 @@ class WindowAverageProcessorSpec extends PropSpec with 
PropertyChecks with Match
         val windowStep = num
 
         val conf = UserConfig.empty
-            .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true)
-            .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, num)
-            
.withValue[CheckpointStoreFactory](PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY,
 new InMemoryCheckpointStoreFactory)
-            .withValue(WindowConfig.NAME, WindowConfig(windowSize, windowStep))
+          .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true)
+          .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, num)
+          
.withValue[CheckpointStoreFactory](PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY,
+            new InMemoryCheckpointStoreFactory)
+          .withValue(WindowConfig.NAME, WindowConfig(windowSize, windowStep))
 
         val windowAverage = new WindowAverageProcessor(taskContext, conf)
 
@@ -69,19 +69,18 @@ class WindowAverageProcessorSpec extends PropSpec with 
PropertyChecks with Match
           windowAverage.state.get shouldBe Some(AveragedValue(i + 1, data))
         }
 
-        // next checkpoint time is at num
-        // not yet
+        // Next checkpoint time has not arrived yet
         when(taskContext.upstreamMinClock).thenReturn(0L)
         windowAverage.onNext(PersistentTask.CHECKPOINT)
-        appMaster.expectNoMsg(10 milliseconds)
+        appMaster.expectNoMsg(10.milliseconds)
 
-        // time to checkpoint
+        // Time to checkpoint
         when(taskContext.upstreamMinClock).thenReturn(num)
         windowAverage.onNext(PersistentTask.CHECKPOINT)
         appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, num))
     }
 
-    system.shutdown()
-    system.awaitTermination()
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css
----------------------------------------------------------------------
diff --git 
a/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css 
b/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css
index 4693dc6..182d722 100644
--- a/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css
+++ b/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css
@@ -1,16 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 .ui-datepicker {
   font-size: 11px;
-  }
+}
+
 .sidebar-label {
-  font-size:15px;
+  font-size: 15px;
   font-family: calibri, Arial, Helvetica, sans-serif;
 }
-          
+
 .help {
-  font-size:12px;
+  font-size: 12px;
   font-family: calibri, Arial, Helvetica, sans-serif;
 }
-          
+
 div.splitter {
   margin: 12px 0px 7px 0px;
   clear: both;
@@ -18,13 +37,13 @@ div.splitter {
 }
 
 input.sidebar {
-  width:165px
+  width: 165px
 }
 
 select.sidebar {
-  width:198px
+  width: 198px
 }
-          
+
 table.dataintable {
   font-family: calibri, Arial, Helvetica, sans-serif;
   font-size: 15px;
@@ -48,49 +67,49 @@ table.dataintable td {
   border: 1px solid #AAA;
 }
 
-#search{
-  width:100px;
-  height:25px;
-  position:relative;
-  left:0px;
-  top:5px;
+#search {
+  width: 100px;
+  height: 25px;
+  position: relative;
+  left: 0px;
+  top: 5px;
 }
 
-#mytable{
-  width:100%;
-  height:300;
-  float:left;
+#mytable {
+  width: 100%;
+  height: 300;
+  float: left;
 }
 
-#mychart{
-  height:250px;
-  width:100%;
+#mychart {
+  height: 250px;
+  width: 100%;
 }
 
-#Menu{
-  height:100%;
-  width:245px;
-  float:left;
+#Menu {
+  height: 100%;
+  width: 245px;
+  float: left;
 }
 
-#header{
-  height:115px;
+#header {
+  height: 115px;
   background-image: url(header.png);
 }
 
-#body{
-  height:100%;
-  width:100%;
+#body {
+  height: 100%;
+  width: 100%;
   background-image: url(body.png);
-  background-size:100% 100%;
+  background-size: 100% 100%;
 }
 
-#footer{
-  color:white;
-  height:70px;
-  line-height:70px;
-  text-align:middle;
-  clear:both;
-  text-align:center;
+#footer {
+  color: white;
+  height: 70px;
+  line-height: 70px;
+  text-align: middle;
+  clear: both;
+  text-align: center;
   background-image: url(foot.png);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js
----------------------------------------------------------------------
diff --git 
a/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js 
b/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js
index 4e5e5f6..97f1e07 100644
--- a/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js
+++ b/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js
@@ -1,10 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 function initChart(chartid, tableid, stockId) {
   require.config({
     paths: {
-        echarts: 'http://echarts.baidu.com/build/dist'
+      echarts: 'http://echarts.baidu.com/build/dist'
     }
   });
-      
+
   require(
     [
       'echarts',
@@ -17,13 +35,13 @@ function initChart(chartid, tableid, stockId) {
       var dataPoints = 100;
       var timeTicket;
       clearInterval(timeTicket);
-      timeTicket = setInterval(function (){
-        $.getJSON( "report/" + stockId, function( json ) {
+      timeTicket = setInterval(function () {
+        $.getJSON("report/" + stockId, function (json) {
           STOCK_NAME = json.name
-          
+
           var maxDrawnDown = json.currentMax[0].max.price - 
json.currentMax[0].min.price;
-          var time = new 
Date(json.currentMax[0].current.timestamp).toLocaleTimeString().replace(/^\D*/,'');
-           // 动态数据接口 addData
+          var time = new 
Date(json.currentMax[0].current.timestamp).toLocaleTimeString().replace(/^\D*/, 
'');
+          // 动态数据接口 addData
           myChart.addData([
             [
               0,        // 系列索引
@@ -40,91 +58,91 @@ function initChart(chartid, tableid, stockId) {
               time
             ]
           ]);
-          document.getElementById(chartid).style.display="block"
+          document.getElementById(chartid).style.display = "block"
           document.getElementById(tableid).innerHTML = "<pre>" + 
JSON.stringify(json, null, 2) + "</pre>"
-          });
+        });
       }, 2000);
 
       var subtext_ = "Draw Down"
 
       var option = {
-        title : {
-            text: 'Stock Analysis',
-            subtext: "Max " + subtext_
+        title: {
+          text: 'Stock Analysis',
+          subtext: "Max " + subtext_
         },
-        tooltip : {
-            trigger: 'axis'
+        tooltip: {
+          trigger: 'axis'
         },
         legend: {
-            data:["Current Price", "Current Draw Down"]
+          data: ["Current Price", "Current Draw Down"]
         },
         toolbox: {
-          show : false,
-          feature : {
-            mark : {show: true},
-            dataView : {show: true, readOnly: false},
-            magicType : {show: true, type: ['line', 'bar']},
-            restore : {show: true},
-            saveAsImage : {show: true}
+          show: false,
+          feature: {
+            mark: {show: true},
+            dataView: {show: true, readOnly: false},
+            magicType: {show: true, type: ['line', 'bar']},
+            restore: {show: true},
+            saveAsImage: {show: true}
           }
         },
-        dataZoom : {
-          show : false,
-          start : 0,
-          end : 100
+        dataZoom: {
+          show: false,
+          start: 0,
+          end: 100
         },
-        xAxis : [
+        xAxis: [
           {
-            type : 'category',
-            boundaryGap : true,
-            data : (function (){
+            type: 'category',
+            boundaryGap: true,
+            data: (function () {
               var now = new Date();
               var res = [];
               var len = dataPoints;
               while (len--) {
-                res.unshift(now.toLocaleTimeString().replace(/^\D*/,''));
+                res.unshift(now.toLocaleTimeString().replace(/^\D*/, ''));
                 now = new Date(now - 2000);
               }
               return res;
             })()
           }
         ],
-        yAxis : [
+        yAxis: [
           {
-            type : 'value',
+            type: 'value',
             scale: true,
-            name : subtext_ + ' 价格/元',
+            name: subtext_ + ' 价格/元',
             boundaryGap: [0, 0.3]
           },
           {
-            type : 'value',
+            type: 'value',
             scale: true,
-            name : 'Current 价格/元',
+            name: 'Current 价格/元',
             boundaryGap: [0, 0.1]
           }
         ],
-        series : [
+        series: [
           {
-            name:"Current Draw Down",
-            type:'line',
-            data:(function (){
+            name: "Current Draw Down",
+            type: 'line',
+            data: (function () {
               var res = [];
               var len = dataPoints;
               while (len--) {
-                  res.push(0);
+                res.push(0);
               }
               return res;
             })()
           },
           {
-            name:"Current Price",
-            type:'line',
+            name: "Current Price",
+            type: 'line',
             yAxisIndex: 1,
-            data:(function (){
+            data: (function () {
               var res = [];
               var len = dataPoints;
               while (len--) {
-                  res.push(0);
+                res.push(0);
               }
               return res;
             })()

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/stockcrawler/src/main/resources/stock/stock.html
----------------------------------------------------------------------
diff --git 
a/examples/streaming/stockcrawler/src/main/resources/stock/stock.html 
b/examples/streaming/stockcrawler/src/main/resources/stock/stock.html
index 224473f..9682a53 100644
--- a/examples/streaming/stockcrawler/src/main/resources/stock/stock.html
+++ b/examples/streaming/stockcrawler/src/main/resources/stock/stock.html
@@ -1,66 +1,87 @@
 <!DOCTYPE html>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
 <html>
 
 <head>
-    <meta charset="utf-8">
-    <link rel=stylesheet type=text/css href="css/custom.css">
-    <script src="http://echarts.baidu.com/build/dist/echarts.js";></script>
-    <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js";></script>
-    <script src="js/stock.js"></script>
-    <script type="text/javascript">        
-      function search_onclick(){
-        var stockId = document.getElementById('stockId').value
-        initChart("mychart", "mytable", stockId)
-      }
-    </script>
+  <meta charset="utf-8">
+  <link rel=stylesheet type=text/css href="css/custom.css">
+  <script src="http://echarts.baidu.com/build/dist/echarts.js";></script>
+  <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js";></script>
+  <script src="js/stock.js"></script>
+  <script type="text/javascript">
+    function search_onclick() {
+      var stockId = document.getElementById('stockId').value
+      initChart("mychart", "mytable", stockId)
+    }
+  </script>
 </head>
 
 <body style="background-color:#F2F2F2">
 <div id="container" style="width:882px; 
height:450px;margin-left:auto;margin-right:auto;">
-    <div style="height:0px"></div>
-    <div id="header">
-        <div 
style="font-weight:600;position:relative;left:50px;top:50px;font-family: 
calibri, Arial, Helvetica, sans-serif;font-size:29px;color:white">
-            Big Data Stock Analysis Demo
-        </div>
+  <div style="height:0px"></div>
+  <div id="header">
+    <div
+      style="font-weight:600;position:relative;left:50px;top:50px;font-family: 
calibri, Arial, Helvetica, sans-serif;font-size:29px;color:white">
+      Big Data Stock Analysis Demo
     </div>
-    <div id="body">
-        <div id="Menu">
-            <div style="position:relative;margin-left:30px; 
margin-right:20px;margin-top:20px;">
-                <!-- form to post to accompany to get accompanying cars -->
+  </div>
+  <div id="body">
+    <div id="Menu">
+      <div style="position:relative;margin-left:30px; 
margin-right:20px;margin-top:20px;">
+        <!-- form to post to accompany to get accompanying cars -->
 
-                <table style="width:100%">
-                    <tr>
-                        <td class="sidebar-label">Stock Id:</td>
-                    </tr>
-                    <tr>
-                        <td class="sidebar-label">Example: sh600019, 
sz000002</td>
-                    </tr>
-                    <tr>
-                        <td style="vertical-align:top;">
-                            <input id="stockId" class="sidebar" type="text" 
name="stockId"/>
-                        </td>
-                    </tr>
-                </table>
-                <div class="splitter"></div>
-                <div>
-                    <button id="search" 
onclick="search_onclick()">Search</button>
-                </div>
-            </div>
+        <table style="width:100%">
+          <tr>
+            <td class="sidebar-label">Stock Id:</td>
+          </tr>
+          <tr>
+            <td class="sidebar-label">Example: sh600019, sz000002</td>
+          </tr>
+          <tr>
+            <td style="vertical-align:top;">
+              <input id="stockId" class="sidebar" type="text" name="stockId"/>
+            </td>
+          </tr>
+        </table>
+        <div class="splitter"></div>
+        <div>
+          <button id="search" onclick="search_onclick()">Search</button>
         </div>
-        <div id="content" 
style="height:100%;width:585px;float:left;position:relative;left:20px;overflow:scroll;">
-            <div 
style="height:50px;position:relative;top:15px;vertical-align:middle;font-weight:300;font-family:
 calibri, Arial, Helvetica, sans-serif;font-size:22px;color:black">
-                Analysis Result:
-            </div>
-            <div style="height:7px;background-color:#92BDF2;"></div>
+      </div>
+    </div>
+    <div id="content"
+         
style="height:100%;width:585px;float:left;position:relative;left:20px;overflow:scroll;">
+      <div
+        
style="height:50px;position:relative;top:15px;vertical-align:middle;font-weight:300;font-family:
 calibri, Arial, Helvetica, sans-serif;font-size:22px;color:black">
+        Analysis Result:
+      </div>
+      <div style="height:7px;background-color:#92BDF2;"></div>
 
-            <div id="mychart"></div>
+      <div id="mychart"></div>
 
-            <div id="mytable"></div>
-        </div>
+      <div id="mytable"></div>
     </div>
-    <div id="footer">
-        Big Data Team @ Intel
-    </div>    
+  </div>
+  <div id="footer">
+    Big Data Team @ Intel
+  </div>
 </div>
 </body>
 </html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Analyzer.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Analyzer.scala
 
b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Analyzer.scala
index a37ab7f..1aea563 100644
--- 
a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Analyzer.scala
+++ 
b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Analyzer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,26 +16,26 @@
  * limitations under the License.
  */
 
-
 package io.gearpump.streaming.examples.stock
 
+import scala.collection.immutable
+
 import akka.actor.Actor.Receive
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import Analyzer.HistoricalStates
-import Price._
-import io.gearpump.util.LogUtil
 import org.joda.time.DateTime
 import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
 
-import scala.collection.immutable
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.examples.stock.Analyzer.HistoricalStates
+import io.gearpump.streaming.examples.stock.Price._
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import io.gearpump.util.LogUtil
 
 /**
  * Drawdown analyzer
  * Definition: http://en.wikipedia.org/wiki/Drawdown_(economics)
  */
-class Analyzer (taskContext: TaskContext, conf: UserConfig) extends 
Task(taskContext, conf) {
+class Analyzer(taskContext: TaskContext, conf: UserConfig) extends 
Task(taskContext, conf) {
 
   val dateFormatter = DateTimeFormat forPattern "dd/MM/yyyy"
 
@@ -62,7 +62,7 @@ class Analyzer (taskContext: TaskContext, conf: UserConfig) 
extends Task(taskCon
   }
 
   override def receiveUnManagedMessage: Receive = {
-    case get@ GetReport(stockId, date) =>
+    case get@GetReport(stockId, date) =>
       var currentMax = currentDownwardsStates.get(stockId)
 
       val dateTime = Option(date) match {
@@ -77,7 +77,7 @@ class Analyzer (taskContext: TaskContext, conf: UserConfig) 
extends Task(taskCon
       val name = stockInfos.get(stockId).map(_.name).getOrElse("")
       sender ! Report(stockId, name, dateTime.toString, historyMax, currentMax)
   }
-  
+
   private def updateCurrentStates(stock: StockPrice) = {
     var downwardsState: StockPriceState = null
     if (currentDownwardsStates.contains(stock.stockId)) {
@@ -88,32 +88,33 @@ class Analyzer (taskContext: TaskContext, conf: UserConfig) 
extends Task(taskCon
     currentDownwardsStates += stock.stockId -> downwardsState
     downwardsState
   }
-  
-  //Update the stock's latest state.
+
+  // Update the stock's latest state.
   private def generateNewState(currentPrice: Price, oldState: 
StockPriceState): StockPriceState = {
-    if(currentPrice.price > oldState.max.price) {
+    if (currentPrice.price > oldState.max.price) {
       StockPriceState(oldState.stockID, currentPrice, currentPrice, 
currentPrice)
     } else {
-      val newState = StockPriceState(oldState.stockID, oldState.max, 
Price.min(currentPrice, oldState.min), currentPrice)
+      val newState = StockPriceState(oldState.stockID, oldState.max,
+        Price.min(currentPrice, oldState.min), currentPrice)
       newState
     }
   }
-  
+
   private def checkDate(stock: StockPrice) = {
     if (currentDownwardsStates.contains(stock.stockId)) {
       val now = new DateTime(stock.timestamp)
       val lastTime = new 
DateTime(currentDownwardsStates.get(stock.stockId).get.current.timestamp)
-      //New day
-      if(now.getDayOfYear > lastTime.getDayOfYear || now.getYear > 
lastTime.getYear) {
+      // New day
+      if (now.getDayOfYear > lastTime.getDayOfYear || now.getYear > 
lastTime.getYear) {
         currentDownwardsStates -= stock.stockId
       }
     }
   }
-  
-  private def parseDate(format: DateTimeFormatter, input: String): DateTime =  
{
+
+  private def parseDate(format: DateTimeFormatter, input: String): DateTime = {
     format.parseDateTime(input)
   }
-  
+
   private def handleHistoricalQuery(stockId: String, date: DateTime) = {
     val maximal = historicalStates.getHistoricalMaximal(stockId, date)
     maximal
@@ -123,28 +124,28 @@ class Analyzer (taskContext: TaskContext, conf: 
UserConfig) extends Task(taskCon
 object Analyzer {
 
   class HistoricalStates {
-    val LOG =LogUtil.getLogger(getClass)
+    val LOG = LogUtil.getLogger(getClass)
     val dateFormatter = DateTimeFormat forPattern "dd/MM/yyyy"
     private var historicalMaxRaise = new immutable.HashMap[(String, DateTime), 
StockPriceState]
     private var historicalMaxDrawdown = new immutable.HashMap[(String, 
DateTime), StockPriceState]
-    
+
     def updatePresentMaximal(newState: StockPriceState): 
Option[StockPriceState] = {
       val date = Analyzer.getDateFromTimeStamp(newState.current.timestamp)
       var newMaximalState: Option[StockPriceState] = null
       if (newState.max.price < Float.MinPositiveValue) {
         newMaximalState = generateNewMaximal(newState, date, 
historicalMaxRaise)
-        if(newMaximalState.nonEmpty) {
-          historicalMaxRaise += (newState.stockID, date)-> newMaximalState.get
+        if (newMaximalState.nonEmpty) {
+          historicalMaxRaise += (newState.stockID, date) -> newMaximalState.get
         }
       } else {
         newMaximalState = generateNewMaximal(newState, date, 
historicalMaxDrawdown)
-        if(newMaximalState.nonEmpty) {
+        if (newMaximalState.nonEmpty) {
           historicalMaxDrawdown += (newState.stockID, date) -> 
newMaximalState.get
         }
       }
       newMaximalState
     }
-    
+
     def getHistoricalMaximal(stockId: String, date: DateTime): 
Option[StockPriceState] = {
       historicalMaxDrawdown.get((stockId, date))
     }
@@ -152,17 +153,17 @@ object Analyzer {
     private def generateNewMaximal(
         state: StockPriceState,
         date: DateTime,
-        map: immutable.HashMap[(String, DateTime), StockPriceState]):
-      Option[StockPriceState] = {
+        map: immutable.HashMap[(String, DateTime), StockPriceState])
+      : Option[StockPriceState] = {
       val maximal = map.get((state.stockID, date))
-      if(maximal.nonEmpty && maximal.get.drawDown > state.drawDown) {
+      if (maximal.nonEmpty && maximal.get.drawDown > state.drawDown) {
         None
       } else {
         Some(state)
       }
     }
   }
-  
+
   def getDateFromTimeStamp(timestamp: Long): DateTime = {
     new DateTime(timestamp).withTimeAtStartOfDay()
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Crawler.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Crawler.scala
 
b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Crawler.scala
index 189ddba..da5ab63 100644
--- 
a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Crawler.scala
+++ 
b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Crawler.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,14 +16,13 @@
  * limitations under the License.
  */
 
-
 package io.gearpump.streaming.examples.stock
 
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import scala.concurrent.duration._
+
 import io.gearpump.Message
 import io.gearpump.cluster.UserConfig
-
-import scala.concurrent.duration._
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
 
 class Crawler(taskContext: TaskContext, conf: UserConfig) extends 
Task(taskContext, conf) {
 
@@ -48,12 +47,12 @@ class Crawler(taskContext: TaskContext, conf: UserConfig) 
extends Task(taskConte
 
   val stockMarket = 
conf.getValue[StockMarket](classOf[StockMarket].getName).get
 
-  override def onStart(startTime : StartTime) : Unit = {
-    //nothing
+  override def onStart(startTime: StartTime): Unit = {
+    // Nothing
   }
 
-  override def onNext(msg : Message) : Unit = {
-    stockMarket.getPrice(stocks).foreach {price =>
+  override def onNext(msg: Message): Unit = {
+    stockMarket.getPrice(stocks).foreach { price =>
       output(new Message(price, price.timestamp))
     }
     scheduleOnce(5.seconds)(self ! FetchStockPrice)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Data.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Data.scala
 
b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Data.scala
index cb90a79..5525524 100644
--- 
a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Data.scala
+++ 
b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Data.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,26 +16,28 @@
  * limitations under the License.
  */
 
-
 package io.gearpump.streaming.examples.stock
 
-import akka.actor.ActorRef
-
-case class StockPrice(stockId: String, name: String, price: String, delta: 
String, pecent: String, volume: String, money: String, timestamp: Long) {
+// scalastyle:off equals.hash.code  case class has equals defined
+case class StockPrice(
+    stockId: String, name: String, price: String, delta: String, pecent: 
String, volume: String,
+    money: String, timestamp: Long) {
   override def hashCode: Int = stockId.hashCode
 }
-
+// scalastyle:on equals.hash.code  case class has equals defined
 
 case class Price(price: Float, timestamp: Long)
 
 object Price {
 
+  import scala.language.implicitConversions
+
   implicit def StockPriceToPrice(stock: StockPrice): Price = {
     Price(stock.price.toFloat, stock.timestamp)
   }
 
-  def min(first: Price, second: Price) = {
-    if(first.price < second.price) {
+  def min(first: Price, second: Price): Price = {
+    if (first.price < second.price) {
       first
     } else {
       second
@@ -45,13 +47,15 @@ object Price {
 
 case class StockPriceState(stockID: String, max: Price, min: Price, current: 
Price) {
 
-  def drawDownPeriod = min.timestamp - max.timestamp
+  def drawDownPeriod: Long = min.timestamp - max.timestamp
 
-  def recoveryPeriod = current.timestamp - min.timestamp
+  def recoveryPeriod: Long = current.timestamp - min.timestamp
 
-  def drawDown = max.price - min.price
+  def drawDown: Float = max.price - min.price
 }
 
 case class GetReport(stockId: String, date: String)
 
-case class Report(stockId: String, name: String, date: String, historyMax: 
Option[StockPriceState], currentMax: Option[StockPriceState])
+case class Report(
+    stockId: String, name: String, date: String, historyMax: 
Option[StockPriceState],
+    currentMax: Option[StockPriceState])

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/QueryServer.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/QueryServer.scala
 
b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/QueryServer.scala
index aa8541a..33db128 100644
--- 
a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/QueryServer.scala
+++ 
b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/QueryServer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,37 +16,36 @@
  * limitations under the License.
  */
 
-
 package io.gearpump.streaming.examples.stock
 
 import java.util.concurrent.TimeUnit
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
 
 import akka.actor.Actor._
-import akka.actor.{Actor, Props}
+import akka.actor.{Actor, ActorRefFactory, Props}
 import akka.io.IO
 import akka.pattern.ask
+import spray.can.Http
+import spray.http.StatusCodes
+import spray.json._
+import spray.routing.{HttpService, Route}
+import upickle.default.write
+
 import io.gearpump.Message
 import io.gearpump.cluster.MasterToAppMaster.AppMasterDataDetailRequest
 import io.gearpump.cluster.UserConfig
 import io.gearpump.streaming.ProcessorId
 import io.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, 
TaskActorRef}
-import io.gearpump.streaming.appmaster.{AppMaster, ProcessorSummary, 
StreamAppMasterSummary}
+import io.gearpump.streaming.appmaster.{ProcessorSummary, 
StreamAppMasterSummary}
 import io.gearpump.streaming.examples.stock.QueryServer.WebServer
 import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-import spray.can.Http
-import spray.http.StatusCodes
-import spray.json._
-import spray.routing.HttpService
-import upickle.default.write
 
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Failure, Success}
+class QueryServer(taskContext: TaskContext, conf: UserConfig) extends 
Task(taskContext, conf) {
+  import scala.concurrent.ExecutionContext.Implicits.global
 
-class QueryServer(taskContext: TaskContext, conf: UserConfig) extends 
Task(taskContext, conf){
   import taskContext.{appId, appMaster}
 
-  import ExecutionContext.Implicits.global
-
   var analyzer: (ProcessorId, ProcessorSummary) = null
   implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
 
@@ -56,7 +55,7 @@ class QueryServer(taskContext: TaskContext, conf: UserConfig) 
extends Task(taskC
   }
 
   override def onNext(msg: Message): Unit = {
-   //Skip
+    // Skip
   }
 
   override def receiveUnManagedMessage: Receive = messageHandler
@@ -67,14 +66,14 @@ class QueryServer(taskContext: TaskContext, conf: 
UserConfig) extends Task(taskC
         val (processorId, processor) = kv
         processor.taskClass == classOf[Analyzer].getName
       }.get
-    case getReport @ GetReport(stockId, date) =>
+    case getReport@GetReport(stockId, date) =>
       val parallism = analyzer._2.parallelism
       val processorId = analyzer._1
       val analyzerTaskId = TaskId(processorId, (stockId.hashCode & 
Integer.MAX_VALUE) % parallism)
       val requester = sender
       import scala.concurrent.Future
       (appMaster ? LookupTaskActorRef(analyzerTaskId))
-        .asInstanceOf[Future[TaskActorRef]].flatMap {task =>
+        .asInstanceOf[Future[TaskActorRef]].flatMap { task =>
 
         (task.task ? getReport).asInstanceOf[Future[Report]]
       }.map { report =>
@@ -82,7 +81,7 @@ class QueryServer(taskContext: TaskContext, conf: UserConfig) 
extends Task(taskC
         requester ! report
       }
     case _ =>
-    //ignore
+    // Ignore
   }
 }
 
@@ -91,21 +90,22 @@ object QueryServer {
 
     import context.dispatcher
     implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
-    def actorRefFactory = context
+    def actorRefFactory: ActorRefFactory = context
     implicit val system = context.system
 
     IO(Http) ! Http.Bind(self, interface = "localhost", port = 8080)
 
     override def receive: Receive = runRoute(webServer ~ staticRoute)
 
-    def webServer = {
-      path("report" / PathElement) { stockId =>
+    def webServer: Route = {
+      path("report" / Segment) { stockId =>
         get {
           onComplete((context.parent ? GetReport(stockId, 
null)).asInstanceOf[Future[Report]]) {
             case Success(report: Report) =>
               val json = write(report)
               complete(pretty(json))
-            case Failure(ex) => complete(StatusCodes.InternalServerError, s"An 
error occurred: ${ex.getMessage}")
+            case Failure(ex) => complete(StatusCodes.InternalServerError,
+              s"An error occurred: ${ex.getMessage}")
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/StockMarket.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/StockMarket.scala
 
b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/StockMarket.scala
index 5686ec7..508c282 100644
--- 
a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/StockMarket.scala
+++ 
b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/StockMarket.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,19 +19,20 @@
 package io.gearpump.streaming.examples.stock
 
 import java.nio.charset.Charset
+import scala.io.Codec
+
 import org.apache.commons.httpclient.methods.GetMethod
 import org.apache.commons.httpclient.{HttpClient, 
MultiThreadedHttpConnectionManager}
-import StockMarket.ServiceHour
-import io.gearpump.transport.HostPort
-import io.gearpump.util.LogUtil
 import org.htmlcleaner.{HtmlCleaner, TagNode}
 import org.joda.time.{DateTime, DateTimeZone}
 
-import scala.io.Codec
+import io.gearpump.streaming.examples.stock.StockMarket.ServiceHour
+import io.gearpump.transport.HostPort
+import io.gearpump.util.LogUtil
 
 class StockMarket(service: ServiceHour, proxy: HostPort = null) extends 
Serializable {
 
-  def LOG = LogUtil.getLogger(getClass)
+  private def LOG = LogUtil.getLogger(getClass)
 
   @transient
   private var connectionManager: MultiThreadedHttpConnectionManager = null
@@ -41,7 +42,7 @@ class StockMarket(service: ServiceHour, proxy: HostPort = 
null) extends Serializ
   private val stockPriceParser =
     
"""^var\shq_str_s_([a-z0-9A-Z]+)="([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+)";$""".r
 
-  def shutdown: Unit = {
+  def shutdown(): Unit = {
     Option(connectionManager).map(_.shutdown())
   }
 
@@ -58,8 +59,7 @@ class StockMarket(service: ServiceHour, proxy: HostPort = 
null) extends Serializ
     _client
   }
 
-  def getPrice(stocks: Array[String]) : Array[StockPrice] = {
-
+  def getPrice(stocks: Array[String]): Array[StockPrice] = {
 
     LOG.info(s"getPrice 1")
 
@@ -72,7 +72,8 @@ class StockMarket(service: ServiceHour, proxy: HostPort = 
null) extends Serializ
       client.executeMethod(get)
       val current = System.currentTimeMillis()
 
-      val output = 
scala.io.Source.fromInputStream(get.getResponseBodyAsStream)(new Codec(Charset 
forName "GBK")).getLines().flatMap { line =>
+      val output = 
scala.io.Source.fromInputStream(get.getResponseBodyAsStream)(
+        new Codec(Charset forName "GBK")).getLines().flatMap { line =>
         line match {
           case stockPriceParser(stockId, name, price, delta, pecent, volume, 
money) =>
             Some(StockPrice(stockId, name, price, delta, pecent, volume, 
money, current))
@@ -100,18 +101,18 @@ class StockMarket(service: ServiceHour, proxy: HostPort = 
null) extends Serializ
 
     val root = cleaner.clean(get.getResponseBodyAsStream)
 
-    val stockUrls = 
root.evaluateXPath("//div[@id='quotesearch']//li//a[@href]");
+    val stockUrls = 
root.evaluateXPath("//div[@id='quotesearch']//li//a[@href]")
 
     val elements = root.getElementsByName("a", true)
 
     val hrefs = (0 until stockUrls.length)
       .map(stockUrls(_).asInstanceOf[TagNode].getAttributeByName("href"))
-      .map {url =>
-      url match {
-        case urlPattern(code) => code
-        case _ => null
-      }
-    }.toArray
+      .map { url =>
+        url match {
+          case urlPattern(code) => code
+          case _ => null
+        }
+      }.toArray
     hrefs
   }
 }
@@ -123,14 +124,14 @@ object StockMarket {
     /**
      * Morning opening: 9:30 am - 11:30 am
      */
-    val morningStart = GMT8(new DateTime(0,1,1,9,30)).getMillis
-    val morningEnd = GMT8(new DateTime(0,1,1,11,30)).getMillis
+    val morningStart = GMT8(new DateTime(0, 1, 1, 9, 30)).getMillis
+    val morningEnd = GMT8(new DateTime(0, 1, 1, 11, 30)).getMillis
 
     /**
      * Afternoon opening: 1:00 pm - 3:00 pm
      */
-    val afternoonStart = GMT8(new DateTime(0,1,1,13,0)).getMillis
-    val afternoonEnd = GMT8(new DateTime(0,1,1,15,0)).getMillis
+    val afternoonStart = GMT8(new DateTime(0, 1, 1, 13, 0)).getMillis
+    val afternoonEnd = GMT8(new DateTime(0, 1, 1, 15, 0)).getMillis
 
     def inService: Boolean = {
 

Reply via email to