http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/WordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/WordCount.scala
 
b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/WordCount.scala
deleted file mode 100644
index f23604a..0000000
--- 
a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/WordCount.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.wordcount
-
-import org.slf4j.Logger
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.embedded.EmbeddedCluster
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import io.gearpump.partitioner.HashPartitioner
-import io.gearpump.streaming.{Processor, StreamApplication}
-import io.gearpump.util.Graph.Node
-import io.gearpump.util.{AkkaApp, Graph, LogUtil}
-
-/** Same WordCount with low level Processor Graph syntax */
-object WordCount extends AkkaApp with ArgumentsParser {
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-  val RUN_FOR_EVER = -1
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "split" -> CLIOption[Int]("<how many split tasks>", required = false, 
defaultValue = Some(1)),
-    "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, 
defaultValue = Some(1)),
-    "debug" -> CLIOption[Boolean]("<true|false>", required = false, 
defaultValue = Some(false)),
-    "sleep" -> CLIOption[Int]("how many seconds to sleep for debug mode", 
required = false,
-      defaultValue = Some(30))
-  )
-
-  def application(config: ParseResult): StreamApplication = {
-    val splitNum = config.getInt("split")
-    val sumNum = config.getInt("sum")
-    val split = Processor[Split](splitNum)
-    val sum = Processor[Sum](sumNum)
-    val partitioner = new HashPartitioner
-
-    val app = StreamApplication("wordCount", Graph(split ~ partitioner ~> 
sum), UserConfig.empty)
-    app
-  }
-
-  override def main(akkaConf: Config, args: Array[String]): Unit = {
-    val config = parse(args)
-
-    val debugMode = config.getBoolean("debug")
-    val sleepSeconds = config.getInt("sleep")
-
-    val localCluster = if (debugMode) {
-      val cluster = new EmbeddedCluster(akkaConf: Config)
-      cluster.start()
-      Some(cluster)
-    } else {
-      None
-    }
-
-    val context: ClientContext = localCluster match {
-      case Some(local) => local.newClientContext
-      case None => ClientContext(akkaConf)
-    }
-
-    val app = application(config)
-    context.submit(app)
-
-    if (debugMode) {
-      Thread.sleep(sleepSeconds * 1000) // Sleeps for 30 seconds for debugging.
-    }
-
-    context.close()
-    localCluster.map(_.stop())
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
 
b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
deleted file mode 100644
index ab8e8d0..0000000
--- 
a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.wordcount.dsl
-
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import io.gearpump.streaming.dsl.StreamApp
-import io.gearpump.streaming.dsl.StreamApp._
-import io.gearpump.util.AkkaApp
-
-/** Same WordCount with High level DSL syntax */
-object WordCount extends AkkaApp with ArgumentsParser {
-
-  override val options: Array[(String, CLIOption[Any])] = Array.empty
-
-  override def main(akkaConf: Config, args: Array[String]): Unit = {
-    val context = ClientContext(akkaConf)
-    val app = StreamApp("dsl", context)
-    val data = "This is a good start, bingo!! bingo!!"
-    app.source(data.lines.toList, 1, "source").
-      // word => (word, count)
-      flatMap(line => line.split("[\\s]+")).map((_, 1)).
-      // (word, count1), (word, count2) => (word, count1 + count2)
-      groupByKey().sum.log
-
-    val appId = context.submit(app)
-    context.close()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
new file mode 100644
index 0000000..ae63f10
--- /dev/null
+++ 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.wordcount
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
+/**
+ * Source task of the WordCount example: each time it is triggered it splits
+ * [[Split.TEXT_TO_SPLIT]] into words and emits one message per word.
+ *
+ * The task keeps itself running: `onStart` sends a first message to `self`, and every
+ * `onNext` schedules another message to `self` 100 milliseconds later.
+ */
+class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+  import taskContext.output
+
+  /** Sends the first trigger message to `self`. */
+  override def onStart(startTime: StartTime): Unit = {
+    self ! Message("start")
+  }
+
+  /**
+   * Emits every non-empty, whitespace-separated token of the sample text as a message
+   * stamped with the current system time, then schedules the next trigger.
+   *
+   * @param msg the trigger message; its content is not read
+   */
+  override def onNext(msg: Message): Unit = {
+    Split.TEXT_TO_SPLIT.lines.foreach { line =>
+      // Named `word`: reusing `msg` here would shadow the method parameter.
+      line.split("[\\s]+").filter(_.nonEmpty).foreach { word =>
+        output(new Message(word, System.currentTimeMillis()))
+      }
+    }
+
+    import scala.concurrent.duration._
+    taskContext.scheduleOnce(Duration(100, TimeUnit.MILLISECONDS))(self !
+      Message("continue", System.currentTimeMillis()))
+  }
+}
+
+/** Holds the sample text that the `Split` task tokenizes. */
+object Split {
+  /**
+   * Sample input: the text of the Apache license header. `stripMargin` removes the leading
+   * whitespace and the `|` marker from each line.
+   */
+  val TEXT_TO_SPLIT =
+    """
+      |   Licensed to the Apache Software Foundation (ASF) under one
+      |   or more contributor license agreements.  See the NOTICE file
+      |   distributed with this work for additional information
+      |   regarding copyright ownership.  The ASF licenses this file
+      |   to you under the Apache License, Version 2.0 (the
+      |   "License"); you may not use this file except in compliance
+      |   with the License.  You may obtain a copy of the License at
+      |
+      |       http://www.apache.org/licenses/LICENSE-2.0
+      |
+      |   Unless required by applicable law or agreed to in writing, software
+      |   distributed under the License is distributed on an "AS IS" BASIS,
+      |   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+      |   See the License for the specific language governing permissions and
+      |   limitations under the License.
+    """.stripMargin
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala
 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala
new file mode 100644
index 0000000..c3fa82a
--- /dev/null
+++ 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.wordcount
+
+import java.util.concurrent.TimeUnit
+import scala.collection.mutable
+import scala.concurrent.duration.FiniteDuration
+
+import akka.actor.Cancellable
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
+/**
+ * Sink task of the WordCount example: counts how often each received word occurs and
+ * periodically logs how many words arrived since the previous report.
+ */
+class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+  /** Number of occurrences per word. */
+  private[wordcount] val map: mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]()
+
+  /** Total number of words received so far. */
+  private[wordcount] var wordCount: Long = 0
+  // Time and word total captured at the previous report.
+  private var snapShotTime: Long = System.currentTimeMillis()
+  private var snapShotWordCount: Long = 0
+
+  // Handle of the periodic report; None until onStart has obtained one.
+  private var scheduler: Option[Cancellable] = None
+
+  /**
+   * Registers `reportWordCount` with the task context's scheduler.
+   *
+   * NOTE(review): the two durations are presumably the initial delay (5 s) and the repeat
+   * interval (30 s) -- confirm against `TaskContext.schedule`.
+   */
+  override def onStart(startTime: StartTime): Unit = {
+    // Option(...) rather than Some(...): a null handle is treated as "nothing to cancel".
+    scheduler = Option(taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
+      new FiniteDuration(30, TimeUnit.SECONDS))(reportWordCount))
+  }
+
+  /**
+   * Increments the total and the per-word counter for the word carried by `msg`.
+   * A null message is ignored.
+   */
+  override def onNext(msg: Message): Unit = {
+    if (null != msg) {
+      // Cast once and reuse the result for both the lookup and the update.
+      val word = msg.msg.asInstanceOf[String]
+      wordCount += 1
+      map.put(word, map.getOrElse(word, 0L) + 1)
+    }
+  }
+
+  /** Cancels the periodic report if one was scheduled. */
+  override def onStop(): Unit = {
+    scheduler.foreach(_.cancel())
+  }
+
+  /**
+   * Logs the pair (words received since the last report, whole seconds elapsed since the
+   * last report) and makes the current totals the new baseline.
+   */
+  def reportWordCount(): Unit = {
+    val current: Long = System.currentTimeMillis()
+    LOG.info(s"Task ${taskContext.taskId} Throughput:" +
+      s" ${(wordCount - snapShotWordCount, (current - snapShotTime) / 1000)} (words, second)")
+    snapShotWordCount = wordCount
+    snapShotTime = current
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
new file mode 100644
index 0000000..9917d9f
--- /dev/null
+++ 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.wordcount
+
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.embedded.EmbeddedCluster
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, 
ParseResult}
+import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.{Processor, StreamApplication}
+import org.apache.gearpump.util.Graph.Node
+import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil}
+
+/**
+ * Same WordCount with low level Processor Graph syntax.
+ *
+ * Command line options: `split` and `sum` (number of tasks, default 1), `debug` (default
+ * false; when true the application is submitted to an embedded cluster started by this
+ * program) and `sleep` (seconds to wait before shutting down in debug mode, default 30).
+ */
+object WordCount extends AkkaApp with ArgumentsParser {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+  val RUN_FOR_EVER = -1
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "split" -> CLIOption[Int]("<how many split tasks>", required = false, defaultValue = Some(1)),
+    "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1)),
+    "debug" -> CLIOption[Boolean]("<true|false>", required = false, defaultValue = Some(false)),
+    "sleep" -> CLIOption[Int]("how many seconds to sleep for debug mode", required = false,
+      defaultValue = Some(30))
+  )
+
+  /**
+   * Builds the application: `Split` processors connected to `Sum` processors through a
+   * hash partitioner, with the task counts read from the `split` and `sum` options.
+   */
+  def application(config: ParseResult): StreamApplication = {
+    val splitNum = config.getInt("split")
+    val sumNum = config.getInt("sum")
+    val split = Processor[Split](splitNum)
+    val sum = Processor[Sum](sumNum)
+    val partitioner = new HashPartitioner
+
+    StreamApplication("wordCount", Graph(split ~ partitioner ~> sum), UserConfig.empty)
+  }
+
+  /**
+   * Parses the arguments, submits the application and releases the client. In debug mode
+   * an embedded cluster is started first, kept alive for `sleep` seconds after the
+   * submission, and stopped at the end.
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+
+    val debugMode = config.getBoolean("debug")
+    val sleepSeconds = config.getInt("sleep")
+
+    val localCluster = if (debugMode) {
+      val cluster = new EmbeddedCluster(akkaConf)
+      cluster.start()
+      Some(cluster)
+    } else {
+      None
+    }
+
+    try {
+      val context: ClientContext = localCluster match {
+        case Some(local) => local.newClientContext
+        case None => ClientContext(akkaConf)
+      }
+
+      try {
+        context.submit(application(config))
+
+        if (debugMode) {
+          // Sleeps for the configured number of seconds (30 by default) for debugging.
+          // Multiplied as Long: Int arithmetic overflows above 2147483 seconds.
+          Thread.sleep(sleepSeconds * 1000L)
+        }
+      } finally {
+        // Close the client even when the submission or the sleep fails.
+        context.close()
+      }
+    } finally {
+      // Runs after the client is closed; a no-op when no embedded cluster was started.
+      localCluster.foreach(_.stop())
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
new file mode 100644
index 0000000..22f597c
--- /dev/null
+++ 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.wordcount.dsl
+
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.streaming.dsl.StreamApp
+import org.apache.gearpump.streaming.dsl.StreamApp._
+import org.apache.gearpump.util.AkkaApp
+
+/** Same WordCount with High level DSL syntax */
+object WordCount extends AkkaApp with ArgumentsParser {
+
+  /** This example takes no command line options. */
+  override val options: Array[(String, CLIOption[Any])] = Array.empty
+
+  /**
+   * Declares a word count over one hard-coded sentence with the stream DSL, submits it
+   * and closes the client context.
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val context = ClientContext(akkaConf)
+    val app = StreamApp("dsl", context)
+    val data = "This is a good start, bingo!! bingo!!"
+    app.source(data.lines.toList, 1, "source").
+      // word => (word, count)
+      flatMap(line => line.split("[\\s]+")).map((_, 1)).
+      // (word, count1), (word, count2) => (word, count1 + count2)
+      groupByKey().sum.log
+
+    // The value returned by submit is not needed here, so it is not bound to a name.
+    context.submit(app)
+    context.close()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SplitSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SplitSpec.scala
 
b/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SplitSpec.scala
deleted file mode 100644
index 21e498e..0000000
--- 
a/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SplitSpec.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.wordcount
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.ActorSystem
-import akka.testkit.TestProbe
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest.{Matchers, WordSpec}
-
-import io.gearpump.Message
-import io.gearpump.cluster.{TestUtil, UserConfig}
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.task.StartTime
-
-class SplitSpec extends WordSpec with Matchers {
-
-  "Split" should {
-    "split the text and deliver to next task" in {
-
-      val taskContext = MockUtil.mockTaskContext
-
-      implicit val system: ActorSystem = ActorSystem("test", 
TestUtil.DEFAULT_CONFIG)
-
-      val mockTaskActor = TestProbe()
-
-      // Mock self ActorRef
-      when(taskContext.self).thenReturn(mockTaskActor.ref)
-
-      val conf = UserConfig.empty
-      val split = new Split(taskContext, conf)
-      split.onStart(StartTime(0))
-      mockTaskActor.expectMsgType[Message]
-
-      val expectedWordCount = Split.TEXT_TO_SPLIT.split( 
"""[\s\n]+""").filter(_.nonEmpty).length
-
-      split.onNext(Message("next"))
-      verify(taskContext, times(expectedWordCount)).output(anyObject())
-
-      system.terminate()
-      Await.result(system.whenTerminated, Duration.Inf)
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SumSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SumSpec.scala
 
b/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SumSpec.scala
deleted file mode 100644
index 48a3fa9..0000000
--- 
a/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SumSpec.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.wordcount
-
-import org.scalacheck.Gen
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.task.StartTime
-
-class SumSpec extends PropSpec with PropertyChecks with Matchers with 
BeforeAndAfter {
-  val stringGenerator = Gen.alphaStr
-
-  var wordcount = 0
-
-  property("Sum should calculate the frequency of the word correctly") {
-
-    val taskContext = MockUtil.mockTaskContext
-
-    val conf = UserConfig.empty
-
-    val sum = new Sum(taskContext, conf)
-
-    sum.onStart(StartTime(0))
-
-    forAll(stringGenerator) { txt =>
-      wordcount += 1
-      sum.onNext(Message(txt))
-    }
-    val all = sum.map.foldLeft(0L) { (total, kv) =>
-      val (_, num) = kv
-      total + num
-    }
-    assert(sum.wordCount == all && sum.wordCount == wordcount)
-
-    sum.reportWordCount()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/WordCountSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/WordCountSpec.scala
 
b/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/WordCountSpec.scala
deleted file mode 100644
index 63b2312..0000000
--- 
a/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/WordCountSpec.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.wordcount
-
-import scala.concurrent.Future
-import scala.util.Success
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
-
-import io.gearpump.cluster.ClientToMaster.SubmitApplication
-import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import io.gearpump.cluster.{MasterHarness, TestUtil}
-
-class WordCountSpec
-  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with 
MasterHarness {
-
-  before {
-    startActorSystem()
-  }
-
-  after {
-    shutdownActorSystem()
-  }
-
-  protected override def config = TestUtil.DEFAULT_CONFIG
-
-  property("WordCount should succeed to submit application with required 
arguments") {
-    val requiredArgs = Array.empty[String]
-    val optionalArgs = Array(
-      "-split", "1",
-      "-sum", "1")
-
-    val args = {
-      Table(
-        ("requiredArgs", "optionalArgs"),
-        (requiredArgs, optionalArgs)
-      )
-    }
-    val masterReceiver = createMockMaster()
-    forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) 
=>
-
-      val args = requiredArgs ++ optionalArgs
-
-      Future {
-        WordCount.main(masterConfig, args)
-      }
-
-      masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
-      masterReceiver.reply(SubmitApplicationResult(Success(0)))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala
 
b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala
new file mode 100644
index 0000000..cef9337
--- /dev/null
+++ 
b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.wordcount
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor.ActorSystem
+import akka.testkit.TestProbe
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.{Matchers, WordSpec}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.task.StartTime
+
+/** Checks that `Split` triggers itself on start and emits one message per word. */
+class SplitSpec extends WordSpec with Matchers {
+
+  "Split" should {
+    "split the text and deliver to next task" in {
+
+      val taskContext = MockUtil.mockTaskContext
+
+      implicit val system: ActorSystem = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+
+      try {
+        val mockTaskActor = TestProbe()
+
+        // Mock self ActorRef
+        when(taskContext.self).thenReturn(mockTaskActor.ref)
+
+        val conf = UserConfig.empty
+        val split = new Split(taskContext, conf)
+        split.onStart(StartTime(0))
+        // onStart must have sent the first trigger message to the task itself.
+        mockTaskActor.expectMsgType[Message]
+
+        val expectedWordCount = Split.TEXT_TO_SPLIT.split("""[\s\n]+""").count(_.nonEmpty)
+
+        split.onNext(Message("next"))
+        // Exactly one output call per word of the sample text.
+        verify(taskContext, times(expectedWordCount)).output(anyObject())
+      } finally {
+        // Shut the actor system down even when an expectation above fails.
+        system.terminate()
+        Await.result(system.whenTerminated, Duration.Inf)
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala
 
b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala
new file mode 100644
index 0000000..e42d696
--- /dev/null
+++ 
b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.wordcount
+
+import org.scalacheck.Gen
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.task.StartTime
+
+/**
+ * Property test: the total kept by [[Sum]] matches its per-word entries and its input.
+ */
+class SumSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
+  val stringGenerator = Gen.alphaStr
+
+  // How many generated strings have been handed to the task so far.
+  var wordcount = 0
+
+  property("Sum should calculate the frequency of the word correctly") {
+    val sum = new Sum(MockUtil.mockTaskContext, UserConfig.empty)
+    sum.onStart(StartTime(0))
+
+    // Every generated string is delivered as one message and counted locally.
+    forAll(stringGenerator) { word =>
+      wordcount += 1
+      sum.onNext(Message(word))
+    }
+
+    // Adding up the values stored in sum.map must reproduce sum.wordCount, which in
+    // turn must equal the number of messages sent.
+    val all = sum.map.foldLeft(0L) { case (total, (_, num)) => total + num }
+    assert(sum.wordCount == all && sum.wordCount == wordcount)
+
+    // Exercise the reporting path as well; its result is not checked here.
+    sum.reportWordCount()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala
 
b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala
new file mode 100644
index 0000000..f703552
--- /dev/null
+++ 
b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.wordcount
+
+import scala.concurrent.Future
+import scala.util.Success
+
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
+
+import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication
+import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+
+/**
+ * Submission test: starts [[WordCount]] against a mock master and waits for the
+ * resulting [[SubmitApplication]] request.
+ */
+class WordCountSpec
+  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {
+
+  // Overrides config with the shared test defaults (presumably read by MasterHarness).
+  protected override def config = TestUtil.DEFAULT_CONFIG
+
+  // startActorSystem()/shutdownActorSystem() bracket every property.
+  before { startActorSystem() }
+
+  after { shutdownActorSystem() }
+
+  property("WordCount should succeed to submit application with required arguments") {
+    val noRequiredArgs = Array.empty[String]
+    val splitAndSumArgs = Array("-split", "1", "-sum", "1")
+    // Single-row table: no required arguments plus explicit -split/-sum values.
+    val argumentSets = Table(
+      ("requiredArgs", "optionalArgs"),
+      (noRequiredArgs, splitAndSumArgs))
+
+    val masterReceiver = createMockMaster()
+    forAll(argumentSets) { (required: Array[String], optional: Array[String]) =>
+      val commandLine = required ++ optional
+
+      // main is launched asynchronously; its outcome is not inspected, the test only
+      // waits for the submission to show up at the mock master.
+      Future {
+        WordCount.main(masterConfig, commandLine)
+      }
+
+      // Wait up to PROCESS_BOOT_TIME for the request, then answer it with Success(0).
+      masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
+      masterReceiver.reply(SubmitApplicationResult(Success(0)))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala 
b/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala
index 8851083..7daebf1 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala
+++ b/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala
@@ -33,8 +33,8 @@ import akka.stream.impl.StreamLayout._
 import akka.stream.impl._
 import akka.stream.{Graph => AkkaGraph}
 
-import _root_.io.gearpump.util
-import _root_.io.gearpump.util.Graph
+import _root_.org.apache.gearpump.util
+import _root_.org.apache.gearpump.util.Graph
 
 /**
  *

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala
index b1bf01c..976b1e6 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala
@@ -26,7 +26,7 @@ import akka.stream.gearpump.GearpumpMaterializer
 import akka.stream.gearpump.scaladsl.GearSource
 import akka.stream.scaladsl.Sink
 
-import io.gearpump.streaming.dsl.CollectionDataSource
+import org.apache.gearpump.streaming.dsl.CollectionDataSource
 
 /**
  * read from remote and write to local

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala
index b1e2fcb..7b80b7b 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala
@@ -26,7 +26,7 @@ import akka.stream.gearpump.GearpumpMaterializer
 import akka.stream.gearpump.scaladsl.GearSink
 import akka.stream.scaladsl.Source
 
-import io.gearpump.streaming.dsl.LoggerSink
+import org.apache.gearpump.streaming.dsl.LoggerSink
 
 /**
  * read from local and write to remote

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala
index 38b40b7..0fccd30 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala
@@ -26,7 +26,7 @@ import akka.stream.gearpump.GearpumpMaterializer
 import akka.stream.gearpump.scaladsl.GearSource
 import akka.stream.scaladsl.Sink
 
-import io.gearpump.streaming.dsl.CollectionDataSource
+import org.apache.gearpump.streaming.dsl.CollectionDataSource
 
 /**
  * WordCount example

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala
index c2d0417..56b89bc 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala
@@ -31,8 +31,8 @@ import akka.stream.scaladsl._
 import akka.util.ByteString
 import org.json4s.JsonAST.JString
 
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import io.gearpump.util.AkkaApp
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.util.AkkaApp
 
 /**
  * this example is ported from 
http://engineering.intenthq.com/2015/06/wikidata-akka-streams/

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala
index ef8c6fa..19083f6 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala
@@ -28,7 +28,7 @@ import akka.stream.impl.Stages.DirectProcessor
 import akka.stream.impl.StreamLayout.{MaterializedValueNode, Module}
 import akka.stream.impl.{SinkModule, SourceModule}
 
-import io.gearpump.util.Graph
+import org.apache.gearpump.util.Graph
 
 /**
  *

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala
index 473f32a..6ef8598 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala
@@ -28,7 +28,7 @@ import akka.stream.impl.{PublisherSource, SubscriberSink}
 import akka.stream.{Outlet, SinkShape, SourceShape}
 import org.reactivestreams.{Publisher, Subscriber}
 
-import io.gearpump.util.Graph
+import org.apache.gearpump.util.Graph
 
 /**
  *

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala
index 5bb9a68..3cea78a 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala
@@ -26,10 +26,10 @@ import 
akka.stream.gearpump.task.SinkBridgeTask.SinkBridgeTaskClient
 import akka.stream.gearpump.task.SourceBridgeTask.SourceBridgeTaskClient
 import akka.stream.impl.StreamLayout.Module
 
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.embedded.EmbeddedCluster
-import io.gearpump.streaming.ProcessorId
-import io.gearpump.util.Graph
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.embedded.EmbeddedCluster
+import org.apache.gearpump.streaming.ProcessorId
+import org.apache.gearpump.util.Graph
 
 /**
  *

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala
index 4364645..564b6c7 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala
@@ -21,7 +21,7 @@ package akka.stream.gearpump.graph
 import akka.stream.ModuleGraph.Edge
 import akka.stream.impl.StreamLayout.Module
 
-import io.gearpump.util.Graph
+import org.apache.gearpump.util.Graph
 
 /**
  * [[SubGraph]] is a partial part of [[akka.stream.ModuleGraph]]

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala
index 78125c8..a5c6e48 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala
@@ -29,7 +29,7 @@ import akka.stream.impl.StreamLayout.Module
 import akka.stream.impl.StreamSupervisor
 import akka.stream.{ActorAttributes, ActorMaterializer, 
ActorMaterializerSettings, Attributes, ClosedShape, Graph => AkkaGraph, 
MaterializationContext, ModuleGraph}
 
-import io.gearpump.util.Graph
+import org.apache.gearpump.util.Graph
 
 /**
  * [[LocalMaterializer]] will use local actor to materialize the graph

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala
index 97d8f70..1ec724e 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala
@@ -33,7 +33,7 @@ import akka.stream.impl.{ActorProcessorFactory, 
ActorPublisher, ExposedPublisher
 import akka.stream.{ActorMaterializerSettings, Attributes, Graph => AkkaGraph, 
InPort, MaterializationContext, Materializer, OutPort, Shape}
 import org.reactivestreams.{Processor, Publisher, Subscriber}
 
-import io.gearpump.util.Graph
+import org.apache.gearpump.util.Graph
 
 /**
  * This materializer is functional equivalent to 
[[akka.stream.impl.ActorMaterializerImpl]]

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala
index 9852ed0..47ed1f2 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala
@@ -28,11 +28,11 @@ import akka.stream.impl.Stages.StageModule
 import akka.stream.impl.StreamLayout.Module
 import org.slf4j.LoggerFactory
 
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.dsl.StreamApp
-import io.gearpump.streaming.dsl.op.{DataSinkOp, DataSourceOp, Direct, 
FlatMapOp, GroupByOp, MasterOp, MergeOp, Op, OpEdge, ProcessorOp, Shuffle, 
SlaveOp}
-import io.gearpump.streaming.{ProcessorId, StreamApplication}
-import io.gearpump.util.Graph
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.StreamApp
+import org.apache.gearpump.streaming.dsl.op.{DataSinkOp, DataSourceOp, Direct, 
FlatMapOp, GroupByOp, MasterOp, MergeOp, Op, OpEdge, ProcessorOp, Shuffle, 
SlaveOp}
+import org.apache.gearpump.streaming.{ProcessorId, StreamApplication}
+import org.apache.gearpump.util.Graph
 
 /**
  * [[RemoteMaterializerImpl]] will materialize the [[Graph[Module, Edge]] to a 
Gearpump

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala
index 4b7d3ac..c4c78cc 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala
@@ -22,10 +22,10 @@ import akka.stream.impl.FlowModule
 import akka.stream.impl.StreamLayout.Module
 import akka.stream.{Attributes, Inlet, Outlet, Shape, SinkShape, SourceShape}
 
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.sink.DataSink
-import io.gearpump.streaming.source.DataSource
-import io.gearpump.streaming.task.Task
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.Task
 
 /**
  * [[GearpumpTaskModule]] represent modules that can be materialized as 
Gearpump Tasks.

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala
index a9f6e97..9e35389 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala
@@ -23,10 +23,10 @@ import akka.stream.gearpump.module.{DummySink, DummySource, 
GroupByModule, Proce
 import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
 import org.reactivestreams.{Publisher, Subscriber}
 
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.sink.DataSink
-import io.gearpump.streaming.source.DataSource
-import io.gearpump.streaming.task.Task
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.Task
 
 object GearSource {
 
@@ -61,7 +61,7 @@ object GearSource {
   }
 
   /**
-   * Construct a Source from Gearpump [[io.gearpump.streaming.Processor]].
+   * Construct a Source from Gearpump 
[[org.apache.gearpump.streaming.Processor]].
    *
    * [[ProcessorModule]] -> downstream Sink
    *
@@ -106,7 +106,7 @@ object GearSink {
   }
 
   /**
-   * Construct a Sink from Gearpump [[io.gearpump.streaming.Processor]].
+   * Construct a Sink from Gearpump 
[[org.apache.gearpump.streaming.Processor]].
    *
    * Upstream Source -> [[ProcessorModule]]
    *

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
index 58a04ca..2eb0612 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
@@ -18,9 +18,9 @@
 
 package akka.stream.gearpump.task
 
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
 
 class BalanceTask(context: TaskContext, userConf: UserConfig) extends 
GraphTask(context, userConf) {
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala
index 388806e..925bf21 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala
@@ -18,9 +18,9 @@
 
 package akka.stream.gearpump.task
 
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
 
 class BroadcastTask(context: TaskContext, userConf: UserConfig) extends 
GraphTask(context, userConf) {
   override def onNext(msg: Message): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala
index d3f483d..9a4e24e 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala
@@ -20,10 +20,10 @@ package akka.stream.gearpump.task
 
 import akka.stream.gearpump.task.GraphTask.{Index, PortId}
 
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.ProcessorId
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskWrapper}
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.ProcessorId
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, 
TaskWrapper}
 
 class GraphTask(inputTaskContext: TaskContext, userConf: UserConfig)
   extends Task(inputTaskContext, userConf) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala
index d7bacd5..b681852 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala
@@ -27,13 +27,13 @@ import 
akka.stream.gearpump.task.SinkBridgeTask.RequestMessage
 import akka.util.Timeout
 import org.reactivestreams.{Publisher, Subscriber, Subscription}
 
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.streaming.ProcessorId
-import io.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, 
TaskActorRef}
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-import io.gearpump.util.LogUtil
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.streaming.ProcessorId
+import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, 
TaskActorRef}
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, 
TaskId}
+import org.apache.gearpump.util.LogUtil
 
 /**
  * Bridge Task when data flow is from remote Gearpump Task to local 
Akka-Stream Module

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala
index b433a7f..ccbd350 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala
@@ -24,12 +24,12 @@ import akka.actor.Actor.Receive
 import akka.stream.gearpump.task.SourceBridgeTask.{AkkaStreamMessage, 
Complete, Error}
 import org.reactivestreams.{Subscriber, Subscription}
 
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.streaming.ProcessorId
-import io.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, 
TaskActorRef}
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.streaming.ProcessorId
+import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, 
TaskActorRef}
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, 
TaskId}
 
 /**
  * Bridge Task when data flow is from local Akka-Stream Module to remote 
Gearpump Task

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala
index 0b3b9a5..78fabbe 100644
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala
+++ 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala
@@ -20,9 +20,9 @@ package akka.stream.gearpump.task
 
 import akka.stream.gearpump.task.UnZip2Task.UnZipFunction
 
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
 
 class UnZip2Task(context: TaskContext, userConf: UserConfig) extends 
GraphTask(context, userConf) {
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CGroupResource.java
----------------------------------------------------------------------
diff --git 
a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CGroupResource.java
 
b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CGroupResource.java
deleted file mode 100644
index 973ad03..0000000
--- 
a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CGroupResource.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.cgroup;
-
-public class CGroupResource {
-
-  private ResourceType type;
-
-  private int hierarchyID;
-
-  private int cgroupsNum;
-
-  private boolean enable;
-
-  public CGroupResource(ResourceType type, int hierarchyID, int cgroupNum, 
boolean enable) {
-    this.type = type;
-    this.hierarchyID = hierarchyID;
-    this.cgroupsNum = cgroupNum;
-    this.enable = enable;
-  }
-
-  public ResourceType getType() {
-    return type;
-  }
-
-  public void setType(ResourceType type) {
-    this.type = type;
-  }
-
-  public int getHierarchyID() {
-    return hierarchyID;
-  }
-
-  public void setHierarchyID(int hierarchyID) {
-    this.hierarchyID = hierarchyID;
-  }
-
-  public int getCgroupsNum() {
-    return cgroupsNum;
-  }
-
-  public void setCgroupsNum(int cgroupsNum) {
-    this.cgroupsNum = cgroupsNum;
-  }
-
-  public boolean isEnable() {
-    return enable;
-  }
-
-  public void setEnable(boolean enable) {
-    this.enable = enable;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCenter.java
----------------------------------------------------------------------
diff --git 
a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCenter.java 
b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCenter.java
deleted file mode 100644
index dc889ba..0000000
--- 
a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCenter.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.cgroup;
-
-import io.gearpump.cluster.utils.SystemOperation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.*;
-
-public class CgroupCenter implements CgroupOperation {
-
-  public static Logger LOG = LoggerFactory.getLogger(CgroupCenter.class);
-
-  private static CgroupCenter instance;
-
-  private CgroupCenter() {
-
-  }
-
-  /**
-   * Thread unsafe
-   *
-   * @return
-   */
-  public synchronized static CgroupCenter getInstance() {
-    if (instance == null)
-      instance = new CgroupCenter();
-    return CgroupUtils.enabled() ? instance : null;
-  }
-
-  @Override
-  public List<Hierarchy> getHierarchies() {
-    // TODO Auto-generated method stub
-    Map<String, Hierarchy> hierarchies = new HashMap<String, Hierarchy>();
-    FileReader reader = null;
-    BufferedReader br = null;
-    try {
-      reader = new FileReader(Constants.MOUNT_STATUS_FILE);
-      br = new BufferedReader(reader);
-      String str = null;
-      while ((str = br.readLine()) != null) {
-        String[] strSplit = str.split(" ");
-        if (!strSplit[2].equals("cgroup"))
-          continue;
-        String name = strSplit[0];
-        String type = strSplit[3];
-        String dir = strSplit[1];
-        Hierarchy h = hierarchies.get(type);
-        h = new Hierarchy(name, CgroupUtils.analyse(type), dir);
-        hierarchies.put(type, h);
-      }
-      return new ArrayList<Hierarchy>(hierarchies.values());
-    } catch (Exception e) {
-      LOG.error("Get hierarchies error", e);
-    } finally {
-      CgroupUtils.close(reader, br);
-    }
-    return null;
-  }
-
-  @Override
-  public Set<CGroupResource> getCGroupResources() {
-    // TODO Auto-generated method stub
-    Set<CGroupResource> resources = new HashSet<CGroupResource>();
-    FileReader reader = null;
-    BufferedReader br = null;
-    try {
-      reader = new FileReader(Constants.CGROUP_STATUS_FILE);
-      br = new BufferedReader(reader);
-      String str = null;
-      while ((str = br.readLine()) != null) {
-        String[] split = str.split("\t");
-        ResourceType type = ResourceType.getResourceType(split[0]);
-        if (type == null)
-          continue;
-        resources.add(new CGroupResource(type, Integer.valueOf(split[1]), Integer.valueOf(split[2]), Integer.valueOf(split[3]).intValue() == 1 ? true
-          : false));
-      }
-      return resources;
-    } catch (Exception e) {
-      LOG.error("Get subSystems error ", e);
-    } finally {
-      CgroupUtils.close(reader, br);
-    }
-    return null;
-  }
-
-  @Override
-  public boolean enabled(ResourceType resourceType) {
-    // TODO Auto-generated method stub
-    Set<CGroupResource> resources = this.getCGroupResources();
-    for (CGroupResource resource : resources) {
-      if (resource.getType() == resourceType)
-        return true;
-    }
-    return false;
-  }
-
-  @Override
-  public Hierarchy busy(ResourceType resourceType) {
-    List<Hierarchy> hierarchies = this.getHierarchies();
-    for (Hierarchy hierarchy : hierarchies) {
-      for (ResourceType type : hierarchy.getResourceTypes()) {
-        if (type == resourceType)
-          return hierarchy;
-      }
-    }
-    return null;
-  }
-
-  @Override
-  public Hierarchy mounted(Hierarchy hierarchy) {
-    // TODO Auto-generated method stub
-    List<Hierarchy> hierarchies = this.getHierarchies();
-    if (CgroupUtils.dirExists(hierarchy.getDir())) {
-      for (Hierarchy h : hierarchies) {
-        if (h.equals(hierarchy))
-          return h;
-      }
-    }
-    return null;
-  }
-
-  @Override
-  public void mount(Hierarchy hierarchy) throws IOException {
-    // TODO Auto-generated method stub
-    if (this.mounted(hierarchy) != null) {
-      LOG.error(hierarchy.getDir() + " is mounted");
-      return;
-    }
-    Set<ResourceType> resourceTypes = hierarchy.getResourceTypes();
-    for (ResourceType type : resourceTypes) {
-      if (this.busy(type) != null) {
-        LOG.error("subsystem: " + type.name() + " is busy");
-        resourceTypes.remove(type);
-      }
-    }
-    if (resourceTypes.size() == 0)
-      return;
-    if (!CgroupUtils.dirExists(hierarchy.getDir()))
-      new File(hierarchy.getDir()).mkdirs();
-    String subSystems = CgroupUtils.reAnalyse(resourceTypes);
-    SystemOperation.mount(subSystems, hierarchy.getDir(), "cgroup", subSystems);
-  }
-
-  @Override
-  public void umount(Hierarchy hierarchy) throws IOException {
-    // TODO Auto-generated method stub
-    if (this.mounted(hierarchy) != null) {
-      hierarchy.getRootCgroups().delete();
-      SystemOperation.umount(hierarchy.getDir());
-      CgroupUtils.deleteDir(hierarchy.getDir());
-    }
-  }
-
-  @Override
-  public void create(CgroupCommon cgroup) throws SecurityException {
-    // TODO Auto-generated method stub
-    if (cgroup.isRoot()) {
-      LOG.error("You can't create rootCgroup in this function");
-      return;
-    }
-    CgroupCommon parent = cgroup.getParent();
-    while (parent != null) {
-      if (!CgroupUtils.dirExists(parent.getDir())) {
-        LOG.error(parent.getDir() + "is not existed");
-        return;
-      }
-      parent = parent.getParent();
-    }
-    Hierarchy h = cgroup.getHierarchy();
-    if (mounted(h) == null) {
-      LOG.error(h.getDir() + " is not mounted");
-      return;
-    }
-    if (CgroupUtils.dirExists(cgroup.getDir())) {
-      LOG.error(cgroup.getDir() + " is existed");
-      return;
-    }
-    (new File(cgroup.getDir())).mkdir();
-  }
-
-  @Override
-  public void delete(CgroupCommon cgroup) throws IOException {
-    // TODO Auto-generated method stub
-    cgroup.delete();
-  }
-
-  public static void main(String args[]) {
-    System.out.println(CgroupCenter.getInstance().getHierarchies().get(0).getRootCgroups().getChildren().size());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommon.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommon.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommon.java
deleted file mode 100644
index 5414814..0000000
--- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommon.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.cgroup;
-
-import io.gearpump.cluster.cgroup.core.CgroupCore;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class CgroupCommon implements CgroupCommonOperation {
-
-  public static final String TASKS = "/tasks";
-  public static final String NOTIFY_ON_RELEASE = "/notify_on_release";
-  public static final String RELEASE_AGENT = "/release_agent";
-  public static final String CGROUP_CLONE_CHILDREN = "/cgroup.clone_children";
-  public static final String CGROUP_EVENT_CONTROL = "/cgroup.event_control";
-  public static final String CGROUP_PROCS = "/cgroup.procs";
-
-  private final Hierarchy hierarchy;
-
-  private final String name;
-
-  private final String dir;
-
-  private final CgroupCommon parent;
-
-  private final Map<ResourceType, CgroupCore> cores;
-
-  private final boolean isRoot;
-
-  private final Set<CgroupCommon> children = new HashSet<CgroupCommon>();
-
-  public CgroupCommon(String name, Hierarchy hierarchy, CgroupCommon parent) {
-    this.name = parent.getName() + "/" + name;
-    this.hierarchy = hierarchy;
-    this.parent = parent;
-    this.dir = parent.getDir() + "/" + name;
-    this.init();
-    cores = CgroupCoreFactory.getInstance(this.hierarchy.getResourceTypes(), this.dir);
-    this.isRoot = false;
-  }
-
-  /**
-   * rootCgroup
-   */
-  public CgroupCommon(Hierarchy hierarchy, String dir) {
-    this.name = "";
-    this.hierarchy = hierarchy;
-    this.parent = null;
-    this.dir = dir;
-    this.init();
-    cores = CgroupCoreFactory.getInstance(this.hierarchy.getResourceTypes(), this.dir);
-    this.isRoot = true;
-  }
-
-  @Override
-  public void addTask(int taskId) throws IOException {
-    // TODO Auto-generated method stub
-    CgroupUtils.writeFileByLine(Constants.getDir(this.dir, TASKS), String.valueOf(taskId));
-  }
-
-  @Override
-  public Set<Integer> getTasks() throws IOException {
-    List<String> stringTasks = CgroupUtils.readFileByLine(Constants.getDir(this.dir, TASKS));
-    Set<Integer> tasks = new HashSet<Integer>();
-    for (String task : stringTasks) {
-      tasks.add(Integer.valueOf(task));
-    }
-    return tasks;
-  }
-
-  @Override
-  public void addProcs(int pid) throws IOException {
-    // TODO Auto-generated method stub
-    CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_PROCS), String.valueOf(pid));
-  }
-
-  @Override
-  public Set<Integer> getPids() throws IOException {
-    // TODO Auto-generated method stub
-    List<String> stringPids = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CGROUP_PROCS));
-    Set<Integer> pids = new HashSet<Integer>();
-    for (String task : stringPids) {
-      pids.add(Integer.valueOf(task));
-    }
-    return pids;
-  }
-
-  @Override
-  public void setNotifyOnRelease(boolean flag) throws IOException {
-    // TODO Auto-generated method stub
-    CgroupUtils.writeFileByLine(Constants.getDir(this.dir, NOTIFY_ON_RELEASE), flag ? "1" : "0");
-  }
-
-  @Override
-  public boolean getNotifyOnRelease() throws IOException {
-    return CgroupUtils.readFileByLine(Constants.getDir(this.dir, NOTIFY_ON_RELEASE)).get(0).equals("1") ? true : false;
-  }
-
-  @Override
-  public void setReleaseAgent(String command) throws IOException {
-    // TODO Auto-generated method stub
-    if (!this.isRoot)
-      return;
-    CgroupUtils.writeFileByLine(Constants.getDir(this.dir, RELEASE_AGENT), command);
-  }
-
-  @Override
-  public String getReleaseAgent() throws IOException {
-    if (!this.isRoot)
-      return null;
-    return CgroupUtils.readFileByLine(Constants.getDir(this.dir, RELEASE_AGENT)).get(0);
-  }
-
-  @Override
-  public void setCgroupCloneChildren(boolean flag) throws IOException {
-    // TODO Auto-generated method stub
-    if (!this.cores.keySet().contains(ResourceType.cpuset))
-      return;
-    CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_CLONE_CHILDREN), flag ? "1" : "0");
-  }
-
-  @Override
-  public boolean getCgroupCloneChildren() throws IOException {
-    return CgroupUtils.readFileByLine(Constants.getDir(this.dir, CGROUP_CLONE_CHILDREN)).get(0).equals("1") ? true : false;
-  }
-
-  @Override
-  public void setEventControl(String eventFd, String controlFd, String... args) throws IOException {
-    // TODO Auto-generated method stub
-    StringBuilder sb = new StringBuilder();
-    sb.append(eventFd);
-    sb.append(' ');
-    sb.append(controlFd);
-    for (String arg : args) {
-      sb.append(' ');
-      sb.append(arg);
-    }
-    CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_EVENT_CONTROL), sb.toString());
-  }
-
-  public Hierarchy getHierarchy() {
-    return hierarchy;
-  }
-
-  public String getName() {
-    return name;
-  }
-
-  public String getDir() {
-    return dir;
-  }
-
-  public CgroupCommon getParent() {
-    return parent;
-  }
-
-  public Set<CgroupCommon> getChildren() {
-    return children;
-  }
-
-  public boolean isRoot() {
-    return isRoot;
-  }
-
-  public Map<ResourceType, CgroupCore> getCores() {
-    return cores;
-  }
-
-  public void delete() throws IOException {
-    this.free();
-    if (!this.isRoot)
-      this.parent.getChildren().remove(this);
-  }
-
-  private void free() throws IOException {
-    for (CgroupCommon child : this.children)
-      child.free();
-    if (this.isRoot)
-      return;
-    Set<Integer> tasks = this.getTasks();
-    if (tasks != null) {
-      for (Integer task : tasks) {
-        this.parent.addTask(task);
-      }
-    }
-    CgroupUtils.deleteDir(this.dir);
-  }
-
-  private void init() {
-    File file = new File(this.dir);
-    File[] files = file.listFiles();
-    if (files == null)
-      return;
-    for (File child : files) {
-      if (child.isDirectory()) {
-        this.children.add(new CgroupCommon(child.getName(), this.hierarchy, this));
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommonOperation.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommonOperation.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommonOperation.java
deleted file mode 100644
index 7465645..0000000
--- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommonOperation.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.cgroup;
-
-import java.io.IOException;
-import java.util.Set;
-
-public interface CgroupCommonOperation {
-
-  public void addTask(int taskid) throws IOException;
-
-  public Set<Integer> getTasks() throws IOException;
-
-  public void addProcs(int pid) throws IOException;
-
-  public Set<Integer> getPids() throws IOException;
-
-  public void setNotifyOnRelease(boolean flag) throws IOException;
-
-  public boolean getNotifyOnRelease() throws IOException;
-
-  public void setReleaseAgent(String command) throws IOException;
-
-  public String getReleaseAgent() throws IOException;
-
-  public void setCgroupCloneChildren(boolean flag) throws IOException;
-
-  public boolean getCgroupCloneChildren() throws IOException;
-
-  public void setEventControl(String eventFd, String controlFd, String... args) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCoreFactory.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCoreFactory.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCoreFactory.java
deleted file mode 100644
index a719f91..0000000
--- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCoreFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.cgroup;
-
-import io.gearpump.cluster.cgroup.core.CgroupCore;
-import io.gearpump.cluster.cgroup.core.CpuCore;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-public class CgroupCoreFactory {
-
-  public static Map<ResourceType, CgroupCore> getInstance(Set<ResourceType> types, String dir) {
-    Map<ResourceType, CgroupCore> result = new HashMap<ResourceType, CgroupCore>();
-    for (ResourceType type : types) {
-      switch (type) {
-        case cpu:
-          result.put(ResourceType.cpu, new CpuCore(dir));
-          break;
-        default:
-          break;
-      }
-    }
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupOperation.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupOperation.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupOperation.java
deleted file mode 100644
index a3d830a..0000000
--- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupOperation.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.cgroup;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
-
-public interface CgroupOperation {
-
-  public List<Hierarchy> getHierarchies();
-
-  public Set<CGroupResource> getCGroupResources();
-
-  public boolean enabled(ResourceType subsystem);
-
-  public Hierarchy busy(ResourceType subsystem);
-
-  public Hierarchy mounted(Hierarchy hierarchy);
-
-  public void mount(Hierarchy hierarchy) throws IOException;
-
-  public void umount(Hierarchy hierarchy) throws IOException;
-
-  public void create(CgroupCommon cgroup) throws SecurityException;
-
-  public void delete(CgroupCommon cgroup) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupUtils.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupUtils.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupUtils.java
deleted file mode 100644
index 0a7f97c..0000000
--- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupUtils.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.cgroup;
-
-import io.gearpump.cluster.utils.SystemOperation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-public class CgroupUtils {
-
-  public static final Logger LOG = LoggerFactory.getLogger(CgroupUtils.class);
-
-  public static void deleteDir(String dir) {
-    try {
-      String cmd = "rmdir " + dir;
-      SystemOperation.exec(cmd);
-    } catch (IOException e) {
-      // TODO Auto-generated catch block
-      LOG.error("rm " + dir + " fail!", e);
-    }
-  }
-
-  public static boolean fileExists(String dir) {
-    File file = new File(dir);
-    return file.exists();
-  }
-
-  public static boolean dirExists(String dir) {
-    File file = new File(dir);
-    return file.isDirectory();
-  }
-
-  public static Set<ResourceType> analyse(String str) {
-    Set<ResourceType> result = new HashSet<ResourceType>();
-    String[] subSystems = str.split(",");
-    for (String subSystem : subSystems) {
-      ResourceType type = ResourceType.getResourceType(subSystem);
-      if (type != null)
-        result.add(type);
-    }
-    return result;
-  }
-
-  public static String reAnalyse(Set<ResourceType> subSystems) {
-    StringBuilder sb = new StringBuilder();
-    if (subSystems.size() == 0)
-      return sb.toString();
-    for (ResourceType type : subSystems) {
-      sb.append(type.name()).append(",");
-    }
-    return sb.toString().substring(0, sb.length() - 1);
-  }
-
-  public static boolean enabled() {
-    return CgroupUtils.fileExists(Constants.CGROUP_STATUS_FILE);
-  }
-
-  public static List<String> readFileByLine(String fileDir) throws IOException {
-    List<String> result = new ArrayList<String>();
-    FileReader fileReader = null;
-    BufferedReader reader = null;
-    try {
-      File file = new File(fileDir);
-      fileReader = new FileReader(file);
-      reader = new BufferedReader(fileReader);
-      String tempString = null;
-      while ((tempString = reader.readLine()) != null) {
-        result.add(tempString);
-      }
-    } finally {
-      CgroupUtils.close(fileReader, reader);
-    }
-    return result;
-  }
-
-  public static void writeFileByLine(String fileDir, List<String> strings) throws IOException {
-    FileWriter writer = null;
-    BufferedWriter bw = null;
-    try {
-      File file = new File(fileDir);
-      if (!file.exists()) {
-        LOG.error(fileDir + " is no existed");
-        return;
-      }
-      writer = new FileWriter(file, true);
-      bw = new BufferedWriter(writer);
-      for (String string : strings) {
-        bw.write(string);
-        bw.newLine();
-        bw.flush();
-      }
-    } finally {
-      CgroupUtils.close(writer, bw);
-    }
-  }
-
-  public static void writeFileByLine(String fileDir, String string) throws IOException {
-    FileWriter writer = null;
-    BufferedWriter bw = null;
-    try {
-      File file = new File(fileDir);
-      if (!file.exists()) {
-        LOG.error(fileDir + " is no existed");
-        return;
-      }
-      writer = new FileWriter(file, true);
-      bw = new BufferedWriter(writer);
-      bw.write(string);
-      bw.newLine();
-      bw.flush();
-    } finally {
-      CgroupUtils.close(writer, bw);
-    }
-  }
-
-  public static void close(FileReader reader, BufferedReader br) {
-    try {
-      if (reader != null)
-        reader.close();
-      if (br != null)
-        br.close();
-    } catch (IOException e) {
-      // TODO Auto-generated catch block
-
-    }
-  }
-
-  public static void close(FileWriter writer, BufferedWriter bw) {
-    try {
-      if (writer != null)
-        writer.close();
-      if (bw != null)
-        bw.close();
-    } catch (IOException e) {
-      // TODO Auto-generated catch block
-
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Constants.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Constants.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Constants.java
deleted file mode 100644
index 80e12be..0000000
--- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Constants.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.cgroup;
-
-public class Constants {
-
-  public static final String CGROUP_STATUS_FILE = "/proc/cgroups";
-
-  public static final String MOUNT_STATUS_FILE = "/proc/mounts";
-
-  public static String getDir(String dir, String constant) {
-    return dir + constant;
-  }
-}


Reply via email to