http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala
index 4bec092..9852ed0 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala
+++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,19 +21,18 @@ package akka.stream.gearpump.materializer
 import akka.actor.ActorSystem
 import akka.stream.ModuleGraph.Edge
 import akka.stream.gearpump.GearAttributes
-import akka.stream.gearpump.module.{ProcessorModule, ReduceModule, GroupByModule, SinkBridgeModule, SinkTaskModule, SourceBridgeModule, SourceTaskModule}
-import akka.stream.gearpump.task.{BalanceTask, BroadcastTask, GraphTask, UnZip2Task, SinkBridgeTask, SourceBridgeTask}
-import akka.stream.impl.GenJunctions.{UnzipWith2Module, ZipWithModule}
-import akka.stream.impl.Junctions._
-import akka.stream.impl.{FlexiRouteImpl, Stages}
-import akka.stream.impl.Stages.{MaterializingStageFactory, StageModule}
+import akka.stream.gearpump.module.{GroupByModule, ProcessorModule, ReduceModule, SinkBridgeModule, SinkTaskModule, SourceBridgeModule, SourceTaskModule}
+import akka.stream.gearpump.task.{BalanceTask, BroadcastTask, GraphTask, SinkBridgeTask, SourceBridgeTask, UnZip2Task}
+import akka.stream.impl.Stages
+import akka.stream.impl.Stages.StageModule
 import akka.stream.impl.StreamLayout.Module
+import org.slf4j.LoggerFactory
+
 import io.gearpump.cluster.UserConfig
 import io.gearpump.streaming.dsl.StreamApp
-import io.gearpump.streaming.dsl.op.{GroupByOp, DataSinkOp, DataSourceOp, Direct, FlatMapOp, MasterOp, MergeOp, Op, OpEdge, ProcessorOp, Shuffle, SlaveOp}
+import io.gearpump.streaming.dsl.op.{DataSinkOp, DataSourceOp, Direct, FlatMapOp, GroupByOp, MasterOp, MergeOp, Op, OpEdge, ProcessorOp, Shuffle, SlaveOp}
 import io.gearpump.streaming.{ProcessorId, StreamApplication}
 import io.gearpump.util.Graph
-import org.slf4j.LoggerFactory
 
 /**
  * [[RemoteMaterializerImpl]] will materialize the [[Graph[Module, Edge]] to a 
Gearpump
@@ -118,7 +117,7 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
   }
 
   private def cleanClues(app: StreamApplication): StreamApplication = {
-    val graph = app.dag.mapVertex{ processor =>
+    val graph = app.dag.mapVertex { processor =>
       val conf = cleanClue(processor.taskConf)
       processor.copy(taskConf = conf)
     }
@@ -126,7 +125,7 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], 
system: ActorSystem) {
   }
 
   private def cleanClue(conf: UserConfig): UserConfig = {
-    conf.filter{kv =>
+    conf.filter { kv =>
       kv._2 != RemoteMaterializerImpl.STAINS
     }
   }
@@ -181,7 +180,6 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], 
system: ActorSystem) {
     (opGraph, matValues)
   }
 
-
   private def translateStage(module: StageModule, conf: UserConfig): Op = {
     module match {
       case buffer: Stages.Buffer =>
@@ -252,10 +250,10 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], 
system: ActorSystem) {
   }
 
   private def translateFanIn(
-      fanIn: FanInModule,
-      edges: List[(Module, Edge, Module)],
-      parallelism: Int,
-      conf: UserConfig): Op = {
+    fanIn: FanInModule,
+    edges: List[(Module, Edge, Module)],
+    parallelism: Int,
+    conf: UserConfig): Op = {
     fanIn match {
       case merge: MergeModule[_] =>
         MergeOp("merge", conf)
@@ -275,10 +273,10 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], 
system: ActorSystem) {
   }
 
   private def translateFanOut(
-      fanOut: FanOutModule,
-      edges: List[(Module, Edge, Module)],
-      parallelism: Int,
-      conf: UserConfig): Op = {
+    fanOut: FanOutModule,
+    edges: List[(Module, Edge, Module)],
+    parallelism: Int,
+    conf: UserConfig): Op = {
     fanOut match {
       case unzip2: UnzipWith2Module[Any, Any, Any] =>
         val updatedConf = conf.withValue(UnZip2Task.UNZIP2_FUNCTION, new 
UnZip2Task.UnZipFunction(unzip2.f))
@@ -332,7 +330,7 @@ object RemoteMaterializerImpl {
   }
 
   def mapOp(map: Any => Any, conf: UserConfig): Op = {
-    flatMapOp ({ data: Any =>
+    flatMapOp({ data: Any =>
       List(map(data))
     }, "map", conf)
   }
@@ -346,8 +344,8 @@ object RemoteMaterializerImpl {
   }
 
   def conflatOp(seed: Any => Any, aggregate: (Any, Any) => Any, conf: 
UserConfig): Op = {
-    var agg : Any = null
-    val flatMap = {elem: Any =>
+    var agg: Any = null
+    val flatMap = { elem: Any =>
       agg = if (agg == null) {
         seed(elem)
       } else {
@@ -356,7 +354,7 @@ object RemoteMaterializerImpl {
       List(agg)
     }
 
-    flatMapOp (flatMap, "map", conf)
+    flatMapOp(flatMap, "map", conf)
   }
 
   def foldOp(zero: Any, fold: (Any, Any) => Any, conf: UserConfig): Op = {
@@ -376,7 +374,7 @@ object RemoteMaterializerImpl {
       b
     }
 
-    val flatMap: Any=>Iterable[Any] = {input: Any =>
+    val flatMap: Any => Iterable[Any] = { input: Any =>
       buf += input
       left -= 1
       if (left == 0) {
@@ -393,7 +391,7 @@ object RemoteMaterializerImpl {
 
   def dropOp(number: Long, conf: UserConfig): Op = {
     var left = number
-    val flatMap: Any=>Iterable[Any] = {input: Any =>
+    val flatMap: Any => Iterable[Any] = { input: Any =>
       if (left > 0) {
         left -= 1
         None
@@ -404,14 +402,14 @@ object RemoteMaterializerImpl {
     flatMapOp(flatMap, "drop", conf)
   }
 
-  def dropWhileOp(drop: Any=>Boolean, conf: UserConfig): Op = {
+  def dropWhileOp(drop: Any => Boolean, conf: UserConfig): Op = {
     flatMapOp({ data =>
-      if (drop(data))  None else Option(data)
+      if (drop(data)) None else Option(data)
     }, "dropWhile", conf)
   }
 
-  def logOp(name: String, extract: Any=>Any, conf: UserConfig): Op = {
-    val flatMap = {elem: Any =>
+  def logOp(name: String, extract: Any => Any, conf: UserConfig): Op = {
+    val flatMap = { elem: Any =>
       LoggerFactory.getLogger(name).info(s"Element: {${extract(elem)}}")
       List(elem)
     }
@@ -422,7 +420,7 @@ object RemoteMaterializerImpl {
     var aggregator = zero
     var pushedZero = false
 
-    val flatMap = {elem: Any =>
+    val flatMap = { elem: Any =>
       aggregator = f(aggregator, elem)
 
       if (pushedZero) {
@@ -438,7 +436,7 @@ object RemoteMaterializerImpl {
   def takeOp(count: Long, conf: UserConfig): Op = {
     var left: Long = count
 
-    val filter: Any=>Iterable[Any] = {elem: Any =>
+    val filter: Any => Iterable[Any] = { elem: Any =>
       left -= 1
       if (left > 0) Some(elem)
       else if (left == 0) Some(elem)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala
index 8a154c3..c5dfc9a 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala
+++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -26,10 +26,10 @@ import org.reactivestreams.{Publisher, Subscriber}
 /**
  *
  *
- *   [[IN]] -> [[BridgeModule]] -> [[OUT]]
- *                   /
- *                  /
- *       out of band data input or output channel [[MAT]]
+ * [[IN]] -> [[BridgeModule]] -> [[OUT]]
+ * /
+ * /
+ * out of band data input or output channel [[MAT]]
  *
  *
  * [[BridgeModule]] is used as a bridge between different materializers.
@@ -38,18 +38,18 @@ import org.reactivestreams.{Publisher, Subscriber}
  *
  * For example:
  *
- *                              Remote Materializer
- *                         -----------------------------
- *                         |                            |
- *                         | BridgeModule -> RemoteSink |
- *                         |  /                         |
- *                         --/----------------------------
- *   Local Materializer     /  out of band channel.
- *   ----------------------/----
- *   | Local              /    |
- *   | Source ->  BridgeModule |
- *   |                         |
- *   ---------------------------
+ * Remote Materializer
+ * -----------------------------
+ * |                            |
+ * | BridgeModule -> RemoteSink |
+ * |  /                         |
+ * --/----------------------------
+ * Local Materializer     /  out of band channel.
+ * ----------------------/----
+ * | Local              /    |
+ * | Source ->  BridgeModule |
+ * |                         |
+ * ---------------------------
  *
  *
  * Typically [[BridgeModule]] is created implicitly as a temporary intermediate
@@ -64,7 +64,7 @@ import org.reactivestreams.{Publisher, Subscriber}
  * @tparam OUT
  * @tparam MAT
  */
