http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIO.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIO.scala
 
b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIO.scala
new file mode 100644
index 0000000..5c75904
--- /dev/null
+++ 
b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIO.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.fsio
+
+import org.apache.hadoop.conf.Configuration
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, 
ParseResult}
+import org.apache.gearpump.partitioner.ShufflePartitioner
+import org.apache.gearpump.streaming.examples.fsio.HadoopConfig._
+import org.apache.gearpump.streaming.{Processor, StreamApplication}
+import org.apache.gearpump.util.Graph._
+import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil}
+
+object SequenceFileIO extends AkkaApp with ArgumentsParser {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "source" -> CLIOption[Int]("<sequence file reader number>", required = 
false,
+      defaultValue = Some(1)),
+    "sink" -> CLIOption[Int]("<sequence file writer number>", required = false,
+      defaultValue = Some(1)),
+    "input" -> CLIOption[String]("<input file path>", required = true),
+    "output" -> CLIOption[String]("<output file directory>", required = true)
+  )
+
+  def application(config: ParseResult): StreamApplication = {
+    val spoutNum = config.getInt("source")
+    val boltNum = config.getInt("sink")
+    val input = config.getString("input")
+    val output = config.getString("output")
+    val appConfig = 
UserConfig.empty.withString(SeqFileStreamProducer.INPUT_PATH, input)
+      .withString(SeqFileStreamProcessor.OUTPUT_PATH, output)
+    val hadoopConfig = appConfig.withHadoopConf(new Configuration())
+    val partitioner = new ShufflePartitioner()
+    val streamProducer = Processor[SeqFileStreamProducer](spoutNum)
+    val streamProcessor = Processor[SeqFileStreamProcessor](boltNum)
+
+    val app = StreamApplication("SequenceFileIO",
+      Graph(streamProducer ~ partitioner ~> streamProcessor), hadoopConfig)
+    app
+  }
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    val context = ClientContext(akkaConf)
+    val appId = context.submit(application(config))
+    context.close()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala
 
