http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/wordcount-java/src/test/scala/io/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount-java/src/test/scala/io/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala
 
b/examples/streaming/wordcount-java/src/test/scala/io/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala
index 7046220..1a1d019 100644
--- 
a/examples/streaming/wordcount-java/src/test/scala/io/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala
+++ 
b/examples/streaming/wordcount-java/src/test/scala/io/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,17 +18,19 @@
 
 package io.gearpump.streaming.examples.wordcount
 
+import scala.concurrent.Future
+import scala.util.Success
+
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
+
 import io.gearpump.cluster.ClientToMaster.SubmitApplication
 import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
 import io.gearpump.cluster.{MasterHarness, TestUtil}
 import io.gearpump.streaming.examples.wordcountjava.WordCount
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
-
-import scala.concurrent.Future
-import scala.util.Success
 
-class WordCountSpec extends PropSpec with PropertyChecks with Matchers with 
BeforeAndAfter with MasterHarness {
+class WordCountSpec
+  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with 
MasterHarness {
 
   before {
     startActorSystem()
@@ -38,7 +40,7 @@ class WordCountSpec extends PropSpec with PropertyChecks with 
Matchers with Befo
     shutdownActorSystem()
   }
 
-  override def config = TestUtil.DEFAULT_CONFIG
+  protected override def config = TestUtil.DEFAULT_CONFIG
 
   property("WordCount should succeed to submit application with required 
arguments") {
     val requiredArgs = Array.empty[String]
@@ -47,7 +49,9 @@ class WordCountSpec extends PropSpec with PropertyChecks with 
Matchers with Befo
 
     val args = requiredArgs
 
-    Future {WordCount.main(masterConfig, args)}
+    Future {
+      WordCount.main(masterConfig, args)
+    }
 
     masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
     masterReceiver.reply(SubmitApplicationResult(Success(0)))

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Split.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Split.scala
 
b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Split.scala
index e8a3136..387bc75 100644
--- 
a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Split.scala
+++ 
b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Split.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,18 +20,18 @@ package io.gearpump.streaming.examples.wordcount
 
 import java.util.concurrent.TimeUnit
 
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
 import io.gearpump.Message
 import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
 
-class Split(taskContext : TaskContext, conf: UserConfig) extends 
Task(taskContext, conf) {
-  import taskContext.{output, self}
+class Split(taskContext: TaskContext, conf: UserConfig) extends 
Task(taskContext, conf) {
+  import taskContext.output
 
-  override def onStart(startTime : StartTime) : Unit = {
+  override def onStart(startTime: StartTime): Unit = {
     self ! Message("start")
   }
 
-  override def onNext(msg : Message) : Unit = {
+  override def onNext(msg: Message): Unit = {
     Split.TEXT_TO_SPLIT.lines.foreach { line =>
       line.split("[\\s]+").filter(_.nonEmpty).foreach { msg =>
         output(new Message(msg, System.currentTimeMillis()))
@@ -39,7 +39,8 @@ class Split(taskContext : TaskContext, conf: UserConfig) 
extends Task(taskContex
     }
 
     import scala.concurrent.duration._
-    taskContext.scheduleOnce(Duration(100, TimeUnit.MILLISECONDS))(self ! 
Message("continue", System.currentTimeMillis()))
+    taskContext.scheduleOnce(Duration(100, TimeUnit.MILLISECONDS))(self !
+      Message("continue", System.currentTimeMillis()))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Sum.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Sum.scala
 
b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Sum.scala
index 10cc172..6560066 100644
--- 
a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Sum.scala
+++ 
b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Sum.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,47 +19,47 @@
 package io.gearpump.streaming.examples.wordcount
 
 import java.util.concurrent.TimeUnit
+import scala.collection.mutable
+import scala.concurrent.duration.FiniteDuration
 
 import akka.actor.Cancellable
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
 import io.gearpump.Message
 import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
 
-import scala.collection.mutable
-import scala.concurrent.duration.FiniteDuration
-
-class Sum (taskContext : TaskContext, conf: UserConfig) extends 
Task(taskContext, conf) {
-  private[wordcount] val map : mutable.HashMap[String, Long] = new 
mutable.HashMap[String, Long]()
+class Sum(taskContext: TaskContext, conf: UserConfig) extends 
Task(taskContext, conf) {
+  private[wordcount] val map: mutable.HashMap[String, Long] = new 
mutable.HashMap[String, Long]()
 
-  private[wordcount] var wordCount : Long = 0
-  private var snapShotTime : Long = System.currentTimeMillis()
-  private var snapShotWordCount : Long = 0
+  private[wordcount] var wordCount: Long = 0
+  private var snapShotTime: Long = System.currentTimeMillis()
+  private var snapShotWordCount: Long = 0
 
-  private var scheduler : Cancellable = null
+  private var scheduler: Cancellable = null
 
-  override def onStart(startTime : StartTime) : Unit = {
+  override def onStart(startTime: StartTime): Unit = {
     scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
       new FiniteDuration(30, TimeUnit.SECONDS))(reportWordCount)
   }
 
-  override def onNext(msg : Message) : Unit = {
-    if (null == msg) {
-      return
+  override def onNext(msg: Message): Unit = {
+    if (null != msg) {
+      val current = map.getOrElse(msg.msg.asInstanceOf[String], 0L)
+      wordCount += 1
+      map.put(msg.msg.asInstanceOf[String], current + 1)
     }
-    val current = map.getOrElse(msg.msg.asInstanceOf[String], 0L)
-    wordCount += 1
-    map.put(msg.msg.asInstanceOf[String], current + 1)
   }
 
-  override def onStop() : Unit = {
+  override def onStop(): Unit = {
     if (scheduler != null) {
       scheduler.cancel()
     }
   }
 
-  def reportWordCount() : Unit = {
-    val current : Long = System.currentTimeMillis()
-    LOG.info(s"Task ${taskContext.taskId} Throughput: ${(wordCount - 
snapShotWordCount, (current - snapShotTime) / 1000)} (words, second)")
+  def reportWordCount(): Unit = {
+    val current: Long = System.currentTimeMillis()
+    LOG.info(s"Task ${taskContext.taskId} Throughput:" +
+      s" ${(wordCount - snapShotWordCount, (current - snapShotTime) / 1000)} 
(words, second)")
     snapShotWordCount = wordCount
     snapShotTime = current
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/WordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/WordCount.scala
 
b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/WordCount.scala
index 22f8ac6..f23604a 100644
--- 
a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/WordCount.scala
+++ 
b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/WordCount.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,16 +18,18 @@
 
 package io.gearpump.streaming.examples.wordcount
 
-import io.gearpump.cluster.embedded.{EmbeddedCluster}
-import io.gearpump.streaming.{StreamApplication, Processor}
+import org.slf4j.Logger
+
 import io.gearpump.cluster.UserConfig
 import io.gearpump.cluster.client.ClientContext
+import io.gearpump.cluster.embedded.EmbeddedCluster
 import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
 import io.gearpump.partitioner.HashPartitioner
+import io.gearpump.streaming.{Processor, StreamApplication}
 import io.gearpump.util.Graph.Node
 import io.gearpump.util.{AkkaApp, Graph, LogUtil}
-import org.slf4j.Logger
 
+/** Same WordCount with low level Processor Graph syntax */
 object WordCount extends AkkaApp with ArgumentsParser {
   private val LOG: Logger = LogUtil.getLogger(getClass)
   val RUN_FOR_EVER = -1
@@ -36,10 +38,11 @@ object WordCount extends AkkaApp with ArgumentsParser {
     "split" -> CLIOption[Int]("<how many split tasks>", required = false, 
defaultValue = Some(1)),
     "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, 
defaultValue = Some(1)),
     "debug" -> CLIOption[Boolean]("<true|false>", required = false, 
defaultValue = Some(false)),
-    "sleep" -> CLIOption[Int]("how many seconds to sleep for debug mode", 
required = false, defaultValue = Some(30))
-   )
+    "sleep" -> CLIOption[Int]("how many seconds to sleep for debug mode", 
required = false,
+      defaultValue = Some(30))
+  )
 
-  def application(config: ParseResult) : StreamApplication = {
+  def application(config: ParseResult): StreamApplication = {
     val splitNum = config.getInt("split")
     val sumNum = config.getInt("sum")
     val split = Processor[Split](splitNum)
@@ -58,7 +61,7 @@ object WordCount extends AkkaApp with ArgumentsParser {
 
     val localCluster = if (debugMode) {
       val cluster = new EmbeddedCluster(akkaConf: Config)
-      cluster.start
+      cluster.start()
       Some(cluster)
     } else {
       None
@@ -73,11 +76,11 @@ object WordCount extends AkkaApp with ArgumentsParser {
     context.submit(app)
 
     if (debugMode) {
-      Thread.sleep(sleepSeconds * 1000) // sleep for 30 seconds for debugging.
+      Thread.sleep(sleepSeconds * 1000) // Sleeps for 30 seconds for debugging.
     }
 
     context.close()
-    localCluster.map(_.stop)
+    localCluster.map(_.stop())
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
 
b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
index cc516db..ab8e8d0 100644
--- 
a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
+++ 
b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -24,7 +24,8 @@ import io.gearpump.streaming.dsl.StreamApp
 import io.gearpump.streaming.dsl.StreamApp._
 import io.gearpump.util.AkkaApp
 
-object WordCount extends AkkaApp with ArgumentsParser{
+/** Same WordCount with High level DSL syntax */
+object WordCount extends AkkaApp with ArgumentsParser {
 
   override val options: Array[(String, CLIOption[Any])] = Array.empty
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SplitSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SplitSpec.scala
 
b/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SplitSpec.scala
index c44836c..21e498e 100644
--- 
a/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SplitSpec.scala
+++ 
b/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SplitSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,17 +17,19 @@
  */
 package io.gearpump.streaming.examples.wordcount
 
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 import akka.actor.ActorSystem
 import akka.testkit.TestProbe
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.task.StartTime
-import io.gearpump.Message
-import io.gearpump.cluster.{TestUtil, UserConfig}
 import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalatest.{Matchers, WordSpec}
 
-import scala.language.postfixOps
+import io.gearpump.Message
+import io.gearpump.cluster.{TestUtil, UserConfig}
+import io.gearpump.streaming.MockUtil
+import io.gearpump.streaming.task.StartTime
 
 class SplitSpec extends WordSpec with Matchers {
 
@@ -36,11 +38,11 @@ class SplitSpec extends WordSpec with Matchers {
 
       val taskContext = MockUtil.mockTaskContext
 
-      implicit val system = ActorSystem("test",  TestUtil.DEFAULT_CONFIG)
+      implicit val system: ActorSystem = ActorSystem("test", 
TestUtil.DEFAULT_CONFIG)
 
       val mockTaskActor = TestProbe()
 
-      //mock self ActorRef
+      // Mock self ActorRef
       when(taskContext.self).thenReturn(mockTaskActor.ref)
 
       val conf = UserConfig.empty
@@ -48,13 +50,13 @@ class SplitSpec extends WordSpec with Matchers {
       split.onStart(StartTime(0))
       mockTaskActor.expectMsgType[Message]
 
-      val expectedWordCount = 
Split.TEXT_TO_SPLIT.split("""[\s\n]+""").filter(_.nonEmpty).length
+      val expectedWordCount = Split.TEXT_TO_SPLIT.split( 
"""[\s\n]+""").filter(_.nonEmpty).length
 
       split.onNext(Message("next"))
       verify(taskContext, times(expectedWordCount)).output(anyObject())
 
-      system.shutdown()
-      system.awaitTermination()
+      system.terminate()
+      Await.result(system.whenTerminated, Duration.Inf)
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SumSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SumSpec.scala
 
b/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SumSpec.scala
index 2303ce9..48a3fa9 100644
--- 
a/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SumSpec.scala
+++ 
b/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SumSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,20 +17,21 @@
  */
 package io.gearpump.streaming.examples.wordcount
 
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.task.StartTime
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
 import org.scalacheck.Gen
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
 
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.MockUtil
+import io.gearpump.streaming.task.StartTime
+
 class SumSpec extends PropSpec with PropertyChecks with Matchers with 
BeforeAndAfter {
   val stringGenerator = Gen.alphaStr
 
   var wordcount = 0
 
-  property("Sum should calculate the frequency of the word correctly"){
+  property("Sum should calculate the frequency of the word correctly") {
 
     val taskContext = MockUtil.mockTaskContext
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/WordCountSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/WordCountSpec.scala
 
b/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/WordCountSpec.scala
index 7fd8c84..63b2312 100644
--- 
a/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/WordCountSpec.scala
+++ 
b/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/WordCountSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,17 +18,18 @@
 
 package io.gearpump.streaming.examples.wordcount
 
-import io.gearpump.cluster.ClientToMaster.{ShutdownApplication, 
SubmitApplication}
-import io.gearpump.cluster.MasterToClient.{ShutdownApplicationResult, 
SubmitApplicationResult}
-import io.gearpump.cluster.{MasterHarness, TestUtil}
-import io.gearpump.util.Util
+import scala.concurrent.Future
+import scala.util.Success
+
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
 
-import scala.util.{Success, Try}
-import scala.concurrent.Future
+import io.gearpump.cluster.ClientToMaster.SubmitApplication
+import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import io.gearpump.cluster.{MasterHarness, TestUtil}
 
-class WordCountSpec extends PropSpec with PropertyChecks with Matchers with 
BeforeAndAfter with MasterHarness {
+class WordCountSpec
+  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with 
MasterHarness {
 
   before {
     startActorSystem()
@@ -38,7 +39,7 @@ class WordCountSpec extends PropSpec with PropertyChecks with 
Matchers with Befo
     shutdownActorSystem()
   }
 
-  override def config = TestUtil.DEFAULT_CONFIG
+  protected override def config = TestUtil.DEFAULT_CONFIG
 
   property("WordCount should succeed to submit application with required 
arguments") {
     val requiredArgs = Array.empty[String]
@@ -57,7 +58,9 @@ class WordCountSpec extends PropSpec with PropertyChecks with 
Matchers with Befo
 
       val args = requiredArgs ++ optionalArgs
 
-      Future {WordCount.main(masterConfig, args)}
+      Future {
+        WordCount.main(masterConfig, args)
+      }
 
       masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
       masterReceiver.reply(SubmitApplicationResult(Success(0)))

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/README.md
----------------------------------------------------------------------
diff --git a/experiments/akkastream/README.md b/experiments/akkastream/README.md
index fe04554..7c9a316 100644
--- a/experiments/akkastream/README.md
+++ b/experiments/akkastream/README.md
@@ -1,2 +1,4 @@
 Akka Stream 
-=========
\ No newline at end of file
+=========
+
+TODO: This directory is obsolete. Working on updating it to Akka 2.4.3.

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/BaseMaterializer.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/BaseMaterializer.scala 
b/experiments/akkastream/src/main/scala/akka/stream/BaseMaterializer.scala
index 42023c7..d2b328d 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/BaseMaterializer.scala
+++ b/experiments/akkastream/src/main/scala/akka/stream/BaseMaterializer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -27,7 +27,7 @@ import scala.concurrent.ExecutionContextExecutor
  * [[materialize]] accepts a [[ModuleGraph]] instead of a RunnableGraph.
  *
  * @see [[ModuleGraph]] for the difference between RunnableGraph and
- * [[ModuleGraph]]
+ *      [[ModuleGraph]]
  *
  */
 abstract class BaseMaterializer extends akka.stream.Materializer {
@@ -36,7 +36,6 @@ abstract class BaseMaterializer extends 
akka.stream.Materializer {
 
   override implicit def executionContext: ExecutionContextExecutor = throw new 
UnsupportedOperationException()
 
-
   def materialize[Mat](graph: ModuleGraph[Mat]): Mat
 
   override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat]): Mat = 
{
@@ -44,5 +43,5 @@ abstract class BaseMaterializer extends 
akka.stream.Materializer {
     materialize(graph)
   }
 
-  def shutdown: Unit
+  def shutdown(): Unit
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala 
b/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala
index bb23586..8851083 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala
+++ b/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -24,8 +24,8 @@
 
 package akka.stream
 
-import _root_.io.gearpump.util
-import _root_.io.gearpump.util.Graph
+import scala.collection.mutable
+
 import akka.stream.Attributes.Attribute
 import akka.stream.ModuleGraph.Edge
 import akka.stream.gearpump.util.MaterializedValueOps
@@ -33,7 +33,8 @@ import akka.stream.impl.StreamLayout._
 import akka.stream.impl._
 import akka.stream.{Graph => AkkaGraph}
 
-import scala.collection.mutable
+import _root_.io.gearpump.util
+import _root_.io.gearpump.util.Graph
 
 /**
  *
@@ -114,7 +115,7 @@ object ModuleGraph {
   def apply[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat]): ModuleGraph[Mat] 
= {
     val topLevel = runnableGraph.module
     val factory = new ModuleGraphFactory(topLevel)
-    val (graph, mat) =  factory.create()
+    val (graph, mat) = factory.create()
     new ModuleGraph(graph, mat)
   }
 
@@ -208,7 +209,7 @@ object ModuleGraph {
 
       val mat = resolveMaterialized(module.materializedValueComputation, 
materializedValues)
 
-      materializedValueSources.foreach{module =>
+      materializedValueSources.foreach { module =>
         val matAttribute = new MaterializedValueSourceAttribute(mat)
         val copied = copyAtomicModule(module, parentAttributes and 
Attributes(matAttribute))
         assignPort(module.shape.outlet, (copied.shape.outlet, copied))
@@ -227,10 +228,10 @@ object ModuleGraph {
     }
 
     private def resolveMaterialized(matNode: MaterializedValueNode, 
materializedValues: collection.Map[Module, MaterializedValueNode]): 
MaterializedValueNode = matNode match {
-      case Atomic(m)          => materializedValues(m)
+      case Atomic(m) => materializedValues(m)
       case Combine(f, d1, d2) => Combine(f, resolveMaterialized(d1, 
materializedValues), resolveMaterialized(d2, materializedValues))
-      case Transform(f, d)    => Transform(f, resolveMaterialized(d, 
materializedValues))
-      case Ignore             => Ignore
+      case Transform(f, d) => Transform(f, resolveMaterialized(d, 
materializedValues))
+      case Ignore => Ignore
     }
 
     final protected def assignPort(in: InPort, subscriber: (InPort, Module)): 
Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearAttributes.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearAttributes.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearAttributes.scala
index 8f36bea..50c4450 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearAttributes.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearAttributes.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -28,7 +28,7 @@ object GearAttributes {
    * @param count
    * @return
    */
-  def count(count: Int): Attributes =  Attributes(ParallismAttribute(count))
+  def count(count: Int): Attributes = Attributes(ParallismAttribute(count))
 
   /**
    * Define we want to render this module locally.
@@ -52,7 +52,7 @@ object GearAttributes {
   def location(attrs: Attributes): Location = {
     attrs.attributeList.foldLeft(Local: Location) { (s, attr) =>
       attr match {
-        case LocationAttribute(location)    => location
+        case LocationAttribute(location) => location
         case other => s
       }
     }
@@ -66,7 +66,7 @@ object GearAttributes {
   def count(attrs: Attributes): Int = {
     attrs.attributeList.foldLeft(1) { (s, attr) =>
       attr match {
-        case ParallismAttribute(count)    => count
+        case ParallismAttribute(count) => count
         case other => s
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearpumpMaterializer.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearpumpMaterializer.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearpumpMaterializer.scala
index 2bdeb0f..a11d7cb 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearpumpMaterializer.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearpumpMaterializer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,31 +20,32 @@ package akka.stream.gearpump
 
 import akka.actor.ActorSystem
 import akka.stream._
+import akka.stream.gearpump.graph.GraphCutter.Strategy
 import akka.stream.gearpump.graph.LocalGraph.LocalGraphMaterializer
 import akka.stream.gearpump.graph.RemoteGraph.RemoteGraphMaterializer
-import akka.stream.gearpump.graph.{RemoteGraph, SubGraphMaterializer, 
LocalGraph, GraphCutter}
-import akka.stream.gearpump.graph.GraphCutter.Strategy
+import akka.stream.gearpump.graph.{GraphCutter, LocalGraph, RemoteGraph, 
SubGraphMaterializer}
 import akka.stream.impl.StreamLayout.Module
 
 /**
  *
  * [[GearpumpMaterializer]] allows you to render akka-stream DSL as a Gearpump
  * streaming application. If some module cannot be rendered remotely in 
Gearpump
- * Cluster, then it will use local Actor materializer as fallback to 
materialize
+ * Cluster, then it uses local Actor materializer as fallback to materialize
  * the module locally.
  *
  * User can custom a [[Strategy]] to determinie which module should be rendered
  * remotely, and which module should be rendered locally.
  *
  * @see [[GraphCutter]] to find out how we cut the [[ModuleGraph]] to two 
parts,
- *   and materialize them seperately.
+ *      and materialize them separately.
  *
  * @param system
  * @param strategy
  * @param useLocalCluster whether to use built-in in-process local cluster
  */
-class GearpumpMaterializer(system: ActorSystem, strategy: Strategy = 
GraphCutter.AllRemoteStrategy, useLocalCluster: Boolean = true)
-    extends BaseMaterializer {
+class GearpumpMaterializer(system: ActorSystem, strategy: Strategy = 
GraphCutter.AllRemoteStrategy,
+    useLocalCluster: Boolean = true)
+  extends BaseMaterializer {
 
   private val subMaterializers: Map[Class[_], SubGraphMaterializer] = Map(
     classOf[LocalGraph] -> new LocalGraphMaterializer(system),
@@ -53,18 +54,18 @@ class GearpumpMaterializer(system: ActorSystem, strategy: 
Strategy = GraphCutter
 
   override def materialize[Mat](graph: ModuleGraph[Mat]): Mat = {
     val subGraphs = new GraphCutter(strategy).cut(graph)
-    val matValues = subGraphs.foldLeft(Map.empty[Module, Any]){(map, subGraph) 
=>
+    val matValues = subGraphs.foldLeft(Map.empty[Module, Any]) { (map, 
subGraph) =>
       val materializer = subMaterializers(subGraph.getClass)
       map ++ materializer.materialize(subGraph, map)
     }
     graph.resolve(matValues)
   }
 
-  override def shutdown: Unit = {
-    subMaterializers.values.foreach(_.shutdown)
+  override def shutdown(): Unit = {
+    subMaterializers.values.foreach(_.shutdown())
   }
 }
 
-object GearpumpMaterializer{
-  def apply(system: ActorSystem) = new GearpumpMaterializer(system)
+object GearpumpMaterializer {
+  def apply(system: ActorSystem): GearpumpMaterializer = new 
GearpumpMaterializer(system)
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test.scala 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test.scala
index d466595..7808b52 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,24 +18,26 @@
 
 package akka.stream.gearpump.example
 
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 import akka.actor.{Actor, ActorSystem, Props}
 import akka.stream.gearpump.GearpumpMaterializer
 import akka.stream.gearpump.graph.GraphCutter
 import akka.stream.scaladsl.{Sink, Source}
-import io.gearpump.cluster.ClusterConfig
 
 /**
-  * This tests how the [[GearpumpMaterializer]] materializes different 
partials of Graph
-  * to different runtime.
-  *
-  * In this test, source module and sink module will be materialized locally,
-  * Other transformation module will be materialized remotely in Gearpump
-  * streaming Application.
-  *
-  * Usage: output/target/pack/bin/gear app -jar 
experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar
-  *
-  *
-  */
+ * This tests how the [[GearpumpMaterializer]] materializes different partials 
of Graph
+ * to different runtime.
+ *
+ * In this test, source module and sink module are materialized locally,
+ * while the other transformation modules are materialized remotely in a Gearpump
+ * streaming Application.
+ *
+ * Usage: output/target/pack/bin/gear app -jar 
experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar
+ *
+ *
+ */
 object Test {
 
   def main(args: Array[String]): Unit = {
@@ -47,18 +49,21 @@ object Test {
 
     val echo = system.actorOf(Props(new Echo()))
     val sink = Sink.actorRef(echo, "COMPLETE")
-    val source = Source(List("red hat", "yellow sweater", "blue jack", "red 
apple", "green plant", "blue sky"))
-    source.filter(_.startsWith("red")).fold("Items:"){(a, b) =>
+    val source = Source(List("red hat", "yellow sweater", "blue jack", "red 
apple", "green plant",
+      "blue sky"))
+    source.filter(_.startsWith("red")).fold("Items:") { (a, b) =>
       a + "|" + b
     }.map("I want to order item: " + _).runWith(sink)
 
-    system.awaitTermination()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 
   class Echo extends Actor {
     def receive: Receive = {
       case any: AnyRef =>
+        // scalastyle:off println
         println("Confirm received: " + any)
+      // scalastyle:on println
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test2.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test2.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test2.scala
index 18d51a9..2426f5f 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test2.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test2.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,19 +18,21 @@
 
 package akka.stream.gearpump.example
 
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 import akka.actor.{Actor, ActorSystem, Props}
 import akka.stream.ActorMaterializer
 import akka.stream.gearpump.GearpumpMaterializer
 import akka.stream.gearpump.scaladsl.{GearSink, GearSource}
-import akka.stream.scaladsl.{Flow, FlowGraph, Sink, Source}
-import io.gearpump.cluster.ClusterConfig
+import akka.stream.scaladsl.{Flow, Sink, Source}
 
 /**
-  *
-  * This tests how different Materializers can be used together in an explicit 
way.
-  * Usage: output/target/pack/bin/gear app -jar 
experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar
-  *
-  */
+ *
+ * This tests how different Materializers can be used together in an explicit 
way.
+ * Usage: output/target/pack/bin/gear app -jar 
experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar
+ *
+ */
 object Test2 {
 
   def main(args: Array[String]): Unit = {
@@ -52,13 +54,12 @@ object Test2 {
     val externalSink = Sink.actorRef(echo, "COMPLETE")
 
     val graph = FlowGraph.closed() { implicit b =>
-      import FlowGraph.Implicits._
       externalSource ~> Sink(entry)
       Source(exit) ~> externalSink
     }
     graph.run()(actorMaterializer)
 
-    system.awaitTermination()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 
   class Echo extends Actor {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala
index 1d4afed..b1bf01c 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,17 +18,20 @@
 
 package akka.stream.gearpump.example
 
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 import akka.actor.{Actor, ActorSystem, Props}
 import akka.stream.gearpump.GearpumpMaterializer
 import akka.stream.gearpump.scaladsl.GearSource
 import akka.stream.scaladsl.Sink
-import io.gearpump.cluster.ClusterConfig
+
 import io.gearpump.streaming.dsl.CollectionDataSource
 
 /**
-  * read from remote and write to local
-  * Usage: output/target/pack/bin/gear app -jar 
experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar
-  */
+ * read from remote and write to local
+ * Usage: output/target/pack/bin/gear app -jar 
experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar
+ */
 object Test3 {
 
   def main(args: Array[String]): Unit = {
@@ -44,7 +47,7 @@ object Test3 {
     val source = GearSource.from[String](sourceData)
     source.filter(_.startsWith("red")).map("I want to order item: " + 
_).runWith(sink)
 
-    system.awaitTermination()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 
   class Echo extends Actor {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala
index b97d329..b1e2fcb 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,17 +18,20 @@
 
 package akka.stream.gearpump.example
 
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 import akka.actor.ActorSystem
 import akka.stream.gearpump.GearpumpMaterializer
 import akka.stream.gearpump.scaladsl.GearSink
 import akka.stream.scaladsl.Source
-import io.gearpump.cluster.ClusterConfig
+
 import io.gearpump.streaming.dsl.LoggerSink
 
 /**
-  * read from local and write to remote
-  * Usage: output/target/pack/bin/gear app -jar 
experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar
-  */
+ * read from local and write to remote
+ * Usage: output/target/pack/bin/gear app -jar 
experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar
+ */
 object Test4 {
 
   def main(args: Array[String]): Unit = {
@@ -42,6 +45,6 @@ object Test4 {
     val source = Source(List("red hat", "yellow sweater", "blue jack", "red 
apple", "green plant", "blue sky"))
     source.filter(_.startsWith("red")).map("I want to order item: " + 
_).runWith(sink)
 
-    system.awaitTermination()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test5.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test5.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test5.scala
index a9da578..052c018 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test5.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test5.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,15 +18,17 @@
 
 package akka.stream.gearpump.example
 
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 import akka.actor.{Actor, ActorSystem, Props}
 import akka.stream.gearpump.GearpumpMaterializer
 import akka.stream.gearpump.graph.GraphCutter
-import akka.stream.scaladsl.{Unzip, Source, FlowGraph, Sink}
-import io.gearpump.cluster.ClusterConfig
+import akka.stream.scaladsl.{Sink, Source, Unzip}
 
 /**
- test fanout
- */
+ * Tests fanout.
+ */
 object Test5 {
 
   def main(args: Array[String]): Unit = {
@@ -42,7 +44,6 @@ object Test5 {
     val source = Source(List(("male", "24"), ("female", "23")))
 
     val graph = FlowGraph.closed() { implicit b =>
-      import FlowGraph.Implicits._
       val unzip = b.add(Unzip[String, String]())
 
       val sink1 = Sink.actorRef(echo, "COMPLETE")
@@ -55,7 +56,7 @@ object Test5 {
 
     graph.run()
 
-    system.awaitTermination()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 
   class Echo extends Actor {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala
index 8e9ccf6..38b40b7 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,17 +18,20 @@
 
 package akka.stream.gearpump.example
 
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 import akka.actor.{Actor, ActorSystem, Props}
 import akka.stream.gearpump.GearpumpMaterializer
-import akka.stream.gearpump.scaladsl.{Reduce, GroupBy, GearSource}
+import akka.stream.gearpump.scaladsl.GearSource
 import akka.stream.scaladsl.Sink
-import io.gearpump.streaming.dsl.CollectionDataSource
 
+import io.gearpump.streaming.dsl.CollectionDataSource
 
 /**
- *  WordCount example
-  * Test GroupBy
-  */
+ * WordCount example
+ * Test GroupBy
+ */
 
 import akka.stream.gearpump.scaladsl.Implicits._
 
@@ -45,14 +48,14 @@ object Test6 {
     val sink = Sink.actorRef(echo, "COMPLETE")
     val sourceData = new CollectionDataSource(List("this is a good start", 
"this is a good time", "time to start", "congratulations", "green plant", "blue 
sky"))
     val source = GearSource.from[String](sourceData)
-    source.mapConcat{line =>
+    source.mapConcat { line =>
       line.split(" ").toList
-    }.groupBy2(x=>x).map(word => (word, 1))
-      .reduce({(a, b) =>
+    }.groupBy2(x => x).map(word => (word, 1))
+      .reduce({ (a, b) =>
         (a._1, a._2 + b._2)
       }).log("word-count").runWith(sink)
 
-    system.awaitTermination()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 
   class Echo extends Actor {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala
index 915624f..c2d0417 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,26 +20,26 @@ package akka.stream.gearpump.example
 
 import java.io.{File, FileInputStream}
 import java.util.zip.GZIPInputStream
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.util.{Failure, Success, Try}
 
 import akka.actor.ActorSystem
-import akka.stream.gearpump.{GearAttributes, GearpumpMaterializer}
 import akka.stream.gearpump.graph.GraphCutter
-import akka.stream.io.{Framing, InputStreamSource}
+import akka.stream.gearpump.{GearAttributes, GearpumpMaterializer}
 import akka.stream.scaladsl._
 import akka.util.ByteString
-import io.gearpump.cluster.main.{CLIOption, ArgumentsParser}
-import io.gearpump.util.AkkaApp
 import org.json4s.JsonAST.JString
 
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Failure, Success, Try}
+import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import io.gearpump.util.AkkaApp
 
 /**
  * this example is ported from 
http://engineering.intenthq.com/2015/06/wikidata-akka-streams/
  * which showcases running Akka Streams DSL across JVMs on Gearpump
  *
  * Usage: output/target/pack/bin/gear app -jar 
experiments/akkastream/target/scala_2.11/akkastream-${VERSION}-SNAPSHOT-assembly.jar
- *            -input wikidata-${DATE}-all.json.gz -languages en,de
+ * -input wikidata-${DATE}-all.json.gz -languages en,de
  *
  * (Note: Wikipedia data can be downloaded from 
https://dumps.wikimedia.org/wikidatawiki/entities/)
  *
@@ -66,11 +66,10 @@ object WikipediaApp extends ArgumentsParser with AkkaApp {
 
     val g = FlowGraph.closed(count) { implicit b =>
       sinkCount => {
-        import FlowGraph.Implicits._
 
         val broadcast = b.add(Broadcast[WikidataElement](2))
         elements ~> broadcast ~> logEveryNSink(1000)
-                    broadcast ~> checkSameTitles(langs.toSet) ~> sinkCount
+        broadcast ~> checkSameTitles(langs.toSet) ~> sinkCount
       }
     }
 
@@ -79,9 +78,9 @@ object WikipediaApp extends ArgumentsParser with AkkaApp {
         case Success((t, f)) => printResults(t, f)
         case Failure(tr) => println("Something went wrong")
       }
-      system.shutdown()
+      system.terminate()
     }
-    system.awaitTermination()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 
   def source(file: File): Source[String, Future[Long]] = {
@@ -124,21 +123,21 @@ object WikipediaApp extends ArgumentsParser with AkkaApp {
     .filter(_.sites.keySet == langs)
     .map { x =>
       val titles = x.sites.values
-      titles.forall( _ == titles.head)
+      titles.forall(_ == titles.head)
     }.withAttributes(GearAttributes.remote)
 
-  def count: Sink[Boolean, Future[(Int, Int)]] = Sink.fold((0,0)) {
-    case ((t, f), true) => (t+1, f)
-    case ((t, f), false) => (t, f+1)
+  def count: Sink[Boolean, Future[(Int, Int)]] = Sink.fold((0, 0)) {
+    case ((t, f), true) => (t + 1, f)
+    case ((t, f), false) => (t, f + 1)
   }
 
   def printResults(t: Int, f: Int) = {
-    val message = s"""
-                     | Number of items with the same title: $t
-        | Number of items with the different title: $f
-        | Ratios: ${t.toDouble / (t + f)} / ${f.toDouble / (t + f)}
+    val message =
+      s"""
+         | Number of items with the same title: $t
+         | Number of items with the different title: $f
+         | Ratios: ${t.toDouble / (t + f)} / ${f.toDouble / (t + f)}
                   """.stripMargin
     println(message)
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala
index 8d835a2..ef8c6fa 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -23,15 +23,16 @@ import akka.stream.ModuleGraph.Edge
 import akka.stream.gearpump.GearAttributes
 import akka.stream.gearpump.GearAttributes.{Local, Location, Remote}
 import akka.stream.gearpump.graph.GraphCutter.Strategy
-import akka.stream.gearpump.module.{GroupByModule, BridgeModule, DummyModule, 
GearpumpTaskModule, SinkBridgeModule, SourceBridgeModule}
-import akka.stream.impl.Stages.{TimerTransform, DirectProcessor}
+import akka.stream.gearpump.module.{BridgeModule, DummyModule, 
GearpumpTaskModule, GroupByModule, SinkBridgeModule, SourceBridgeModule}
+import akka.stream.impl.Stages.DirectProcessor
 import akka.stream.impl.StreamLayout.{MaterializedValueNode, Module}
-import akka.stream.impl.{MaterializedValueSource, SinkModule, SourceModule}
+import akka.stream.impl.{SinkModule, SourceModule}
+
 import io.gearpump.util.Graph
 
 /**
  *
- * GraphCutter is used to decide which part will be rendered locally
+ * GraphCutter is used to decide which part is rendered locally
  * and which part should be rendered remotely.
  *
  * We will cut the graph based on the [[Strategy]] provided.
@@ -64,11 +65,12 @@ class GraphCutter(strategy: Strategy) {
     doCut(graph, tags, moduleGraph.mat)
   }
 
-  private def doCut(graph: Graph[Module, Edge], tags: Map[Module, Location], 
mat: MaterializedValueNode): List[SubGraph] = {
+  private def doCut(graph: Graph[Module, Edge], tags: Map[Module, Location],
+      mat: MaterializedValueNode): List[SubGraph] = {
     val local = Graph.empty[Module, Edge]
     val remote = Graph.empty[Module, Edge]
 
-    graph.vertices.foreach{ module =>
+    graph.vertices.foreach { module =>
       if (tags(module) == Local) {
         local.addVertex(module)
       } else {
@@ -76,7 +78,7 @@ class GraphCutter(strategy: Strategy) {
       }
     }
 
-    graph.edges.foreach{ nodeEdgeNode =>
+    graph.edges.foreach { nodeEdgeNode =>
       val (node1, edge, node2) = nodeEdgeNode
       (tags(node1), tags(node2)) match {
         case (Local, Local) =>
@@ -88,7 +90,7 @@ class GraphCutter(strategy: Strategy) {
             case bridge: BridgeModule[_, _, _] =>
               local.addEdge(node1, edge, node2)
             case _ =>
-              // create a bridge module in between
+              // Creates a bridge module in between
               val bridge = new SourceBridgeModule[AnyRef, AnyRef]()
               val remoteEdge = Edge(bridge.outPort, edge.to)
               remote.addEdge(bridge, remoteEdge, node2)
@@ -100,7 +102,7 @@ class GraphCutter(strategy: Strategy) {
             case bridge: BridgeModule[_, _, _] =>
               local.addEdge(node1, edge, node2)
             case _ =>
-              // create a bridge module in between
+              // Creates a bridge module in between
               val bridge = new SinkBridgeModule[AnyRef, AnyRef]()
               val remoteEdge = Edge(edge.from, bridge.inPort)
               remote.addEdge(node1, remoteEdge, bridge)
@@ -114,14 +116,14 @@ class GraphCutter(strategy: Strategy) {
   }
 
   private def tag(graph: Graph[Module, Edge], strategy: Strategy): Map[Module, 
Location] = {
-    graph.vertices.map{vertex =>
+    graph.vertices.map { vertex =>
       vertex -> strategy.apply(vertex)
     }.toMap
   }
 
   private def removeDummyModule(inputGraph: Graph[Module, Edge]): 
Graph[Module, Edge] = {
     val graph = inputGraph.copy
-    val dummies = graph.vertices.filter {module =>
+    val dummies = graph.vertices.filter { module =>
       module match {
         case dummy: DummyModule =>
           true
@@ -136,7 +138,6 @@ class GraphCutter(strategy: Strategy) {
 
 object GraphCutter {
 
-
   type Strategy = PartialFunction[Module, Location]
 
   val BaseStrategy: Strategy = {
@@ -145,8 +146,7 @@ object GraphCutter {
     case task: GearpumpTaskModule =>
       Remote
     case groupBy: GroupByModule[_, _] =>
-      //TODO: groupBy is not supported by local materializer
-      // yet
+      // TODO: groupBy is not supported by local materializer yet
       Remote
     case source: SourceModule[_, _] =>
       Local
@@ -157,7 +157,7 @@ object GraphCutter {
     case direct: DirectProcessor =>
       Local
     case time: TimerTransform =>
-      // render to local as it requires a timer.
+      // Renders to local as it requires a timer.
       Local
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala
index b9533c4..473f32a 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -26,13 +26,14 @@ import akka.stream.impl.Stages.DefaultAttributes
 import akka.stream.impl.StreamLayout.Module
 import akka.stream.impl.{PublisherSource, SubscriberSink}
 import akka.stream.{Outlet, SinkShape, SourceShape}
-import io.gearpump.util.Graph
 import org.reactivestreams.{Publisher, Subscriber}
 
+import io.gearpump.util.Graph
+
 /**
  *
  * [[LocalGraph]] is a [[SubGraph]] of the application DSL Graph, which only
- *  contain module that can be materialized in local JVM.
+ * contains modules that can be materialized in the local JVM.
  *
  * @param graph
  */
@@ -46,7 +47,7 @@ object LocalGraph {
    */
   class LocalGraphMaterializer(system: ActorSystem) extends 
SubGraphMaterializer {
 
-    // create a local materializer
+    // Creates a local materializer
     val materializer = LocalMaterializer()(system)
 
     /**
@@ -55,7 +56,7 @@ object LocalGraph {
      * @return Materialized Values for each Module after the materialization.
      */
     override def materialize(graph: SubGraph, matValues: Map[Module, Any]): 
Map[Module, Any] = {
-      val newGraph: Graph[Module, Edge] = graph.graph.mapVertex{ module =>
+      val newGraph: Graph[Module, Edge] = graph.graph.mapVertex { module =>
         module match {
           case source: SourceBridgeModule[AnyRef, AnyRef] =>
             val subscriber = matValues(source).asInstanceOf[Subscriber[AnyRef]]
@@ -72,7 +73,7 @@ object LocalGraph {
       materializer.materialize(newGraph, matValues)
     }
 
-    override def shutdown: Unit = {
+    override def shutdown(): Unit = {
       materializer.shutdown()
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala
index 250c354..5bb9a68 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -25,16 +25,16 @@ import akka.stream.gearpump.module.{SinkBridgeModule, 
SourceBridgeModule}
 import akka.stream.gearpump.task.SinkBridgeTask.SinkBridgeTaskClient
 import akka.stream.gearpump.task.SourceBridgeTask.SourceBridgeTaskClient
 import akka.stream.impl.StreamLayout.Module
-import io.gearpump.cluster.ClusterConfig
+
 import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.embedded.{EmbeddedCluster}
-import io.gearpump.streaming.{StreamApplication, ProcessorId}
+import io.gearpump.cluster.embedded.EmbeddedCluster
+import io.gearpump.streaming.ProcessorId
 import io.gearpump.util.Graph
 
 /**
  *
  * [[RemoteGraph]] is a [[SubGraph]] of the application DSL Graph, which only
- *  contain modules that can be materialized in remote Gearpump cluster.
+ * contains modules that can be materialized in a remote Gearpump cluster.
  *
  * @param graph
  */
@@ -50,7 +50,7 @@ object RemoteGraph {
   class RemoteGraphMaterializer(useInProcessCluster: Boolean, system: 
ActorSystem) extends SubGraphMaterializer {
     private val local = if (useInProcessCluster) {
       val cluster = EmbeddedCluster()
-      cluster.start
+      cluster.start()
       Some(cluster)
     } else {
       None
@@ -63,7 +63,7 @@ object RemoteGraph {
 
     override def materialize(subGraph: SubGraph, inputMatValues: Map[Module, 
Any]): Map[Module, Any] = {
       val graph = subGraph.graph
-      
+
       if (graph.isEmpty) {
         inputMatValues
       } else {
@@ -98,9 +98,9 @@ object RemoteGraph {
       inputMatValues ++ resolve(matValues)
     }
 
-    override def shutdown: Unit = {
+    override def shutdown(): Unit = {
       context.close()
-      local.map(_.stop)
+      local.map(_.stop())
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala
index 471370d..4364645 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,9 +18,9 @@
 
 package akka.stream.gearpump.graph
 
-import akka.actor.ActorSystem
 import akka.stream.ModuleGraph.Edge
 import akka.stream.impl.StreamLayout.Module
+
 import io.gearpump.util.Graph
 
 /**
@@ -40,11 +40,10 @@ trait SubGraph {
   def graph: Graph[Module, Edge]
 }
 
-
 /**
  * Materializer for Sub-Graph type
  */
-trait SubGraphMaterializer{
+trait SubGraphMaterializer {
   /**
    *
    * @param matValues Materialized Values for each module before 
materialization
@@ -53,5 +52,5 @@ trait SubGraphMaterializer{
 
   def materialize(graph: SubGraph, matValues: Map[Module, Any]): Map[Module, Any]
 
-  def shutdown: Unit
+  def shutdown(): Unit
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala
index 9c98924..78125c8 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala
+++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,17 +19,17 @@
 package akka.stream.gearpump.materializer
 
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+import scala.concurrent.{Await, ExecutionContextExecutor}
 
 import akka.actor.{ActorCell, ActorRef, ActorSystem, Deploy, LocalActorRef, PoisonPill, Props, RepointableActorRef}
 import akka.dispatch.Dispatchers
 import akka.pattern.ask
 import akka.stream.ModuleGraph.Edge
 import akka.stream.impl.StreamLayout.Module
-import akka.stream.impl.{FlowNameCounter, StreamSupervisor}
-import akka.stream.{ActorAttributes, ActorMaterializer, ActorMaterializerSettings, Attributes, ClosedShape, Graph => AkkaGraph, MaterializationContext, ModuleGraph, Optimizations}
-import io.gearpump.util.Graph
+import akka.stream.impl.StreamSupervisor
+import akka.stream.{ActorAttributes, ActorMaterializer, ActorMaterializerSettings, Attributes, ClosedShape, Graph => AkkaGraph, MaterializationContext, ModuleGraph}
 
-import scala.concurrent.{Await, ExecutionContextExecutor}
+import io.gearpump.util.Graph
 
 /**
  * [[LocalMaterializer]] will use local actor to materialize the graph
@@ -62,11 +62,11 @@ abstract class LocalMaterializer(
     import Attributes._
     opAttr.attributeList.foldLeft(settings) { (s, attr) =>
       attr match {
-        case InputBuffer(initial, max)    => s.withInputBuffer(initial, max)
-        case Dispatcher(dispatcher)       => s.withDispatcher(dispatcher)
+        case InputBuffer(initial, max) => s.withInputBuffer(initial, max)
+        case Dispatcher(dispatcher) => s.withDispatcher(dispatcher)
         case SupervisionStrategy(decider) => s.withSupervisionStrategy(decider)
-        case l: LogLevels                 => s
-        case Name(_)                      => s
+        case l: LogLevels => s
+        case Name(_) => s
         case other => s
       }
     }
@@ -82,22 +82,26 @@ abstract class LocalMaterializer(
       case ref: LocalActorRef =>
         ref.underlying.attachChild(props.withDispatcher(dispatcher), name, systemService = false)
       case ref: RepointableActorRef =>
-        if (ref.isStarted)
-          ref.underlying.asInstanceOf[ActorCell].attachChild(props.withDispatcher(dispatcher), name, systemService = false)
-        else {
+        if (ref.isStarted) {
+          ref.underlying.asInstanceOf[ActorCell].attachChild(props.withDispatcher(dispatcher),
+            name, systemService = false)
+        } else {
           implicit val timeout = ref.system.settings.CreationTimeout
-          val f = (supervisor ? StreamSupervisor.Materialize(props.withDispatcher(dispatcher), name)).mapTo[ActorRef]
+          val f = (supervisor ? StreamSupervisor.Materialize(props.withDispatcher(dispatcher),
+            name)).mapTo[ActorRef]
           Await.result(f, timeout.duration)
         }
       case unknown =>
-        throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${unknown.getClass.getName}]")
+        throw new IllegalStateException(
+          s"Stream supervisor must be a local actor, was [${unknown.getClass.getName}]")
     }
   }
 
-  override lazy val executionContext: ExecutionContextExecutor = dispatchers.lookup(settings.dispatcher match {
-    case Deploy.NoDispatcherGiven => Dispatchers.DefaultDispatcherId
-    case other                    => other
-  })
+  override lazy val executionContext: ExecutionContextExecutor =
+    dispatchers.lookup(settings.dispatcher match {
+      case Deploy.NoDispatcherGiven => Dispatchers.DefaultDispatcherId
+      case other => other
+    })
 
   def materialize(graph: Graph[Module, Edge], inputMatValues: Map[Module, Any]): Map[Module, Any]
 
@@ -109,28 +113,37 @@ abstract class LocalMaterializer(
 
   override def actorOf(context: MaterializationContext, props: Props): ActorRef = {
     val dispatcher =
-      if (props.deploy.dispatcher == Deploy.NoDispatcherGiven) effectiveSettings(context.effectiveAttributes).dispatcher
-      else props.dispatcher
+      if (props.deploy.dispatcher == Deploy.NoDispatcherGiven) {
+        effectiveSettings(context.effectiveAttributes).dispatcher
+      } else {
+        props.dispatcher
+      }
     actorOf(props, context.stageName, dispatcher)
   }
 }
 
 object LocalMaterializer {
 
-  def apply(materializerSettings: Option[ActorMaterializerSettings] = None, namePrefix: Option[String] = None, optimizations: Optimizations = Optimizations.none)(implicit system: ActorSystem): LocalMaterializerImpl  = {
+  def apply(materializerSettings: Option[ActorMaterializerSettings] = None,
+      namePrefix: Option[String] = None,
+      optimizations: Optimizations = Optimizations.none)(implicit system: ActorSystem)
+    : LocalMaterializerImpl = {
 
     val settings = materializerSettings getOrElse ActorMaterializerSettings(system)
     apply(settings, namePrefix.getOrElse("flow"), optimizations)(system)
   }
 
-  def apply(materializerSettings: ActorMaterializerSettings, namePrefix: String, optimizations: Optimizations)(implicit system: ActorSystem): LocalMaterializerImpl = {
+  def apply(materializerSettings: ActorMaterializerSettings,
+      namePrefix: String, optimizations: Optimizations)(implicit system: ActorSystem)
+    : LocalMaterializerImpl = {
     val haveShutDown = new AtomicBoolean(false)
 
     new LocalMaterializerImpl(
       system,
       materializerSettings,
       system.dispatchers,
-      system.actorOf(StreamSupervisor.props(materializerSettings, haveShutDown).withDispatcher(materializerSettings.dispatcher)),
+      system.actorOf(StreamSupervisor.props(materializerSettings,
+        haveShutDown).withDispatcher(materializerSettings.dispatcher)),
       haveShutDown,
       FlowNameCounter(system).counter,
       namePrefix,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala
index 2ccc13f..97d8f70 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala
+++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -27,17 +27,14 @@ import akka.stream.actor.ActorSubscriber
 import akka.stream.gearpump.materializer.LocalMaterializerImpl.MaterializedModule
 import akka.stream.gearpump.module.ReduceModule
 import akka.stream.gearpump.util.MaterializedValueOps
-import akka.stream.impl.GenJunctions.{UnzipWithModule, ZipWithModule}
-import akka.stream.impl.Junctions.{BalanceModule, BroadcastModule, ConcatModule, FanInModule, FanOutModule, FlexiMergeModule, FlexiRouteModule, JunctionModule, MergeModule, MergePreferredModule}
-import akka.stream.impl.Stages.{Fold, DirectProcessor, Identity, StageModule}
+import akka.stream.impl.Stages.{DirectProcessor, Fold, StageModule}
 import akka.stream.impl.StreamLayout.Module
-import akka.stream.impl.io.SslTlsCipherActor
-import akka.stream.impl.{ActorProcessorFactory, ActorPublisher, Balance, Broadcast, Concat, ExposedPublisher, FairMerge, FanIn, FanOut, FlexiMerge, FlexiRoute, MaterializedValuePublisher, MaterializedValueSource, SinkModule, SourceModule, UnfairMerge, VirtualProcessor}
-import akka.stream.io.SslTls.TlsModule
-import akka.stream.{ActorMaterializerSettings, Attributes, Graph => AkkaGraph, InPort, MaterializationContext, Materializer, Optimizations, OutPort, Shape}
-import io.gearpump.util.Graph
+import akka.stream.impl.{ActorProcessorFactory, ActorPublisher, ExposedPublisher, FanIn, FanOut, SinkModule, SourceModule, VirtualProcessor}
+import akka.stream.{ActorMaterializerSettings, Attributes, Graph => AkkaGraph, InPort, MaterializationContext, Materializer, OutPort, Shape}
 import org.reactivestreams.{Processor, Publisher, Subscriber}
 
+import io.gearpump.util.Graph
+
 /**
  * This materializer is functional equivalent to [[akka.stream.impl.ActorMaterializerImpl]]
  *
@@ -61,14 +58,14 @@ class LocalMaterializerImpl (
     optimizations: Optimizations)
   extends LocalMaterializer(
     system, settings, dispatchers, supervisor,
-    haveShutDown, flowNameCounter, namePrefix, optimizations){
+    haveShutDown, flowNameCounter, namePrefix, optimizations) {
 
   override def materialize(graph: Graph[Module, Edge], inputMatValues: Map[Module, Any]): Map[Module, Any] = {
-    val materializedGraph = graph.mapVertex{module =>
+    val materializedGraph = graph.mapVertex { module =>
       materializeAtomic(module)
     }
 
-    materializedGraph.edges.foreach{nodeEdgeNode =>
+    materializedGraph.edges.foreach { nodeEdgeNode =>
       val (node1, edge, node2) = nodeEdgeNode
       val from = edge.from
       val to = edge.to
@@ -77,7 +74,7 @@ class LocalMaterializerImpl (
       publisher.subscribe(subscriber)
     }
 
-    val matValues = inputMatValues ++ materializedGraph.vertices.map{vertex =>
+    val matValues = inputMatValues ++ materializedGraph.vertices.map { vertex =>
       (vertex.module, vertex.matValue)
     }.toMap
 
@@ -139,7 +136,7 @@ class LocalMaterializerImpl (
         MaterializedModule(source, mat, outputs = outputs)
 
       case reduce: ReduceModule[Any] =>
-      //TODO: remove this after the official akka-stream API support the Reduce Module
+        //TODO: remove this after the official akka-stream API support the Reduce Module
         val stage = LocalMaterializerImpl.toFoldModule(reduce)
         val (processor, mat) = processorFor(stage, effectiveAttributes, effectiveSettings(effectiveAttributes))
         val inputs = Map[InPort, Subscriber[_]](stage.inPort -> processor)
@@ -177,88 +174,90 @@ class LocalMaterializerImpl (
   }
 
   private def processorFor(op: StageModule,
-                           effectiveAttributes: Attributes,
-                           effectiveSettings: ActorMaterializerSettings): (Processor[Any, Any], Any) = op match {
+    effectiveAttributes: Attributes,
+    effectiveSettings: ActorMaterializerSettings): (Processor[Any, Any], Any) = op match {
     case DirectProcessor(processorFactory, _) => processorFactory()
-    case Identity(attr)                       => (new VirtualProcessor, ())
+    case Identity(attr) => (new VirtualProcessor, ())
     case _ =>
       val (opprops, mat) = ActorProcessorFactory.props(LocalMaterializerImpl.this, op, effectiveAttributes)
       ActorProcessorFactory[Any, Any](
             actorOf(opprops, stageName(effectiveAttributes), effectiveSettings.dispatcher)) -> mat
   }
 
-private def materializeJunction(
+  private def materializeJunction(
     op: JunctionModule,
     effectiveAttributes: Attributes,
     effectiveSettings: ActorMaterializerSettings): MaterializedModule = {
-  op match {
-    case fanin: FanInModule =>
-      val (props, inputs, output) = fanin match {
+    op match {
+      case fanin: FanInModule =>
+        val (props, inputs, output) = fanin match {
 
-        case MergeModule(shape, _) =>
-          (FairMerge.props(effectiveSettings, shape.inSeq.size), shape.inSeq, 
shape.out)
+          case MergeModule(shape, _) =>
+            (FairMerge.props(effectiveSettings, shape.inSeq.size), 
shape.inSeq, shape.out)
 
-        case f: FlexiMergeModule[_, Shape] =>
-          val flexi = f.flexi(f.shape)
-          val shape: Shape = f.shape
-          (FlexiMerge.props(effectiveSettings, f.shape, flexi), shape.inlets, 
shape.outlets.head)
+          case f: FlexiMergeModule[_, Shape] =>
+            val flexi = f.flexi(f.shape)
+            val shape: Shape = f.shape
+            (FlexiMerge.props(effectiveSettings, f.shape, flexi), 
shape.inlets, shape.outlets.head)
 
-        case MergePreferredModule(shape, _) =>
-          (UnfairMerge.props(effectiveSettings, shape.inlets.size), 
shape.preferred +: shape.inSeq, shape.out)
+          case MergePreferredModule(shape, _) =>
+            (UnfairMerge.props(effectiveSettings, shape.inlets.size), 
shape.preferred +: shape.inSeq, shape.out)
 
-        case ConcatModule(shape, _) =>
-          require(shape.inSeq.size == 2, "currently only supporting 
concatenation of exactly two inputs") // TODO
-          (Concat.props(effectiveSettings), shape.inSeq, shape.out)
+          case ConcatModule(shape, _) =>
+            require(shape.inSeq.size == 2, "currently only supporting 
concatenation of exactly two inputs") // TODO
+            (Concat.props(effectiveSettings), shape.inSeq, shape.out)
 
-        case zip: ZipWithModule =>
-          (zip.props(effectiveSettings), zip.shape.inlets, zip.outPorts.head)
-      }
+          case zip: ZipWithModule =>
+            (zip.props(effectiveSettings), zip.shape.inlets, zip.outPorts.head)
+        }
 
-      val impl = actorOf(props, stageName(effectiveAttributes), 
effectiveSettings.dispatcher)
-      val publisher = new ActorPublisher[Any](impl)
-      // Resolve cyclic dependency with actor. This MUST be the first message 
no matter what.
-      impl ! ExposedPublisher(publisher)
+        val impl = actorOf(props, stageName(effectiveAttributes), 
effectiveSettings.dispatcher)
+        val publisher = new ActorPublisher[Any](impl)
+        // Resolve cyclic dependency with actor. This MUST be the first 
message no matter what.
+        impl ! ExposedPublisher(publisher)
 
-      val inputMapping: Map[InPort, Subscriber[_]] = inputs.zipWithIndex.map{ 
pair =>
-        val (in, id) = pair
-        (in, FanIn.SubInput[Any](impl, id))
-      }.toMap
+        val inputMapping: Map[InPort, Subscriber[_]] = inputs.zipWithIndex.map 
{ pair =>
+          val (in, id) = pair
+          (in, FanIn.SubInput[Any](impl, id))
+        }.toMap
 
-      val outMapping = Map(output -> publisher)
-      MaterializedModule(fanin, (), inputMapping, outMapping)
+        val outMapping = Map(output -> publisher)
+        MaterializedModule(fanin, (), inputMapping, outMapping)
 
-    case fanout: FanOutModule =>
-      val (props, in, outs) = fanout match {
+      case fanout: FanOutModule =>
+        val (props, in, outs) = fanout match {
 
-        case r: FlexiRouteModule[t, Shape] =>
-          val flexi = r.flexi(r.shape)
-          val shape: Shape = r.shape
-          (FlexiRoute.props(effectiveSettings, r.shape, flexi), 
shape.inlets.head: InPort, r.shape.outlets)
+          case r: FlexiRouteModule[t, Shape] =>
+            val flexi = r.flexi(r.shape)
+            val shape: Shape = r.shape
+            (FlexiRoute.props(effectiveSettings, r.shape, flexi), 
shape.inlets.head: InPort, r.shape.outlets)
 
-        case BroadcastModule(shape, eagerCancel, _) =>
-          (Broadcast.props(effectiveSettings, eagerCancel, 
shape.outArray.size), shape.in, shape.outArray.toSeq)
+          case BroadcastModule(shape, eagerCancel, _) =>
+            (Broadcast.props(effectiveSettings, eagerCancel, 
shape.outArray.size), shape.in, shape.outArray.toSeq)
 
-        case BalanceModule(shape, waitForDownstreams, _) =>
-          (Balance.props(effectiveSettings, shape.outArray.size, 
waitForDownstreams), shape.in, shape.outArray.toSeq)
+          case BalanceModule(shape, waitForDownstreams, _) =>
+            (Balance.props(effectiveSettings, shape.outArray.size, 
waitForDownstreams), shape.in, shape.outArray.toSeq)
 
-        case unzip: UnzipWithModule =>
-          (unzip.props(effectiveSettings), unzip.inPorts.head, 
unzip.shape.outlets)
-      }
-      val impl = actorOf(props, stageName(effectiveAttributes), 
effectiveSettings.dispatcher)
-      val size = outs.size
-      def factory(id: Int) =
-        new ActorPublisher[Any](impl) { override val wakeUpMsg = 
FanOut.SubstreamSubscribePending(id) }
-      val publishers =
-        if (outs.size < 8) Vector.tabulate(size)(factory)
-        else List.tabulate(size)(factory)
-
-      impl ! FanOut.ExposedPublishers(publishers)
-      val outputs: Map[OutPort, Publisher[_]] = 
publishers.iterator.zip(outs.iterator).map { case (pub, out) =>
-        (out, pub)
-      }.toMap
-
-      val inputs: Map[InPort, Subscriber[_]] = Map(in -> 
ActorSubscriber[Any](impl))
-      MaterializedModule(fanout, (), inputs, outputs)
+          case unzip: UnzipWithModule =>
+            (unzip.props(effectiveSettings), unzip.inPorts.head, 
unzip.shape.outlets)
+        }
+        val impl = actorOf(props, stageName(effectiveAttributes), 
effectiveSettings.dispatcher)
+        val size = outs.size
+        def factory(id: Int) =
+          new ActorPublisher[Any](impl) {
+            override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
+          }
+        val publishers =
+          if (outs.size < 8) Vector.tabulate(size)(factory)
+          else List.tabulate(size)(factory)
+
+        impl ! FanOut.ExposedPublishers(publishers)
+        val outputs: Map[OutPort, Publisher[_]] = 
publishers.iterator.zip(outs.iterator).map { case (pub, out) =>
+          (out, pub)
+        }.toMap
+
+        val inputs: Map[InPort, Subscriber[_]] = Map(in -> 
ActorSubscriber[Any](impl))
+        MaterializedModule(fanout, (), inputs, outputs)
     }
   }
 
@@ -269,11 +268,11 @@ private def materializeJunction(
 }
 
 object LocalMaterializerImpl {
-  case class MaterializedModule(val module: Module, val matValue: Any, inputs: Map[InPort, Subscriber[_]] = Map.empty[InPort, Subscriber[_]] , outputs: Map[OutPort, Publisher[_]] = Map.empty[OutPort, Publisher[_]])
+  case class MaterializedModule(val module: Module, val matValue: Any, inputs: Map[InPort, Subscriber[_]] = Map.empty[InPort, Subscriber[_]], outputs: Map[OutPort, Publisher[_]] = Map.empty[OutPort, Publisher[_]])
 
   def toFoldModule(reduce: ReduceModule[Any]): Fold = {
     val f = reduce.f
-    val aggregator = {(zero: Any, input: Any) =>
+    val aggregator = { (zero: Any, input: Any) =>
       if (zero == null) {
         input
       } else {

Reply via email to