-abstract class BridgeModule[IN, OUT, MAT] extends FlowModule[IN, OUT, MAT]{
+abstract class BridgeModule[IN, OUT, MAT] extends FlowModule[IN, OUT, MAT] {
   def attributes: Attributes
   def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, MAT]
 
@@ -72,17 +72,16 @@ abstract class BridgeModule[IN, OUT, MAT] extends 
FlowModule[IN, OUT, MAT]{
   override def carbonCopy: Module = newInstance
 }
 
-
 /**
  *
  * Bridge module which accept out of band channel Input
  * [[org.reactivestreams.Publisher]][IN].
  *
  *
- *         [[SourceBridgeModule]] -> [[OUT]]
- *                   /|
- *                  /
- *       out of band data input [[org.reactivestreams.Publisher]][IN]
+ * [[SourceBridgeModule]] -> [[OUT]]
+ * /|
+ * /
+ * out of band data input [[org.reactivestreams.Publisher]][IN]
  *
  * @see [[BridgeModule]]
  *
@@ -94,7 +93,7 @@ class SourceBridgeModule[IN, OUT](val attributes: Attributes 
= Attributes.name("
   override protected def newInstance: BridgeModule[IN, OUT, Subscriber[IN]] = 
new SourceBridgeModule[IN, OUT](attributes)
 
   override def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, 
Subscriber[IN]] = {
-    new SourceBridgeModule( attributes)
+    new SourceBridgeModule(attributes)
   }
 }
 
@@ -104,11 +103,11 @@ class SourceBridgeModule[IN, OUT](val attributes: 
Attributes = Attributes.name("
  * [[org.reactivestreams.Subscriber]][OUT].
  *
  *
- *   [[IN]] -> [[BridgeModule]]
- *                    \
- *                     \
- *                      \|
- *       out of band data output [[org.reactivestreams.Subscriber]][OUT]
+ * [[IN]] -> [[BridgeModule]]
+ * \
+ * \
+ * \|
+ * out of band data output [[org.reactivestreams.Subscriber]][OUT]
  *
  * @see [[BridgeModule]]
  *

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala
index fcda327..bc744f9 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala
+++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -49,7 +49,6 @@ import org.reactivestreams.{Publisher, Subscriber}
  */
 trait DummyModule extends Module
 
-
 /**
  *
  *    [[DummySource]]-> [[BridgeModule]] -> Sink
@@ -77,7 +76,6 @@ class DummySource[Out](val attributes: Attributes, shape: 
SourceShape[Out])
   }
 }
 
-
 /**
  *
  *    Source-> [[BridgeModule]] -> [[DummySink]]
@@ -88,7 +86,7 @@ class DummySource[Out](val attributes: Attributes, shape: 
SourceShape[Out])
  *
  * @param attributes
  * @param shape
-  */
+ */
 class DummySink[IN](val attributes: Attributes, shape: SinkShape[IN])
   extends SinkModule[IN, Unit](shape) with DummyModule {
   override def create(context: MaterializationContext): (Subscriber[IN], Unit) 
= {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala
index d87d689..4b7d3ac 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala
+++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,6 +21,7 @@ package akka.stream.gearpump.module
 import akka.stream.impl.FlowModule
 import akka.stream.impl.StreamLayout.Module
 import akka.stream.{Attributes, Inlet, Outlet, Shape, SinkShape, SourceShape}
+
 import io.gearpump.cluster.UserConfig
 import io.gearpump.streaming.sink.DataSink
 import io.gearpump.streaming.source.DataSource
@@ -31,7 +32,7 @@ import io.gearpump.streaming.task.Task
  *
  * This is specially designed for Gearpump runtime. It is not supposed to be 
used
  * for local materializer.
- * 
+ *
  */
 trait GearpumpTaskModule extends Module
 
@@ -44,15 +45,19 @@ trait GearpumpTaskModule extends Module
  * @tparam T
  */
 final case class SourceTaskModule[T](
-   source: DataSource,
-   conf: UserConfig,
-   shape: SourceShape[T] = SourceShape[T](Outlet[T]("SourceTaskModule.out")),
-   attributes: Attributes = Attributes.name("SourceTaskModule"))
+    source: DataSource,
+    conf: UserConfig,
+    shape: SourceShape[T] = SourceShape[T](Outlet[T]("SourceTaskModule.out")),
+    attributes: Attributes = Attributes.name("SourceTaskModule"))
   extends GearpumpTaskModule {
 
   override def subModules: Set[Module] = Set.empty
-  override def withAttributes(attr: Attributes): Module = this.copy(shape = 
amendShape(attr), attributes = attr)
-  override def carbonCopy: Module = this.copy(shape = 
SourceShape(Outlet[T]("SourceTaskModule.out")))
+  override def withAttributes(attr: Attributes): Module = {
+    this.copy(shape = amendShape(attr), attributes = attr)
+  }
+  override def carbonCopy: Module = {
+    this.copy(shape = SourceShape(Outlet[T]("SourceTaskModule.out")))
+  }
 
   override def replaceShape(s: Shape): Module =
     if (s == shape) this
@@ -83,7 +88,9 @@ final case class SinkTaskModule[IN](
   extends GearpumpTaskModule {
 
   override def subModules: Set[Module] = Set.empty
-  override def withAttributes(attr: Attributes): Module = this.copy(shape = 
amendShape(attr), attributes = attr)
+  override def withAttributes(attr: Attributes): Module = {
+    this.copy(shape = amendShape(attr), attributes = attr)
+  }
   override def carbonCopy: Module = this.copy(shape = 
SinkShape(Inlet[IN]("SinkTaskModule.in")))
 
   override def replaceShape(s: Shape): Module =
@@ -116,9 +123,11 @@ case class ProcessorModule[IN, OUT, Unit](
 
   override def carbonCopy: Module = newInstance
 
-  protected def newInstance: ProcessorModule[IN,OUT, Unit] = new 
ProcessorModule[IN,OUT, Unit](processor, conf, attributes)
+  protected def newInstance: ProcessorModule[IN, OUT, Unit] = {
+    new ProcessorModule[IN, OUT, Unit](processor, conf, attributes)
+  }
 
-  override def withAttributes(attributes: Attributes): ProcessorModule[IN,OUT, 
Unit] = {
-    new ProcessorModule[IN,OUT, Unit](processor, conf, attributes)
+  override def withAttributes(attributes: Attributes): ProcessorModule[IN, 
OUT, Unit] = {
+    new ProcessorModule[IN, OUT, Unit](processor, conf, attributes)
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala
index 6871e71..e57a6f6 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala
+++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,7 +22,6 @@ import akka.stream.Attributes
 import akka.stream.impl.FlowModule
 import akka.stream.impl.StreamLayout.Module
 
-
 /**
  *
  * Group the T value groupBy function
@@ -32,11 +31,14 @@ import akka.stream.impl.StreamLayout.Module
  * @tparam T
  * @tparam Group
  */
-case class GroupByModule[T, Group](val groupBy: T => Group, val attributes: 
Attributes = Attributes.name("groupByModule")) extends FlowModule[T, T, Unit]{
+case class GroupByModule[T, Group](val groupBy: T => Group,
+    val attributes: Attributes = Attributes.name("groupByModule")) extends 
FlowModule[T, T, Unit] {
 
   override def carbonCopy: Module = newInstance
 
-  protected def newInstance: GroupByModule[T, Group] = new GroupByModule[T, 
Group](groupBy, attributes)
+  protected def newInstance: GroupByModule[T, Group] = {
+    new GroupByModule[T, Group](groupBy, attributes)
+  }
 
   override def withAttributes(attributes: Attributes): GroupByModule[T, Group] 
= {
     new GroupByModule[T, Group](groupBy, attributes)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala
index e28fbfc..926feb6 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala
+++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,7 +22,6 @@ import akka.stream.Attributes
 import akka.stream.impl.FlowModule
 import akka.stream.impl.StreamLayout.Module
 
-
 /**
  *
  * Reduce Module
@@ -31,7 +30,9 @@ import akka.stream.impl.StreamLayout.Module
  * @param attributes
  * @tparam T
  */
-case class ReduceModule[T](val f: (T, T) => T, val attributes: Attributes = 
Attributes.name("reduceModule")) extends FlowModule[T, T, Unit]{
+case class ReduceModule[T](
+    val f: (T, T) => T, val attributes: Attributes = 
Attributes.name("reduceModule"))
+  extends FlowModule[T, T, Unit] {
 
   override def carbonCopy: Module = newInstance
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala
index df032c8..a9f6e97 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala
+++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,18 +18,17 @@
 
 package akka.stream.gearpump.scaladsl
 
-import akka.stream.{FlowShape, Graph, Attributes}
-import akka.stream.gearpump.module.{ProcessorModule, ReduceModule, 
GroupByModule, DummySink, DummySource, SinkBridgeModule, SinkTaskModule, 
SourceBridgeModule, SourceTaskModule}
-import akka.stream.impl.Stages.Map
-import akka.stream.scaladsl.{FlowOps, Flow, Keep, Sink, Source}
+import akka.stream.Attributes
+import akka.stream.gearpump.module.{DummySink, DummySource, GroupByModule, 
ProcessorModule, ReduceModule, SinkBridgeModule, SinkTaskModule, 
SourceBridgeModule, SourceTaskModule}
+import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
+import org.reactivestreams.{Publisher, Subscriber}
+
 import io.gearpump.cluster.UserConfig
 import io.gearpump.streaming.sink.DataSink
 import io.gearpump.streaming.source.DataSource
 import io.gearpump.streaming.task.Task
-import org.reactivestreams.{Publisher, Subscriber}
-
 
-object GearSource{
+object GearSource {
 
   /**
    * Construct a Source which accepts out of band input messages.
@@ -53,7 +52,7 @@ object GearSource{
   /**
    * Construct a Source from Gearpump [[DataSource]].
    *
-   *    [[SourceTaskModule]] -> downstream Sink
+   * [[SourceTaskModule]] -> downstream Sink
    *
    */
   def from[OUT](source: DataSource): Source[OUT, Unit] = {
@@ -64,7 +63,7 @@ object GearSource{
   /**
    * Construct a Source from Gearpump [[io.gearpump.streaming.Processor]].
    *
-   *    [[ProcessorModule]] -> downstream Sink
+   * [[ProcessorModule]] -> downstream Sink
    *
    */
   def from[OUT](processor: Class[_ <: Task], conf: UserConfig): Source[OUT, 
Unit] = {
@@ -98,7 +97,7 @@ object GearSink {
   /**
    * Construct a Sink from Gearpump [[DataSink]].
    *
-   *    Upstream Source -> [[SinkTaskModule]]
+   * Upstream Source -> [[SinkTaskModule]]
    *
    */
   def to[IN](sink: DataSink): Sink[IN, Unit] = {
@@ -109,7 +108,7 @@ object GearSink {
   /**
    * Construct a Sink from Gearpump [[io.gearpump.streaming.Processor]].
    *
-   *    Upstream Source -> [[ProcessorModule]]
+   * Upstream Source -> [[ProcessorModule]]
    *
    */
   def to[IN](processor: Class[_ <: Task], conf: UserConfig): Sink[IN, Unit] = {
@@ -147,8 +146,8 @@ object GearSink {
  * sink will only operate on the main stream.
  *
  */
-object GroupBy{
-  def apply[T, Group](groupBy: T=>Group): Flow[T, T, Unit] = {
+object GroupBy {
+  def apply[T, Group](groupBy: T => Group): Flow[T, T, Unit] = {
     new Flow[T, T, Unit](new GroupByModule(groupBy))
   }
 }
@@ -160,19 +159,18 @@ object GroupBy{
  *
  *
  */
-object Reduce{
+object Reduce {
   def apply[T](reduce: (T, T) => T): Flow[T, T, Unit] = {
     new Flow[T, T, Unit](new ReduceModule(reduce))
   }
 }
 
-
 /**
  * Create a Flow by providing a Gearpump Processor class and configuration
  *
  *
  */
-object Processor{
+object Processor {
   def apply[In, Out](processor: Class[_ <: Task], conf: UserConfig): Flow[In, 
Out, Unit] = {
     new Flow[In, Out, Unit](new ProcessorModule[In, Out, Unit](processor, 
conf))
   }
@@ -193,7 +191,6 @@ object Implicits {
       source.via[T, Unit](stage)
     }
 
-
     def reduce(reduce: (T, T) => T): Source[T, Mat] = {
       val stage = Reduce.apply(reduce)
       source.via[T, Unit](stage)
@@ -240,13 +237,10 @@ object Implicits {
     }
 
     /**
-     * do sum on values
+     * Does sum on values
      *
      * Before doing this, you need to do groupByKey to group same key together
      * , otherwise, it will do the sum no matter what current key is.
-     *
-     * @param numeric
-     * @return
      */
     def sumOnValue(implicit numeric: Numeric[V]): Source[(K, V), Mat] = {
       val stage = Reduce.apply(sumByValue[K, V](numeric))
@@ -255,13 +249,13 @@ object Implicits {
   }
 
   /**
-   * Help util to support groupByKey and sum
+   * Helper util to support groupByKey and sum
    */
   implicit class KVFlowOps[K, V, Mat](flow: Flow[(K, V), (K, V), Mat]) {
 
     /**
-     * if it is a KV Pair, we can group the KV pair by the key.
-     * @return
+     * If it is a KV Pair, we can group the KV pair by the key.
+     *
      */
     def groupByKey: Flow[(K, V), (K, V), Mat] = {
       val stage = GroupBy.apply(getTupleKey[K, V])
@@ -274,8 +268,6 @@ object Implicits {
      * Before doing this, you need to do groupByKey to group same key together
      * , otherwise, it will do the sum no matter what current key is.
      *
-     * @param numeric
-     * @return
      */
     def sumOnValue(implicit numeric: Numeric[V]): Flow[(K, V), (K, V), Mat] = {
       val stage = Reduce.apply(sumByValue[K, V](numeric))
@@ -285,6 +277,6 @@ object Implicits {
 
   private def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1
 
-  private def sumByValue[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, 
V]) => Tuple2[K, V]
-    = (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2))
+  private def sumByValue[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, 
V]) => Tuple2[K, V] =
+    (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2))
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
index e41b771..58a04ca 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
+++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,12 +22,12 @@ import io.gearpump.Message
 import io.gearpump.cluster.UserConfig
 import io.gearpump.streaming.task.TaskContext
 
-class BalanceTask(context: TaskContext, userConf : UserConfig) extends 
GraphTask(context, userConf) {
+class BalanceTask(context: TaskContext, userConf: UserConfig) extends 
GraphTask(context, userConf) {
 
   val sizeOfOutputs = sizeOfOutPorts
   var index = 0
 
-  override def onNext(msg : Message) : Unit = {
+  override def onNext(msg: Message): Unit = {
     output(index, msg)
     index += 1
     if (index == sizeOfOutputs) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala
index b61813c..388806e 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala
+++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,8 +22,8 @@ import io.gearpump.Message
 import io.gearpump.cluster.UserConfig
 import io.gearpump.streaming.task.TaskContext
 
-class BroadcastTask(context: TaskContext, userConf : UserConfig) extends 
GraphTask(context, userConf) {
-  override def onNext(msg : Message) : Unit = {
+class BroadcastTask(context: TaskContext, userConf: UserConfig) extends 
GraphTask(context, userConf) {
+  override def onNext(msg: Message): Unit = {
     context.output(msg)
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala
index e5243f9..d3f483d 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala
+++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,29 +19,32 @@
 package akka.stream.gearpump.task
 
 import akka.stream.gearpump.task.GraphTask.{Index, PortId}
+
 import io.gearpump.Message
 import io.gearpump.cluster.UserConfig
 import io.gearpump.streaming.ProcessorId
 import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskWrapper}
 
-
-class GraphTask(inputTaskContext : TaskContext, userConf : UserConfig) extends 
Task(inputTaskContext, userConf) {
+class GraphTask(inputTaskContext: TaskContext, userConf: UserConfig)
+  extends Task(inputTaskContext, userConf) {
 
   private val context = inputTaskContext.asInstanceOf[TaskWrapper]
-  private val outMapping = 
portsMapping(userConf.getValue[List[ProcessorId]](GraphTask.OUT_PROCESSORS).get)
-  private val inMapping = 
portsMapping(userConf.getValue[List[ProcessorId]](GraphTask.IN_PROCESSORS).get)
+  private val outMapping = portsMapping(userConf.getValue[List[ProcessorId]](
+    GraphTask.OUT_PROCESSORS).get)
+  private val inMapping = portsMapping(userConf.getValue[List[ProcessorId]](
+    GraphTask.IN_PROCESSORS).get)
 
   val sizeOfOutPorts = outMapping.keys.size
   val sizeOfInPorts = inMapping.keys.size
-  
+
   private def portsMapping(processors: List[ProcessorId]): Map[PortId, Index] 
= {
-    val portToProcessor = processors.zipWithIndex.map{kv =>
+    val portToProcessor = processors.zipWithIndex.map { kv =>
       (kv._2, kv._1)
     }.toMap
 
     val processorToIndex = processors.sorted.zipWithIndex.toMap
 
-    val portToIndex = portToProcessor.map{kv =>
+    val portToIndex = portToProcessor.map { kv =>
       val (outlet, processorId) = kv
       val index = processorToIndex(processorId)
       (outlet, index)
@@ -53,9 +56,9 @@ class GraphTask(inputTaskContext : TaskContext, userConf : 
UserConfig) extends T
     context.output(outMapping(outletId), msg)
   }
 
-  override def onStart(startTime : StartTime) : Unit = {}
+  override def onStart(startTime: StartTime): Unit = {}
 
-  override def onStop() : Unit = {}
+  override def onStop(): Unit = {}
 }
 
 object GraphTask {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala
index 5a9a5b6..d7bacd5 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -25,6 +25,8 @@ import akka.actor.Actor.Receive
 import akka.actor.{Actor, ActorRef, ActorSystem, Props}
 import akka.stream.gearpump.task.SinkBridgeTask.RequestMessage
 import akka.util.Timeout
+import org.reactivestreams.{Publisher, Subscriber, Subscription}
+
 import io.gearpump.Message
 import io.gearpump.cluster.UserConfig
 import io.gearpump.cluster.client.ClientContext
@@ -32,9 +34,6 @@ import io.gearpump.streaming.ProcessorId
 import io.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, 
TaskActorRef}
 import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
 import io.gearpump.util.LogUtil
-import org.reactivestreams.{Publisher, Subscriber, Subscription}
-
-import scala.concurrent.ExecutionContext
 
 /**
  * Bridge Task when data flow is from remote Gearpump Task to local 
Akka-Stream Module
@@ -47,11 +46,8 @@ import scala.concurrent.ExecutionContext
  *                            \|
  *                       Akka Stream [[Subscriber]]
  *
- *
- * @param taskContext
- * @param userConf
  */
-class SinkBridgeTask (taskContext : TaskContext, userConf : UserConfig) 
extends Task(taskContext, userConf) {
+class SinkBridgeTask(taskContext: TaskContext, userConf: UserConfig) extends 
Task(taskContext, userConf) {
   import taskContext.taskId
 
   val queue = new util.LinkedList[Message]()
@@ -59,14 +55,14 @@ class SinkBridgeTask (taskContext : TaskContext, userConf : 
UserConfig) extends
 
   var request: Int = 0
 
-  override def onStart(startTime : StartTime) : Unit = {}
+  override def onStart(startTime: StartTime): Unit = {}
 
-  override def onNext(msg : Message) : Unit = {
+  override def onNext(msg: Message): Unit = {
     queue.add(msg)
     trySendingData()
   }
 
-  override def onStop() : Unit = {}
+  override def onStop(): Unit = {}
 
   private def trySendingData(): Unit = {
     if (subscriber != null) {
@@ -100,7 +96,7 @@ object SinkBridgeTask {
     private var actor: ActorRef = null
     import system.dispatcher
 
-    private val task = context.askAppMaster[TaskActorRef](appId, 
LookupTaskActorRef(taskId)).map{container =>
+    private val task = context.askAppMaster[TaskActorRef](appId, 
LookupTaskActorRef(taskId)).map { container =>
       // println("Successfully resolved taskRef for taskId " + taskId + ", " + 
container.task)
       container.task
     }
@@ -115,7 +111,7 @@ object SinkBridgeTask {
     private implicit val timeout = Timeout(5, TimeUnit.SECONDS)
 
     override def request(l: Long): Unit = {
-      task.foreach{ task =>
+      task.foreach { task =>
         task.tell(RequestMessage(l.toInt), actor)
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala
index fae422b..b433a7f 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,18 +18,18 @@
 
 package akka.stream.gearpump.task
 
+import scala.concurrent.ExecutionContext
+
 import akka.actor.Actor.Receive
-import akka.actor.ActorSystem
 import akka.stream.gearpump.task.SourceBridgeTask.{AkkaStreamMessage, 
Complete, Error}
+import org.reactivestreams.{Subscriber, Subscription}
+
 import io.gearpump.Message
 import io.gearpump.cluster.UserConfig
 import io.gearpump.cluster.client.ClientContext
 import io.gearpump.streaming.ProcessorId
 import io.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, 
TaskActorRef}
 import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-import org.reactivestreams.{Subscriber, Subscription}
-
-import scala.concurrent.ExecutionContext
 
 /**
  * Bridge Task when data flow is from local Akka-Stream Module to remote 
Gearpump Task
@@ -42,20 +42,17 @@ import scala.concurrent.ExecutionContext
  *               /                    Local JVM
  *    Akka Stream [[org.reactivestreams.Publisher]]
  *
- *
- * @param taskContext
- * @param userConf
  */
-class SourceBridgeTask(taskContext : TaskContext, userConf : UserConfig) 
extends Task(taskContext, userConf) {
+class SourceBridgeTask(taskContext: TaskContext, userConf: UserConfig) extends 
Task(taskContext, userConf) {
   import taskContext.taskId
 
-  override def onStart(startTime : StartTime) : Unit = {}
+  override def onStart(startTime: StartTime): Unit = {}
 
-  override def onNext(msg : Message) : Unit = {
+  override def onNext(msg: Message): Unit = {
     LOG.info("AkkaStreamSource receiving message " + msg)
   }
 
-  override def onStop() : Unit = {}
+  override def onStop(): Unit = {}
 
   override def receiveUnManagedMessage: Receive = {
     case Error(ex) =>
@@ -70,7 +67,6 @@ class SourceBridgeTask(taskContext : TaskContext, userConf : 
UserConfig) extends
   }
 }
 
-
 object SourceBridgeTask {
   case class Error(ex: java.lang.Throwable)
 
@@ -83,7 +79,7 @@ object SourceBridgeTask {
     var subscription: Subscription = null
     implicit val dispatcher = ec
 
-    val task = context.askAppMaster[TaskActorRef](appId, 
LookupTaskActorRef(taskId)).map{container =>
+    val task = context.askAppMaster[TaskActorRef](appId, 
LookupTaskActorRef(taskId)).map { container =>
       // println("Successfully resolved taskRef for taskId " + taskId + ", " + 
container.task)
       container.task
     }
@@ -93,7 +89,6 @@ object SourceBridgeTask {
     }
 
     override def onSubscribe(subscription: Subscription): Unit = {
-      // when taskActorRef is resolved, request message from upstream
       this.subscription = subscription
       task.map(task => subscription.request(1))
     }
@@ -103,7 +98,7 @@ object SourceBridgeTask {
     }
 
     override def onNext(t: T): Unit = {
-      task.map{task =>
+      task.map { task =>
         task ! AkkaStreamMessage(t)
       }
       subscription.request(1)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala
index 12470df..0b3b9a5 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,15 +19,16 @@
 package akka.stream.gearpump.task
 
 import akka.stream.gearpump.task.UnZip2Task.UnZipFunction
+
 import io.gearpump.Message
 import io.gearpump.cluster.UserConfig
 import io.gearpump.streaming.task.TaskContext
 
-class UnZip2Task(context: TaskContext, userConf : UserConfig) extends 
GraphTask(context, userConf) {
+class UnZip2Task(context: TaskContext, userConf: UserConfig) extends 
GraphTask(context, userConf) {
 
   val unzip = 
userConf.getValue[UnZipFunction](UnZip2Task.UNZIP2_FUNCTION)(context.system).get.unzip
 
-  override def onNext(msg : Message) : Unit = {
+  override def onNext(msg: Message): Unit = {
     val message = msg.msg
     val time = msg.timestamp
     val pair = unzip(message)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala
index 616a279..c774fc7 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -25,14 +25,14 @@ class MaterializedValueOps(mat: MaterializedValueNode) {
     def resolveMaterialized(mat: MaterializedValueNode, materializedValues: 
Map[Module, Any]): Any = mat match {
       case Atomic(m) => materializedValues.getOrElse(m, ())
       case Combine(f, d1, d2) => f(resolveMaterialized(d1, 
materializedValues), resolveMaterialized(d2, materializedValues))
-      case Transform(f, d)    => f(resolveMaterialized(d, materializedValues))
-      case Ignore             => ()
+      case Transform(f, d) => f(resolveMaterialized(d, materializedValues))
+      case Ignore => ()
     }
     resolveMaterialized(mat, materializedValues).asInstanceOf[Mat]
   }
 }
 
-object MaterializedValueOps{
+object MaterializedValueOps {
   def apply(mat: MaterializedValueNode): MaterializedValueOps = new 
MaterializedValueOps(mat)
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala
 
b/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala
index 97a52bf..4ead839 100644
--- 
a/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala
+++ 
b/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,7 +19,7 @@
 package akka.stream.gearpump
 
 import akka.stream.Attributes
-import org.scalatest.{FlatSpec, Matchers, WordSpec}
+import org.scalatest.{FlatSpec, Matchers}
 
 class AttributesSpec extends FlatSpec with Matchers {
   it should "merge the attributes together" in {
@@ -30,5 +30,4 @@ class AttributesSpec extends FlatSpec with Matchers {
 
     assert("aa-bb" == c.nameOrDefault())
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/README.md
----------------------------------------------------------------------
diff --git a/experiments/cgroup/README.md b/experiments/cgroup/README.md
new file mode 100644
index 0000000..ca839cd
--- /dev/null
+++ b/experiments/cgroup/README.md
@@ -0,0 +1 @@
+Please see http://gearpump.io for documentation on Cgroup.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CGroupResource.java
----------------------------------------------------------------------
diff --git 
a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CGroupResource.java
 
b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CGroupResource.java
index 9c7151e..973ad03 100644
--- 
a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CGroupResource.java
+++ 
b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CGroupResource.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -65,5 +65,4 @@ public class CGroupResource {
   public void setEnable(boolean enable) {
     this.enable = enable;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCenter.java
----------------------------------------------------------------------
diff --git 
a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCenter.java 
b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCenter.java
index 1ec7714..dc889ba 100644
--- 
a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCenter.java
+++ 
b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCenter.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -161,7 +161,6 @@ public class CgroupCenter implements CgroupOperation {
       new File(hierarchy.getDir()).mkdirs();
     String subSystems = CgroupUtils.reAnalyse(resourceTypes);
     SystemOperation.mount(subSystems, hierarchy.getDir(), "cgroup", 
subSystems);
-
   }
 
   @Override
@@ -210,5 +209,4 @@ public class CgroupCenter implements CgroupOperation {
   public static void main(String args[]) {
     
System.out.println(CgroupCenter.getInstance().getHierarchies().get(0).getRootCgroups().getChildren().size());
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommon.java
----------------------------------------------------------------------
diff --git 
a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommon.java 
b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommon.java
index 1c644e0..5414814 100644
--- 
a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommon.java
+++ 
b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommon.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -217,5 +217,4 @@ public class CgroupCommon implements CgroupCommonOperation {
       }
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommonOperation.java
----------------------------------------------------------------------
diff --git 
a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommonOperation.java
 
b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommonOperation.java
index e4620c5..7465645 100644
--- 
a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommonOperation.java
+++ 
b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommonOperation.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -43,5 +43,4 @@ public interface CgroupCommonOperation {
   public boolean getCgroupCloneChildren() throws IOException;
 
   public void setEventControl(String eventFd, String controlFd, String... 
args) throws IOException;
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCoreFactory.java
----------------------------------------------------------------------
diff --git 
a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCoreFactory.java
 
b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCoreFactory.java
index 139ca80..a719f91 100644
--- 
a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCoreFactory.java
+++ 
b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCoreFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,7 +17,8 @@
  */
 package io.gearpump.cluster.cgroup;
 
-import io.gearpump.cluster.cgroup.core.*;
+import io.gearpump.cluster.cgroup.core.CgroupCore;
+import io.gearpump.cluster.cgroup.core.CpuCore;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -38,5 +39,4 @@ public class CgroupCoreFactory {
     }
     return result;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupOperation.java
----------------------------------------------------------------------
diff --git 
a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupOperation.java
 
b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupOperation.java
index 67f48f3..a3d830a 100644
--- 
a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupOperation.java
+++ 
b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupOperation.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -40,5 +40,4 @@ public interface CgroupOperation {
   public void create(CgroupCommon cgroup) throws SecurityException;
 
   public void delete(CgroupCommon cgroup) throws IOException;
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupUtils.java
----------------------------------------------------------------------
diff --git 
a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupUtils.java 
b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupUtils.java
index fe30b23..0a7f97c 100644
--- 
a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupUtils.java
+++ 
b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupUtils.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -129,7 +129,6 @@ public class CgroupUtils {
       bw.write(string);
       bw.newLine();
       bw.flush();
-
     } finally {
       CgroupUtils.close(writer, bw);
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Constants.java
----------------------------------------------------------------------
diff --git 
a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Constants.java 
b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Constants.java
index 1b8ecbb..80e12be 100644
--- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Constants.java
+++ b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Constants.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -26,5 +26,4 @@ public class Constants {
   public static String getDir(String dir, String constant) {
     return dir + constant;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Hierarchy.java
----------------------------------------------------------------------
diff --git 
a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Hierarchy.java 
b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Hierarchy.java
index e904aeb..69ec1ed 100644
--- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Hierarchy.java
+++ b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Hierarchy.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/ResourceType.java
----------------------------------------------------------------------
diff --git 
a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/ResourceType.java 
b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/ResourceType.java
index c063482..c2a1d42 100644
--- 
a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/ResourceType.java
+++ 
b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/ResourceType.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CgroupCore.java
----------------------------------------------------------------------
diff --git 
a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CgroupCore.java
 
b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CgroupCore.java
index ae90b74..23e630c 100644
--- 
a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CgroupCore.java
+++ 
b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CgroupCore.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,5 +22,4 @@ import io.gearpump.cluster.cgroup.ResourceType;
 public interface CgroupCore {
 
   public ResourceType getType();
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CpuCore.java
----------------------------------------------------------------------
diff --git 
a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CpuCore.java 
b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CpuCore.java
index 8578590..3402d5a 100644
--- 
a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CpuCore.java
+++ 
b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CpuCore.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,7 +22,6 @@ import io.gearpump.cluster.cgroup.Constants;
 import io.gearpump.cluster.cgroup.ResourceType;
 
 import java.io.IOException;
-import java.util.List;
 
 public class CpuCore implements CgroupCore {
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/java/io/gearpump/cluster/utils/SystemOperation.java
----------------------------------------------------------------------
diff --git 
a/experiments/cgroup/src/main/java/io/gearpump/cluster/utils/SystemOperation.java
 
b/experiments/cgroup/src/main/java/io/gearpump/cluster/utils/SystemOperation.java
index 9117160..0772133 100644
--- 
a/experiments/cgroup/src/main/java/io/gearpump/cluster/utils/SystemOperation.java
+++ 
b/experiments/cgroup/src/main/java/io/gearpump/cluster/utils/SystemOperation.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -25,41 +25,39 @@ import java.io.IOException;
 
 public class SystemOperation {
 
-    public static final Logger LOG = 
LoggerFactory.getLogger(SystemOperation.class);
-
-    public static void mount(String name, String target, String type, String 
data) throws IOException {
-        StringBuilder sb = new StringBuilder();
-        sb.append("mount -t ").append(type).append(" -o 
").append(data).append(" ").append(name).append(" ").append(target);
-        SystemOperation.exec(sb.toString());
-    }
-
-    public static void umount(String name) throws IOException {
-        StringBuilder sb = new StringBuilder();
-        sb.append("umount ").append(name);
-        SystemOperation.exec(sb.toString());
-    }
-
-    public static String exec(String cmd) throws IOException {
-        LOG.debug("Shell cmd: " + cmd);
-        Process process = new ProcessBuilder(new String[] { "/bin/bash", "-c", 
cmd }).start();
-        try {
-            process.waitFor();
-            String output = IOUtils.toString(process.getInputStream());
-            String errorOutput = IOUtils.toString(process.getErrorStream());
-            LOG.debug("Shell Output: " + output);
-            if (errorOutput.length() != 0) {
-                LOG.error("Shell Error Output: " + errorOutput);
-                throw new IOException(errorOutput);
-            }
-            return output;
-        } catch (InterruptedException ie) {
-            throw new IOException(ie.toString());
-        }
-
-    }
-
-    public static void main(String[] args) throws IOException {
-        SystemOperation.mount("test", "/cgroup/cpu", "cgroup", "cpu");
+  public static final Logger LOG = 
LoggerFactory.getLogger(SystemOperation.class);
+
+  public static void mount(String name, String target, String type, String 
data) throws IOException {
+    StringBuilder sb = new StringBuilder();
+    sb.append("mount -t ").append(type).append(" -o ").append(data).append(" 
").append(name).append(" ").append(target);
+    SystemOperation.exec(sb.toString());
+  }
+
+  public static void umount(String name) throws IOException {
+    StringBuilder sb = new StringBuilder();
+    sb.append("umount ").append(name);
+    SystemOperation.exec(sb.toString());
+  }
+
+  public static String exec(String cmd) throws IOException {
+    LOG.debug("Shell cmd: " + cmd);
+    Process process = new ProcessBuilder(new String[]{"/bin/bash", "-c", 
cmd}).start();
+    try {
+      process.waitFor();
+      String output = IOUtils.toString(process.getInputStream());
+      String errorOutput = IOUtils.toString(process.getErrorStream());
+      LOG.debug("Shell Output: " + output);
+      if (errorOutput.length() != 0) {
+        LOG.error("Shell Error Output: " + errorOutput);
+        throw new IOException(errorOutput);
+      }
+      return output;
+    } catch (InterruptedException ie) {
+      throw new IOException(ie.toString());
     }
+  }
 
+  public static void main(String[] args) throws IOException {
+    SystemOperation.mount("test", "/cgroup/cpu", "cgroup", "cpu");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupManager.scala
----------------------------------------------------------------------
diff --git 
a/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupManager.scala
 
b/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupManager.scala
index a01d17a..ae7fb42 100644
--- 
a/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupManager.scala
+++ 
b/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupManager.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,11 +18,12 @@
 package io.gearpump.cluster.worker
 
 import com.typesafe.config.Config
+import org.apache.commons.lang.SystemUtils
+import org.slf4j.{Logger, LoggerFactory}
+
 import io.gearpump.cluster.cgroup.core.{CgroupCore, CpuCore}
 import io.gearpump.cluster.cgroup.{CgroupCenter, CgroupCommon, Hierarchy, 
ResourceType}
 import io.gearpump.cluster.worker.CGroupManager._
-import org.apache.commons.lang.SystemUtils
-import org.slf4j.{LoggerFactory, Logger}
 
 class CGroupManager(config: Config) {
   private val center = CgroupCenter.getInstance()
@@ -49,12 +50,13 @@ class CGroupManager(config: Config) {
   }
 
   private def validateCpuUpperLimitValue(value: Int): Int = {
-    if(value > 10)
+    if (value > 10) {
       10
-    else if(value < 1 && value != -1)
+    } else if (value < 1 && value != -1) {
       1
-    else
+    } else {
       value
+    }
   }
 
   private def setCpuUsageUpperLimit(cpuCore: CpuCore, cpuCoreUpperLimit: Int): 
Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupProcessLauncher.scala
----------------------------------------------------------------------
diff --git 
a/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupProcessLauncher.scala
 
b/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupProcessLauncher.scala
index 9e7ce7b..eb57a18 100644
--- 
a/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupProcessLauncher.scala
+++ 
b/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupProcessLauncher.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,42 +18,46 @@
 package io.gearpump.cluster.worker
 
 import java.io.File
+import scala.sys.process.Process
 
 import com.typesafe.config.Config
+import org.slf4j.{Logger, LoggerFactory}
+
 import io.gearpump.cluster.scheduler.Resource
 import io.gearpump.util.{ProcessLogRedirector, RichProcess}
-import org.slf4j.{LoggerFactory, Logger}
-
-import scala.sys.process.Process
 
 /**
-  * CGroupProcessLauncher is used to launch a process for Executor with CGroup.
-  * For more details, please refer http://gearpump.io
-  */
-class CGroupProcessLauncher(val config: Config) extends 
ExecutorProcessLauncher{
+ * CGroupProcessLauncher is used to launch a process for Executor with CGroup.
+ * For more details, please refer http://gearpump.io
+ */
+class CGroupProcessLauncher(val config: Config) extends 
ExecutorProcessLauncher {
   private val APP_MASTER = -1
   private val cgroupManager: Option[CGroupManager] = 
CGroupManager.getInstance(config)
   private val LOG: Logger = LoggerFactory.getLogger(getClass)
 
   override def cleanProcess(appId: Int, executorId: Int): Unit = {
-    if(executorId != APP_MASTER) {
+    if (executorId != APP_MASTER) {
       cgroupManager.foreach(_.shutDownExecutor(appId, executorId))
     }
   }
 
-  override def createProcess(appId: Int, executorId: Int, resource: Resource, 
appConfig: Config, options: Array[String],
+  override def createProcess(
+    appId: Int, executorId: Int, resource: Resource, appConfig: Config, 
options: Array[String],
     classPath: Array[String], mainClass: String, arguments: Array[String]): 
RichProcess = {
     val cgroupCommand = if (executorId != APP_MASTER) {
-      cgroupManager.map(_.startNewExecutor(appConfig, resource.slots, appId, 
executorId)).getOrElse(List.empty)
+      cgroupManager.map(_.startNewExecutor(appConfig, resource.slots, appId,
+        executorId)).getOrElse(List.empty)
     } else List.empty
-    LOG.info(s"Launch executor with CGroup ${cgroupCommand.mkString(" ")}, 
classpath: ${classPath.mkString(File.pathSeparator)}")
+    LOG.info(s"Launch executor with CGroup ${cgroupCommand.mkString(" ")}, " +
+      s"classpath: ${classPath.mkString(File.pathSeparator)}")
 
     val java = System.getProperty("java.home") + "/bin/java"
-    val command = cgroupCommand ++ List(java) ++ options ++ List("-cp", 
classPath.mkString(File.pathSeparator), mainClass) ++ arguments
-    LOG.info(s"Starting executor process java $mainClass 
${arguments.mkString(" ")}; options: ${options.mkString(" ")}")
+    val command = cgroupCommand ++ List(java) ++ options ++ List("-cp", 
classPath
+      .mkString(File.pathSeparator), mainClass) ++ arguments
+    LOG.info(s"Starting executor process java $mainClass 
${arguments.mkString(" ")}; " +
+      s"options: ${options.mkString(" ")}")
     val logger = new ProcessLogRedirector()
     val process = Process(command).run(logger)
     new RichProcess(process, logger)
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/README.md
----------------------------------------------------------------------
diff --git a/experiments/storm/README.md b/experiments/storm/README.md
index f42547c..d02bc6d 100644
--- a/experiments/storm/README.md
+++ b/experiments/storm/README.md
@@ -3,14 +3,14 @@ on Gearpump. This documentation illustrates how to do so in a 
local Gearpump clu
 
 ## How to run a Storm application over Gearpump 
 
-  1. launch a local cluster
+  1. Launch a local cluster
 
   ```bash
     ./target/pack/bin/local
 
   ```
 
-  2. submit a topology from storm-starter
+  2. Submit a topology from storm-starter
 
   ```bash
     bin/storm -verbose -config storm.yaml -jar 
storm-starter-${STORM_VERSION}.jar storm.starter.ExclamationTopology 
exclamation 
@@ -27,4 +27,3 @@ on Gearpump. This documentation illustrates how to do so in a 
local Gearpump clu
 
 1. Trident support is ongoing. 
 
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/java/io/gearpump/experiments/storm/util/TimeCacheMapWrapper.java
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/java/io/gearpump/experiments/storm/util/TimeCacheMapWrapper.java
 
b/experiments/storm/src/main/java/io/gearpump/experiments/storm/util/TimeCacheMapWrapper.java
new file mode 100644
index 0000000..510258d
--- /dev/null
+++ 
b/experiments/storm/src/main/java/io/gearpump/experiments/storm/util/TimeCacheMapWrapper.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.experiments.storm.util;
+
+import backtype.storm.utils.TimeCacheMap;
+
+/**
+ * Wrapper class to suppress "deprecation" warning, as scala doesn't support 
the suppression.
+ */
+@SuppressWarnings("deprecation")
+public class TimeCacheMapWrapper<K, V> extends TimeCacheMap<K, V> {
+
+  public TimeCacheMapWrapper (int expirationSecs, Callback<K, V> callback) {
+    super(expirationSecs, new ExpiredCallback<K, V>() {
+
+      @Override
+      public void expire(K key, V val) {
+        callback.expire(key, val);
+      }
+    });
+  }
+
+  public static interface Callback<K, V> {
+    public void expire(K key, V val);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala
index daa7aa5..19814e9 100644
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala
+++ 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,31 +18,34 @@
 
 package io.gearpump.experiments.storm
 
+import org.slf4j.Logger
+
 import io.gearpump.experiments.storm.main.{GearpumpNimbus, GearpumpStormClient}
 import io.gearpump.util.LogUtil
-import org.slf4j.Logger
 
 object StormRunner {
   private val LOG: Logger = LogUtil.getLogger(getClass)
 
   private val commands = Map("nimbus" -> GearpumpNimbus, "app" -> 
GearpumpStormClient)
 
-  private def usage: Unit = {
+  private def usage(): Unit = {
     val keys = commands.keys.toList.sorted
+    // scalastyle:off println
     Console.err.println("Usage: " + "<" + keys.mkString("|") + ">")
+    // scalastyle:on println
   }
 
-  private def executeCommand(command : String, commandArgs : Array[String]): 
Unit = {
+  private def executeCommand(command: String, commandArgs: Array[String]): 
Unit = {
     if (!commands.contains(command)) {
-      usage
+      usage()
     } else {
       commands(command).main(commandArgs)
     }
   }
 
-  def main(args: Array[String]) = {
+  def main(args: Array[String]): Unit = {
     if (args.length == 0) {
-      usage
+      usage()
     } else {
       val command = args(0)
       val commandArgs = args.drop(1)

Reply via email to