b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala
deleted file mode 100644
index e5dbe0b..0000000
--- 
a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.fsio
-
-import org.apache.hadoop.conf.Configuration
-import org.scalatest.{Matchers, WordSpec}
-
-import io.gearpump.cluster.UserConfig
-
-class HadoopConfigSpec extends WordSpec with Matchers {
-
-  "HadoopConfig" should {
-    "serialize and deserialze hadoop configuration properly" in {
-      val hadoopConf = new Configuration()
-      val key = "test_key"
-      val value = "test_value"
-      hadoopConf.set(key, value)
-
-      val user = UserConfig.empty
-
-      import io.gearpump.streaming.examples.fsio.HadoopConfig._
-      assert(user.withHadoopConf(hadoopConf).hadoopConf.get(key) == value)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
 
b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
deleted file mode 100644
index bb0d26b..0000000
--- 
a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.fsio
-
-import java.io.File
-import scala.collection.mutable.ArrayBuffer
-
-import akka.actor.ActorSystem
-import akka.testkit.TestProbe
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.io.SequenceFile.Reader
-import org.apache.hadoop.io.{SequenceFile, Text}
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
-
-import io.gearpump.Message
-import io.gearpump.cluster.{TestUtil, UserConfig}
-import io.gearpump.streaming.task.{StartTime, TaskId}
-import io.gearpump.streaming.{MockUtil, Processor}
-class SeqFileStreamProcessorSpec
-  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
-
-  val kvPairs = new ArrayBuffer[(String, String)]
-  val outputDirectory = "SeqFileStreamProcessor_Test"
-  val sequenceFilePath = new Path(outputDirectory + File.separator + TaskId(0, 
0))
-  val hadoopConf = new Configuration()
-  val fs = FileSystem.get(hadoopConf)
-  val textClass = new Text().getClass
-  val _key = new Text()
-  val _value = new Text()
-
-  val kvGenerator = for {
-    key <- Gen.alphaStr
-    value <- Gen.alphaStr
-  } yield (key, value)
-
-  before {
-    implicit val system1 = ActorSystem("SeqFileStreamProcessor", 
TestUtil.DEFAULT_CONFIG)
-    val system2 = ActorSystem("Reporter", TestUtil.DEFAULT_CONFIG)
-    val watcher = TestProbe()(system1)
-    val conf = 
HadoopConfig(UserConfig.empty.withString(SeqFileStreamProcessor.OUTPUT_PATH,
-      outputDirectory)).withHadoopConf(new Configuration())
-    val context = MockUtil.mockTaskContext
-
-    val processorDescription =
-      Processor.ProcessorToProcessorDescription(id = 0, 
Processor[SeqFileStreamProcessor](1))
-
-    val taskId = TaskId(0, 0)
-    when(context.taskId).thenReturn(taskId)
-
-    val processor = new SeqFileStreamProcessor(context, conf)
-    processor.onStart(StartTime(0))
-
-    forAll(kvGenerator) { kv =>
-      val (key, value) = kv
-      kvPairs.append((key, value))
-      processor.onNext(Message(key + "++" + value))
-    }
-    processor.onStop()
-  }
-
-  property("SeqFileStreamProcessor should write the key-value pairs to a 
sequence file") {
-    val reader = new SequenceFile.Reader(hadoopConf, 
Reader.file(sequenceFilePath))
-    kvPairs.foreach { kv =>
-      val (key, value) = kv
-      if (value.length > 0 && reader.next(_key, _value)) {
-        assert(_key.toString == key && _value.toString == value)
-      }
-    }
-    reader.close()
-  }
-
-  after {
-    fs.deleteOnExit(new Path(outputDirectory))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
 
b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
deleted file mode 100644
index 04dafa7..0000000
--- 
a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.fsio
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.io.SequenceFile.Writer
-import org.apache.hadoop.io.{SequenceFile, Text}
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.MockUtil._
-import io.gearpump.streaming.task.StartTime
-
-class SeqFileStreamProducerSpec
-  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
-
-  val kvPairs = new ArrayBuffer[(String, String)]
-  val inputFile = "SeqFileStreamProducer_Test"
-  val sequenceFilePath = new Path(inputFile)
-  val hadoopConf = new Configuration()
-  val fs = FileSystem.get(hadoopConf)
-  val textClass = new Text().getClass
-  val _key = new Text()
-  val _value = new Text()
-
-  val kvGenerator = for {
-    key <- Gen.alphaStr
-    value <- Gen.alphaStr
-  } yield (key, value)
-
-  before {
-    fs.deleteOnExit(sequenceFilePath)
-    val writer = SequenceFile.createWriter(hadoopConf, 
Writer.file(sequenceFilePath),
-      Writer.keyClass(textClass), Writer.valueClass(textClass))
-    forAll(kvGenerator) { kv =>
-      _key.set(kv._1)
-      _value.set(kv._2)
-      kvPairs.append((kv._1, kv._2))
-      writer.append(_key, _value)
-    }
-    writer.close()
-  }
-
-  property("SeqFileStreamProducer should read the key-value pairs from " +
-    "a sequence file and deliver them") {
-
-    val conf = 
HadoopConfig(UserConfig.empty.withString(SeqFileStreamProducer.INPUT_PATH,
-      inputFile)).withHadoopConf(new Configuration())
-
-    val context = MockUtil.mockTaskContext
-
-    val producer = new SeqFileStreamProducer(context, conf)
-    producer.onStart(StartTime(0))
-    producer.onNext(Message("start"))
-
-    val expected = kvPairs.map(kv => kv._1 + "++" + kv._2).toSet
-    verify(context).output(argMatch[Message](msg =>
-      expected.contains(msg.msg.asInstanceOf[String])))
-  }
-
-  after {
-    fs.deleteOnExit(sequenceFilePath)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala
 
b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala
deleted file mode 100644
index efb5e44..0000000
--- 
a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.fsio
-
-import scala.concurrent.Future
-import scala.util.{Success, Try}
-
-import com.typesafe.config.Config
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec}
-
-import io.gearpump.cluster.ClientToMaster.SubmitApplication
-import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import io.gearpump.cluster.{MasterHarness, TestUtil}
-
-class SequenceFileIOSpec
-  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll 
with MasterHarness {
-
-  override def beforeAll {
-    startActorSystem()
-  }
-
-  override def afterAll {
-    shutdownActorSystem()
-  }
-
-  override def config: Config = TestUtil.DEFAULT_CONFIG
-
-  property("SequenceFileIO should succeed to submit application with required 
arguments") {
-    val requiredArgs = Array(
-      "-input", "/tmp/input",
-      "-output", "/tmp/output"
-    )
-    val optionalArgs = Array(
-      "-source", "1",
-      "-sink", "1"
-    )
-    val validArgs = {
-      Table(
-        ("requiredArgs", "optionalArgs"),
-        (requiredArgs, optionalArgs)
-      )
-    }
-    val masterReceiver = createMockMaster()
-    forAll(validArgs) { (requiredArgs: Array[String], optionalArgs: 
Array[String]) =>
-      val args = requiredArgs ++ optionalArgs
-
-      Future {
-        SequenceFileIO.main(masterConfig, args)
-      }
-      masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
-      masterReceiver.reply(SubmitApplicationResult(Success(0)))
-    }
-
-    val invalidArgs = {
-      Table(
-        ("requiredArgs", "optionalArgs"),
-        (requiredArgs.take(0), optionalArgs),
-        (requiredArgs.take(2), optionalArgs)
-      )
-    }
-    forAll(invalidArgs) { (requiredArgs: Array[String], optionalArgs: 
Array[String]) =>
-      val args = optionalArgs
-      assert(Try(SequenceFileIO.main(args)).isFailure, "missing required 
arguments, print usage")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala
 
b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala
new file mode 100644
index 0000000..cf1bea8
--- /dev/null
+++ 
b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.fsio
+
+import org.apache.hadoop.conf.Configuration
+import org.scalatest.{Matchers, WordSpec}
+
+import org.apache.gearpump.cluster.UserConfig
+
+class HadoopConfigSpec extends WordSpec with Matchers {
+
+  "HadoopConfig" should {
+    "serialize and deserialze hadoop configuration properly" in {
+      val hadoopConf = new Configuration()
+      val key = "test_key"
+      val value = "test_value"
+      hadoopConf.set(key, value)
+
+      val user = UserConfig.empty
+
+      import org.apache.gearpump.streaming.examples.fsio.HadoopConfig._
+      assert(user.withHadoopConf(hadoopConf).hadoopConf.get(key) == value)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
 
b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
new file mode 100644
index 0000000..7831b14
--- /dev/null
+++ 
b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.fsio
+
+import java.io.File
+import scala.collection.mutable.ArrayBuffer
+
+import akka.actor.ActorSystem
+import akka.testkit.TestProbe
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.SequenceFile.Reader
+import org.apache.hadoop.io.{SequenceFile, Text}
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.streaming.task.{StartTime, TaskId}
+import org.apache.gearpump.streaming.{MockUtil, Processor}
+class SeqFileStreamProcessorSpec
+  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
+
+  val kvPairs = new ArrayBuffer[(String, String)]
+  val outputDirectory = "SeqFileStreamProcessor_Test"
+  val sequenceFilePath = new Path(outputDirectory + File.separator + TaskId(0, 
0))
+  val hadoopConf = new Configuration()
+  val fs = FileSystem.get(hadoopConf)
+  val textClass = new Text().getClass
+  val _key = new Text()
+  val _value = new Text()
+
+  val kvGenerator = for {
+    key <- Gen.alphaStr
+    value <- Gen.alphaStr
+  } yield (key, value)
+
+  before {
+    implicit val system1 = ActorSystem("SeqFileStreamProcessor", 
TestUtil.DEFAULT_CONFIG)
+    val system2 = ActorSystem("Reporter", TestUtil.DEFAULT_CONFIG)
+    val watcher = TestProbe()(system1)
+    val conf = 
HadoopConfig(UserConfig.empty.withString(SeqFileStreamProcessor.OUTPUT_PATH,
+      outputDirectory)).withHadoopConf(new Configuration())
+    val context = MockUtil.mockTaskContext
+
+    val processorDescription =
+      Processor.ProcessorToProcessorDescription(id = 0, 
Processor[SeqFileStreamProcessor](1))
+
+    val taskId = TaskId(0, 0)
+    when(context.taskId).thenReturn(taskId)
+
+    val processor = new SeqFileStreamProcessor(context, conf)
+    processor.onStart(StartTime(0))
+
+    forAll(kvGenerator) { kv =>
+      val (key, value) = kv
+      kvPairs.append((key, value))
+      processor.onNext(Message(key + "++" + value))
+    }
+    processor.onStop()
+  }
+
+  property("SeqFileStreamProcessor should write the key-value pairs to a 
sequence file") {
+    val reader = new SequenceFile.Reader(hadoopConf, 
Reader.file(sequenceFilePath))
+    kvPairs.foreach { kv =>
+      val (key, value) = kv
+      if (value.length > 0 && reader.next(_key, _value)) {
+        assert(_key.toString == key && _value.toString == value)
+      }
+    }
+    reader.close()
+  }
+
+  after {
+    fs.deleteOnExit(new Path(outputDirectory))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
 
b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
new file mode 100644
index 0000000..ad27e63
--- /dev/null
+++ 
b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.fsio
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.SequenceFile.Writer
+import org.apache.hadoop.io.{SequenceFile, Text}
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.MockUtil._
+import org.apache.gearpump.streaming.task.StartTime
+
+class SeqFileStreamProducerSpec
+  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
+
+  val kvPairs = new ArrayBuffer[(String, String)]
+  val inputFile = "SeqFileStreamProducer_Test"
+  val sequenceFilePath = new Path(inputFile)
+  val hadoopConf = new Configuration()
+  val fs = FileSystem.get(hadoopConf)
+  val textClass = new Text().getClass
+  val _key = new Text()
+  val _value = new Text()
+
+  val kvGenerator = for {
+    key <- Gen.alphaStr
+    value <- Gen.alphaStr
+  } yield (key, value)
+
+  before {
+    fs.deleteOnExit(sequenceFilePath)
+    val writer = SequenceFile.createWriter(hadoopConf, 
Writer.file(sequenceFilePath),
+      Writer.keyClass(textClass), Writer.valueClass(textClass))
+    forAll(kvGenerator) { kv =>
+      _key.set(kv._1)
+      _value.set(kv._2)
+      kvPairs.append((kv._1, kv._2))
+      writer.append(_key, _value)
+    }
+    writer.close()
+  }
+
+  property("SeqFileStreamProducer should read the key-value pairs from " +
+    "a sequence file and deliver them") {
+
+    val conf = 
HadoopConfig(UserConfig.empty.withString(SeqFileStreamProducer.INPUT_PATH,
+      inputFile)).withHadoopConf(new Configuration())
+
+    val context = MockUtil.mockTaskContext
+
+    val producer = new SeqFileStreamProducer(context, conf)
+    producer.onStart(StartTime(0))
+    producer.onNext(Message("start"))
+
+    val expected = kvPairs.map(kv => kv._1 + "++" + kv._2).toSet
+    verify(context).output(argMatch[Message](msg =>
+      expected.contains(msg.msg.asInstanceOf[String])))
+  }
+
+  after {
+    fs.deleteOnExit(sequenceFilePath)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala
 
b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala
new file mode 100644
index 0000000..ba65802
--- /dev/null
+++ 
b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.fsio
+
+import scala.concurrent.Future
+import scala.util.{Success, Try}
+
+import com.typesafe.config.Config
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec}
+
+import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication
+import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+
+class SequenceFileIOSpec
+  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll 
with MasterHarness {
+
+  override def beforeAll {
+    startActorSystem()
+  }
+
+  override def afterAll {
+    shutdownActorSystem()
+  }
+
+  override def config: Config = TestUtil.DEFAULT_CONFIG
+
+  property("SequenceFileIO should succeed to submit application with required 
arguments") {
+    val requiredArgs = Array(
+      "-input", "/tmp/input",
+      "-output", "/tmp/output"
+    )
+    val optionalArgs = Array(
+      "-source", "1",
+      "-sink", "1"
+    )
+    val validArgs = {
+      Table(
+        ("requiredArgs", "optionalArgs"),
+        (requiredArgs, optionalArgs)
+      )
+    }
+    val masterReceiver = createMockMaster()
+    forAll(validArgs) { (requiredArgs: Array[String], optionalArgs: 
Array[String]) =>
+      val args = requiredArgs ++ optionalArgs
+
+      Future {
+        SequenceFileIO.main(masterConfig, args)
+      }
+      masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
+      masterReceiver.reply(SubmitApplicationResult(Success(0)))
+    }
+
+    val invalidArgs = {
+      Table(
+        ("requiredArgs", "optionalArgs"),
+        (requiredArgs.take(0), optionalArgs),
+        (requiredArgs.take(2), optionalArgs)
+      )
+    }
+    forAll(invalidArgs) { (requiredArgs: Array[String], optionalArgs: 
Array[String]) =>
+      val args = optionalArgs
+      assert(Try(SequenceFileIO.main(args)).isFailure, "missing required 
arguments, print usage")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/README.md
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/README.md 
b/examples/streaming/kafka/README.md
index bac7101..587271c 100644
--- a/examples/streaming/kafka/README.md
+++ b/examples/streaming/kafka/README.md
@@ -81,7 +81,7 @@ Change directory into gearpump root, build gearpump with `sbt 
pack` and launch a
 Finally, let's run the KafkaWordCount example.
 
    ```bash
-   ./target/pack/bin/gear app -jar 
./examples/target/$SCALA_VERSION_MAJOR/gearpump-examples-assembly-$VERSION.jar 
io.gearpump.streaming.examples.kafka.wordcount.KafkaWordCount
+   ./target/pack/bin/gear app -jar 
./examples/target/$SCALA_VERSION_MAJOR/gearpump-examples-assembly-$VERSION.jar 
org.apache.gearpump.streaming.examples.kafka.wordcount.KafkaWordCount
    ```
 
 One more step is to verify that we've succeeded in producing data to Kafka.

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
 
b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
deleted file mode 100644
index 35b6594..0000000
--- 
a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.kafka
-
-import akka.actor.ActorSystem
-import org.slf4j.Logger
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import io.gearpump.partitioner.ShufflePartitioner
-import io.gearpump.streaming.StreamApplication
-import io.gearpump.streaming.kafka.{KafkaSink, KafkaSource, 
KafkaStorageFactory}
-import io.gearpump.streaming.sink.DataSinkProcessor
-import io.gearpump.streaming.source.DataSourceProcessor
-import io.gearpump.util.Graph._
-import io.gearpump.util.{AkkaApp, Graph, LogUtil}
-
-object KafkaReadWrite extends AkkaApp with ArgumentsParser {
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "source" -> CLIOption[Int]("<hom many kafka producer tasks>", required = 
false,
-      defaultValue = Some(1)),
-    "sink" -> CLIOption[Int]("<hom many kafka processor tasks>", required = 
false,
-      defaultValue = Some(1)),
-    "zookeeperConnect" -> CLIOption[String]("<zookeeper connect string>", 
required = false,
-      defaultValue = Some("localhost:2181")),
-    "brokerList" -> CLIOption[String]("<broker server list string>", required 
= false,
-      defaultValue = Some("localhost:9092")),
-    "sourceTopic" -> CLIOption[String]("<kafka source topic>", required = 
false,
-      defaultValue = Some("topic1")),
-    "sinkTopic" -> CLIOption[String]("<kafka sink topic>", required = false,
-      defaultValue = Some("topic2"))
-  )
-
-  def application(config: ParseResult, system: ActorSystem): StreamApplication 
= {
-    implicit val actorSystem = system
-    val sourceNum = config.getInt("source")
-    val sinkNum = config.getInt("sink")
-    val zookeeperConnect = config.getString("zookeeperConnect")
-    val brokerList = config.getString("brokerList")
-    val sourceTopic = config.getString("sourceTopic")
-    val sinkTopic = config.getString("sinkTopic")
-
-    val appConfig = UserConfig.empty
-    val offsetStorageFactory = new KafkaStorageFactory(zookeeperConnect, 
brokerList)
-    val source = new KafkaSource(sourceTopic, zookeeperConnect, 
offsetStorageFactory)
-    val sourceProcessor = DataSourceProcessor(source, sourceNum)
-    val sink = new KafkaSink(sinkTopic, brokerList)
-    val sinkProcessor = DataSinkProcessor(sink, sinkNum)
-    val partitioner = new ShufflePartitioner
-    val computation = sourceProcessor ~ partitioner ~> sinkProcessor
-    val app = StreamApplication("KafkaReadWrite", Graph(computation), 
appConfig)
-    app
-  }
-
-  override def main(akkaConf: Config, args: Array[String]): Unit = {
-    val config = parse(args)
-    val context = ClientContext(akkaConf)
-    val appId = context.submit(application(config, context.system))
-    context.close()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
 
b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
deleted file mode 100644
index 6955bcc..0000000
--- 
a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.kafka.wordcount
-
-import akka.actor.ActorSystem
-import kafka.api.OffsetRequest
-import org.slf4j.Logger
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import io.gearpump.partitioner.HashPartitioner
-import io.gearpump.streaming.kafka.lib.KafkaSourceConfig
-import io.gearpump.streaming.kafka.{KafkaSink, KafkaSource, 
KafkaStorageFactory}
-import io.gearpump.streaming.sink.DataSinkProcessor
-import io.gearpump.streaming.source.DataSourceProcessor
-import io.gearpump.streaming.{Processor, StreamApplication}
-import io.gearpump.util.Graph._
-import io.gearpump.util.{AkkaApp, Graph, LogUtil}
-
-object KafkaWordCount extends AkkaApp with ArgumentsParser {
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "source" -> CLIOption[Int]("<how many kafka source tasks>", required = 
false,
-      defaultValue = Some(1)),
-    "split" -> CLIOption[Int]("<how many split tasks>", required = false, 
defaultValue = Some(1)),
-    "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, 
defaultValue = Some(1)),
-    "sink" -> CLIOption[Int]("<how many kafka sink tasks>", required = false,
-      defaultValue = Some(1))
-  )
-
-  def application(config: ParseResult, system: ActorSystem): StreamApplication 
= {
-    implicit val actorSystem = system
-    val sourceNum = config.getInt("source")
-    val splitNum = config.getInt("split")
-    val sumNum = config.getInt("sum")
-    val sinkNum = config.getInt("sink")
-
-    val appConfig = UserConfig.empty
-    val offsetStorageFactory = new KafkaStorageFactory("localhost:2181", 
"localhost:9092")
-    val kafkaSourceConfig = new KafkaSourceConfig()
-      
.withConsumerTopics("topic1").withConsumerStartOffset(OffsetRequest.LatestTime)
-    val source = new KafkaSource(kafkaSourceConfig, offsetStorageFactory)
-    val sourceProcessor = DataSourceProcessor(source, sourceNum)
-    val split = Processor[Split](splitNum)
-    val sum = Processor[Sum](sumNum)
-    val sink = new KafkaSink("topic2", "localhost:9092")
-    val sinkProcessor = DataSinkProcessor(sink, sinkNum)
-    val partitioner = new HashPartitioner
-    val computation = sourceProcessor ~ partitioner ~> split ~ partitioner ~>
-      sum ~ partitioner ~> sinkProcessor
-    val app = StreamApplication("KafkaWordCount", Graph(computation), 
appConfig)
-    app
-  }
-
-  override def main(akkaConf: Config, args: Array[String]): Unit = {
-    val config = parse(args)
-    val context = ClientContext(akkaConf)
-    val appId = context.submit(application(config, context.system))
-    context.close()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Split.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Split.scala
 
b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Split.scala
deleted file mode 100644
index b46d170..0000000
--- 
a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Split.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.kafka.wordcount
-
-import com.twitter.bijection.Injection
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-
-class Split(taskContext: TaskContext, conf: UserConfig) extends 
Task(taskContext, conf) {
-  import taskContext.output
-
-  override def onStart(startTime: StartTime): Unit = {
-  }
-
-  override def onNext(msg: Message): Unit = {
-    Injection.invert[String, Array[Byte]](msg.msg.asInstanceOf[Array[Byte]])
-      .foreach(_.split("\\s+").foreach(
-        word => output(new Message(word, msg.timestamp))))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Sum.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Sum.scala
 
b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Sum.scala
deleted file mode 100644
index 9c67733..0000000
--- 
a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Sum.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.kafka.wordcount
-
-import com.twitter.bijection.Injection
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-
-class Sum(taskContext: TaskContext, conf: UserConfig) extends 
Task(taskContext, conf) {
-  import taskContext.output
-
-  private[wordcount] var wordcount = Map.empty[String, Long]
-
-  override def onStart(startTime: StartTime): Unit = {}
-
-  override def onNext(message: Message): Unit = {
-    val word = message.msg.asInstanceOf[String]
-    val count = wordcount.getOrElse(word, 0L) + 1
-    wordcount += word -> count
-    output(new Message(
-      Injection[String, Array[Byte]](word) ->
-        Injection[Long, Array[Byte]](count),
-      message.timestamp))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
 
b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
new file mode 100644
index 0000000..364544b
--- /dev/null
+++ 
b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.kafka
+
+import akka.actor.ActorSystem
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, 
ParseResult}
+import org.apache.gearpump.partitioner.ShufflePartitioner
+import org.apache.gearpump.streaming.StreamApplication
+import org.apache.gearpump.streaming.kafka.{KafkaSink, KafkaSource, 
KafkaStorageFactory}
+import org.apache.gearpump.streaming.sink.DataSinkProcessor
+import org.apache.gearpump.streaming.source.DataSourceProcessor
+import org.apache.gearpump.util.Graph._
+import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil}
+
+object KafkaReadWrite extends AkkaApp with ArgumentsParser {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "source" -> CLIOption[Int]("<how many kafka producer tasks>", required = 
false,
+      defaultValue = Some(1)),
+    "sink" -> CLIOption[Int]("<how many kafka processor tasks>", required = 
false,
+      defaultValue = Some(1)),
+    "zookeeperConnect" -> CLIOption[String]("<zookeeper connect string>", 
required = false,
+      defaultValue = Some("localhost:2181")),
+    "brokerList" -> CLIOption[String]("<broker server list string>", required 
= false,
+      defaultValue = Some("localhost:9092")),
+    "sourceTopic" -> CLIOption[String]("<kafka source topic>", required = 
false,
+      defaultValue = Some("topic1")),
+    "sinkTopic" -> CLIOption[String]("<kafka sink topic>", required = false,
+      defaultValue = Some("topic2"))
+  )
+
+  def application(config: ParseResult, system: ActorSystem): StreamApplication 
= {
+    implicit val actorSystem = system
+    val sourceNum = config.getInt("source")
+    val sinkNum = config.getInt("sink")
+    val zookeeperConnect = config.getString("zookeeperConnect")
+    val brokerList = config.getString("brokerList")
+    val sourceTopic = config.getString("sourceTopic")
+    val sinkTopic = config.getString("sinkTopic")
+
+    val appConfig = UserConfig.empty
+    val offsetStorageFactory = new KafkaStorageFactory(zookeeperConnect, 
brokerList)
+    val source = new KafkaSource(sourceTopic, zookeeperConnect, 
offsetStorageFactory)
+    val sourceProcessor = DataSourceProcessor(source, sourceNum)
+    val sink = new KafkaSink(sinkTopic, brokerList)
+    val sinkProcessor = DataSinkProcessor(sink, sinkNum)
+    val partitioner = new ShufflePartitioner
+    val computation = sourceProcessor ~ partitioner ~> sinkProcessor
+    val app = StreamApplication("KafkaReadWrite", Graph(computation), 
appConfig)
+    app
+  }
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    val context = ClientContext(akkaConf)
+    val appId = context.submit(application(config, context.system))
+    context.close()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
 
b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
new file mode 100644
index 0000000..5ef1e67
--- /dev/null
+++ 
b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.kafka.wordcount
+
+import akka.actor.ActorSystem
+import kafka.api.OffsetRequest
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, 
ParseResult}
+import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig
+import org.apache.gearpump.streaming.kafka.{KafkaSink, KafkaSource, 
KafkaStorageFactory}
+import org.apache.gearpump.streaming.sink.DataSinkProcessor
+import org.apache.gearpump.streaming.source.DataSourceProcessor
+import org.apache.gearpump.streaming.{Processor, StreamApplication}
+import org.apache.gearpump.util.Graph._
+import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil}
+
+object KafkaWordCount extends AkkaApp with ArgumentsParser {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "source" -> CLIOption[Int]("<how many kafka source tasks>", required = 
false,
+      defaultValue = Some(1)),
+    "split" -> CLIOption[Int]("<how many split tasks>", required = false, 
defaultValue = Some(1)),
+    "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, 
defaultValue = Some(1)),
+    "sink" -> CLIOption[Int]("<how many kafka sink tasks>", required = false,
+      defaultValue = Some(1))
+  )
+
+  def application(config: ParseResult, system: ActorSystem): StreamApplication 
= {
+    implicit val actorSystem = system
+    val sourceNum = config.getInt("source")
+    val splitNum = config.getInt("split")
+    val sumNum = config.getInt("sum")
+    val sinkNum = config.getInt("sink")
+
+    val appConfig = UserConfig.empty
+    val offsetStorageFactory = new KafkaStorageFactory("localhost:2181", 
"localhost:9092")
+    val kafkaSourceConfig = new KafkaSourceConfig()
+      
.withConsumerTopics("topic1").withConsumerStartOffset(OffsetRequest.LatestTime)
+    val source = new KafkaSource(kafkaSourceConfig, offsetStorageFactory)
+    val sourceProcessor = DataSourceProcessor(source, sourceNum)
+    val split = Processor[Split](splitNum)
+    val sum = Processor[Sum](sumNum)
+    val sink = new KafkaSink("topic2", "localhost:9092")
+    val sinkProcessor = DataSinkProcessor(sink, sinkNum)
+    val partitioner = new HashPartitioner
+    val computation = sourceProcessor ~ partitioner ~> split ~ partitioner ~>
+      sum ~ partitioner ~> sinkProcessor
+    val app = StreamApplication("KafkaWordCount", Graph(computation), 
appConfig)
+    app
+  }
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    val context = ClientContext(akkaConf)
+    val appId = context.submit(application(config, context.system))
+    context.close()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala
 
b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala
new file mode 100644
index 0000000..a95f596
--- /dev/null
+++ 
b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.kafka.wordcount
+
+import com.twitter.bijection.Injection
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
+class Split(taskContext: TaskContext, conf: UserConfig) extends 
Task(taskContext, conf) {
+  import taskContext.output
+
+  override def onStart(startTime: StartTime): Unit = {
+  }
+
+  override def onNext(msg: Message): Unit = {
+    Injection.invert[String, Array[Byte]](msg.msg.asInstanceOf[Array[Byte]])
+      .foreach(_.split("\\s+").foreach(
+        word => output(new Message(word, msg.timestamp))))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala
 
b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala
new file mode 100644
index 0000000..9930b92
--- /dev/null
+++ 
b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.kafka.wordcount
+
+import com.twitter.bijection.Injection
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
+class Sum(taskContext: TaskContext, conf: UserConfig) extends 
Task(taskContext, conf) {
+  import taskContext.output
+
+  private[wordcount] var wordcount = Map.empty[String, Long]
+
+  override def onStart(startTime: StartTime): Unit = {}
+
+  override def onNext(message: Message): Unit = {
+    val word = message.msg.asInstanceOf[String]
+    val count = wordcount.getOrElse(word, 0L) + 1
+    wordcount += word -> count
+    output(new Message(
+      Injection[String, Array[Byte]](word) ->
+        Injection[Long, Array[Byte]](count),
+      message.timestamp))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala
 
b/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala
deleted file mode 100644
index 35f7a62..0000000
--- 
a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.kafka.wordcount
-
-import scala.concurrent.Future
-import scala.util.Success
-
-import com.typesafe.config.Config
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
-
-import io.gearpump.cluster.ClientToMaster.SubmitApplication
-import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import io.gearpump.cluster.{MasterHarness, TestUtil}
-
-class KafkaWordCountSpec
-  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with 
MasterHarness {
-
-  before {
-    startActorSystem()
-  }
-
-  after {
-    shutdownActorSystem()
-  }
-
-  override def config: Config = TestUtil.DEFAULT_CONFIG
-
-  property("KafkaWordCount should succeed to submit application with required 
arguments") {
-    val requiredArgs = Array.empty[String]
-    val optionalArgs = Array(
-      "-source", "1",
-      "-split", "1",
-      "-sum", "1",
-      "-sink", "1")
-
-    val args = {
-      Table(
-        ("requiredArgs", "optionalArgs"),
-        (requiredArgs, optionalArgs)
-      )
-    }
-    val masterReceiver = createMockMaster()
-    forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) 
=>
-      val args = requiredArgs ++ optionalArgs
-
-      Future {
-        KafkaWordCount.main(masterConfig, args)
-      }
-
-      masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
-      masterReceiver.reply(SubmitApplicationResult(Success(0)))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala
 
b/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala
deleted file mode 100644
index 2cc6a16..0000000
--- 
a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.kafka.wordcount
-
-import com.twitter.bijection.Injection
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest._
-import org.scalatest.mock.MockitoSugar
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.task.TaskContext
-
-class SplitSpec extends FlatSpec with Matchers with MockitoSugar {
-
-  it should "split should split the text and deliver to next task" in {
-    val taskContext = mock[TaskContext]
-    val split = new Split(taskContext, UserConfig.empty)
-
-    val msg = "this is a test message"
-    split.onNext(Message(Injection[String, Array[Byte]](msg)))
-    verify(taskContext, times(msg.split(" 
").length)).output(anyObject[Message])
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
 
b/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
deleted file mode 100644
index 4dcb9d7..0000000
--- 
a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.kafka.wordcount
-
-import scala.collection.mutable
-
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.{FlatSpec, Matchers}
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.task.StartTime
-
/**
 * Verifies that Sum maintains a correct per-word frequency count.
 *
 * Fix: the previous version iterated via `Gen.alphaStr.map { ... }`, but mapping a
 * ScalaCheck `Gen` is lazy and the function was never invoked — zero messages were
 * sent, `times(0)` verified trivially, and the test passed vacuously. The words are
 * now driven from the (previously unused) sample sentence `str`.
 */
class SumSpec extends FlatSpec with Matchers {

  it should "sum should calculate the frequency of the word correctly" in {

    val expectedWordCountMap: mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]()

    val taskContext = MockUtil.mockTaskContext

    val sum = new Sum(taskContext, UserConfig.empty)
    sum.onStart(StartTime(0))
    val str = "once two two three three three"
    val words = str.split(" ")

    words.foreach { word =>
      expectedWordCountMap.put(word, expectedWordCountMap.getOrElse(word, 0L) + 1)
      sum.onNext(Message(word))
    }
    // One downstream output per processed word.
    verify(taskContext, times(words.length)).output(anyObject[Message])

    // Every expected count must match the task's internal state (no Option.get).
    expectedWordCountMap.foreach { case (word, count) =>
      assert(sum.wordcount.get(word).contains(count))
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala
 
b/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala
new file mode 100644
index 0000000..c4c422d
--- /dev/null
+++ 
b/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.kafka.wordcount
+
+import scala.concurrent.Future
+import scala.util.Success
+
+import com.typesafe.config.Config
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
+
+import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication
+import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+
/**
 * Submits the KafkaWordCount example against a mock master and checks that a
 * SubmitApplication request arrives and is acknowledged.
 */
class KafkaWordCountSpec
  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {

  before {
    startActorSystem()
  }

  after {
    shutdownActorSystem()
  }

  override def config: Config = TestUtil.DEFAULT_CONFIG

  property("KafkaWordCount should succeed to submit application with required arguments") {
    val requiredArgs = Array.empty[String]
    val optionalArgs = Array("-source", "1", "-split", "1", "-sum", "1", "-sink", "1")

    val argsTable = Table(
      ("requiredArgs", "optionalArgs"),
      (requiredArgs, optionalArgs))

    val masterReceiver = createMockMaster()
    forAll(argsTable) { (required: Array[String], optional: Array[String]) =>
      val submitArgs = required ++ optional

      // main() blocks until the mock master replies, so run it off-thread.
      Future {
        KafkaWordCount.main(masterConfig, submitArgs)
      }

      masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
      masterReceiver.reply(SubmitApplicationResult(Success(0)))
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala
 
b/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala
new file mode 100644
index 0000000..893d06d
--- /dev/null
+++ 
b/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.kafka.wordcount
+
+import com.twitter.bijection.Injection
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest._
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
/** Checks that the Split task turns one sentence into one output per word. */
class SplitSpec extends FlatSpec with Matchers with MockitoSugar {

  it should "split should split the text and deliver to next task" in {
    val context = mock[TaskContext]
    val splitTask = new Split(context, UserConfig.empty)

    val sentence = "this is a test message"
    // The task consumes serialized bytes, mirroring the Kafka source output.
    splitTask.onNext(Message(Injection[String, Array[Byte]](sentence)))
    val wordCount = sentence.split(" ").length
    // Exactly wordCount messages must have been forwarded downstream.
    verify(context, times(wordCount)).output(anyObject[Message])
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
 
b/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
new file mode 100644
index 0000000..3538ece
--- /dev/null
+++ 
b/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.kafka.wordcount
+
+import scala.collection.mutable
+
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.task.StartTime
+
/**
 * Verifies that Sum maintains a correct per-word frequency count.
 *
 * Fix: the previous version iterated via `Gen.alphaStr.map { ... }`, but mapping a
 * ScalaCheck `Gen` is lazy and the function was never invoked — zero messages were
 * sent, `times(0)` verified trivially, and the test passed vacuously. The words are
 * now driven from the (previously unused) sample sentence `str`.
 */
class SumSpec extends FlatSpec with Matchers {

  it should "sum should calculate the frequency of the word correctly" in {

    val expectedWordCountMap: mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]()

    val taskContext = MockUtil.mockTaskContext

    val sum = new Sum(taskContext, UserConfig.empty)
    sum.onStart(StartTime(0))
    val str = "once two two three three three"
    val words = str.split(" ")

    words.foreach { word =>
      expectedWordCountMap.put(word, expectedWordCountMap.getOrElse(word, 0L) + 1)
      sum.onNext(Message(word))
    }
    // One downstream output per processed word.
    verify(taskContext, times(words.length)).output(anyObject[Message])

    // Every expected count must match the task's internal state (no Option.get).
    expectedWordCountMap.foreach { case (word, count) =>
      assert(sum.wordcount.get(word).contains(count))
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/README.md
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/README.md b/examples/streaming/sol/README.md
index a8b10b3..d9c8eab 100644
--- a/examples/streaming/sol/README.md
+++ b/examples/streaming/sol/README.md
@@ -14,5 +14,5 @@ The original code comes from: 
https://github.com/yahoo/storm-perf-test
 
 <B>Example:</B>
 ```
-bin/gear app -jar examples/gearpump-examples-assembly-$VERSION.jar 
io.gearpump.streaming.examples.sol.SOL
+bin/gear app -jar examples/gearpump-examples-assembly-$VERSION.jar 
org.apache.gearpump.streaming.examples.sol.SOL
 ```

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOL.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOL.scala
 
b/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOL.scala
deleted file mode 100644
index 10c190c..0000000
--- 
a/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOL.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.sol
-
-import org.slf4j.Logger
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import io.gearpump.partitioner.ShufflePartitioner
-import io.gearpump.streaming.{Processor, StreamApplication}
-import io.gearpump.util.Graph._
-import io.gearpump.util.{AkkaApp, Graph, LogUtil}
-
/**
 * Speed-of-light throughput benchmark: a configurable pipeline of stream
 * producers followed by one or more pass-through processor stages.
 *
 * Fixes: the app id returned by submit() was bound to an unused local, and the
 * DAG was built by reassigning a `var`; both replaced with expression style.
 */
object SOL extends AkkaApp with ArgumentsParser {
  private val LOG: Logger = LogUtil.getLogger(getClass)

  override val options: Array[(String, CLIOption[Any])] = Array(
    "streamProducer" -> CLIOption[Int]("<stream producer number>", required = false,
      defaultValue = Some(1)),
    "streamProcessor" -> CLIOption[Int]("<stream processor number>", required = false,
      defaultValue = Some(1)),
    "bytesPerMessage" -> CLIOption[Int]("<size of each message>", required = false,
      defaultValue = Some(100)),
    "stages" -> CLIOption[Int]("<how many stages to run>", required = false,
      defaultValue = Some(2)))

  /**
   * Builds the benchmark DAG: producer ~> processor, extended with one extra
   * processor stage for each configured stage beyond the first two.
   */
  def application(config: ParseResult): StreamApplication = {
    val producerNum = config.getInt("streamProducer")
    val processorNum = config.getInt("streamProcessor")
    val bytesPerMessage = config.getInt("bytesPerMessage")
    val stages = config.getInt("stages")
    val appConfig = UserConfig.empty.withInt(SOLStreamProducer.BYTES_PER_MESSAGE, bytesPerMessage)
    val partitioner = new ShufflePartitioner()
    val streamProducer = Processor[SOLStreamProducer](producerNum)
    val streamProcessor = Processor[SOLStreamProcessor](processorNum)

    // Start with the mandatory two-stage pipeline, then append (stages - 2) more.
    val twoStages = streamProducer ~ partitioner ~> streamProcessor
    val computation = (0 until stages - 2).foldLeft(twoStages) { (pipeline, _) =>
      pipeline ~ partitioner ~> streamProcessor.copy()
    }
    StreamApplication("sol", Graph(computation), appConfig)
  }

  override def main(akkaConf: Config, args: Array[String]): Unit = {
    val config = parse(args)
    val context = ClientContext(akkaConf)
    context.submit(application(config))
    context.close()
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
 
b/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
deleted file mode 100644
index de1054f..0000000
--- 
a/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.sol
-
-import java.util.concurrent.TimeUnit
-import scala.concurrent.duration.FiniteDuration
-
-import akka.actor.Cancellable
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-
/** Pass-through task that forwards every message and logs throughput every 5 seconds. */
class SOLStreamProcessor(taskContext: TaskContext, conf: UserConfig)
  extends Task(taskContext, conf) {
  import taskContext.output

  val taskConf = taskContext

  // Total messages forwarded since the task started.
  private var forwarded: Long = 0
  private var reportTimer: Cancellable = null
  // Counter / wall-clock snapshot taken at the previous throughput report.
  private var lastReportedCount: Long = 0
  private var lastReportTime: Long = 0

  override def onStart(startTime: StartTime): Unit = {
    val interval = new FiniteDuration(5, TimeUnit.SECONDS)
    reportTimer = taskContext.schedule(interval, interval)(reportWordCount())
    lastReportTime = System.currentTimeMillis()
  }

  override def onNext(msg: Message): Unit = {
    output(msg)
    forwarded += 1
  }

  override def onStop(): Unit = {
    // The timer is only set once onStart has run.
    Option(reportTimer).foreach(_.cancel())
  }

  /** Logs the message delta and elapsed seconds since the previous report. */
  def reportWordCount(): Unit = {
    val now: Long = System.currentTimeMillis()
    LOG.info(s"Task ${taskConf.taskId} " +
      s"Throughput: ${(forwarded - lastReportedCount, (now - lastReportTime) / 1000)} " +
      s"(words, second)")
    lastReportedCount = forwarded
    lastReportTime = now
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProducer.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProducer.scala
 
b/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProducer.scala
deleted file mode 100644
index 5c0f3be..0000000
--- 
a/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProducer.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.sol
-
-import java.util.Random
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.examples.sol.SOLStreamProducer._
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-
/** Source task that emits fixed-size random-digit payloads as fast as possible. */
class SOLStreamProducer(taskContext: TaskContext, conf: UserConfig)
  extends Task(taskContext, conf) {

  import taskContext.output

  // Payload size in bytes, from config with a fallback default.
  private val sizeInBytes = conf.getInt(SOLStreamProducer.BYTES_PER_MESSAGE)
    .getOrElse(DEFAULT_MESSAGE_SIZE)
  private var pool: Array[String] = null
  private var random: Random = null
  private var emitted: Long = 0

  override def onStart(startTime: StartTime): Unit = {
    buildMessagePool()
    // Kick off the self-driving emit loop.
    self ! Start
  }

  /** Pre-generates a pool of 100 random payloads so emission allocates no new strings. */
  private def buildMessagePool(): Unit = {
    random = new Random()
    val poolSize = 100
    pool = Array.fill(poolSize) {
      val sb = new StringBuilder(sizeInBytes)
      // Even though java encodes strings in UCS2, the serialized version sent by the
      // tuples is UTF8, so each digit should be a single byte.
      var i = 0
      while (i < sizeInBytes) {
        sb.append(random.nextInt(9))
        i += 1
      }
      sb.toString()
    }
  }

  override def onNext(msg: Message): Unit = {
    output(new Message(pool(random.nextInt(pool.length)), System.currentTimeMillis()))
    emitted += 1L
    // Keep the loop alive by messaging ourselves with the source's min clock.
    self ! messageSourceMinClock
  }

  // messageSourceMinClock represents the min clock of the message source.
  private def messageSourceMinClock: Message = {
    Message("tick", System.currentTimeMillis())
  }
}
-
object SOLStreamProducer {
  // Default payload size in bytes when "bytesPerMessage" is not configured.
  val DEFAULT_MESSAGE_SIZE = 100
  // UserConfig key for the per-message payload size (bytes).
  val BYTES_PER_MESSAGE = "bytesPerMessage"
  // Kick-off message the producer sends to itself in onStart.
  val Start = Message("start")
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOL.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOL.scala
 
b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOL.scala
new file mode 100644
index 0000000..fb80ad3
--- /dev/null
+++ 
b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOL.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.sol
+
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, 
ParseResult}
+import org.apache.gearpump.partitioner.ShufflePartitioner
+import org.apache.gearpump.streaming.{Processor, StreamApplication}
+import org.apache.gearpump.util.Graph._
+import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil}
+
/**
 * Speed-of-light throughput benchmark: a configurable pipeline of stream
 * producers followed by one or more pass-through processor stages.
 *
 * Fixes: the app id returned by submit() was bound to an unused local, and the
 * DAG was built by reassigning a `var`; both replaced with expression style.
 */
object SOL extends AkkaApp with ArgumentsParser {
  private val LOG: Logger = LogUtil.getLogger(getClass)

  override val options: Array[(String, CLIOption[Any])] = Array(
    "streamProducer" -> CLIOption[Int]("<stream producer number>", required = false,
      defaultValue = Some(1)),
    "streamProcessor" -> CLIOption[Int]("<stream processor number>", required = false,
      defaultValue = Some(1)),
    "bytesPerMessage" -> CLIOption[Int]("<size of each message>", required = false,
      defaultValue = Some(100)),
    "stages" -> CLIOption[Int]("<how many stages to run>", required = false,
      defaultValue = Some(2)))

  /**
   * Builds the benchmark DAG: producer ~> processor, extended with one extra
   * processor stage for each configured stage beyond the first two.
   */
  def application(config: ParseResult): StreamApplication = {
    val producerNum = config.getInt("streamProducer")
    val processorNum = config.getInt("streamProcessor")
    val bytesPerMessage = config.getInt("bytesPerMessage")
    val stages = config.getInt("stages")
    val appConfig = UserConfig.empty.withInt(SOLStreamProducer.BYTES_PER_MESSAGE, bytesPerMessage)
    val partitioner = new ShufflePartitioner()
    val streamProducer = Processor[SOLStreamProducer](producerNum)
    val streamProcessor = Processor[SOLStreamProcessor](processorNum)

    // Start with the mandatory two-stage pipeline, then append (stages - 2) more.
    val twoStages = streamProducer ~ partitioner ~> streamProcessor
    val computation = (0 until stages - 2).foldLeft(twoStages) { (pipeline, _) =>
      pipeline ~ partitioner ~> streamProcessor.copy()
    }
    StreamApplication("sol", Graph(computation), appConfig)
  }

  override def main(akkaConf: Config, args: Array[String]): Unit = {
    val config = parse(args)
    val context = ClientContext(akkaConf)
    context.submit(application(config))
    context.close()
  }
}


Reply via email to