http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
new file mode 100644
index 0000000..d8bdc1e
--- /dev/null
+++ 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.checklist
+
+import org.scalatest.TestData
+
+import org.apache.gearpump.integrationtest.kafka._
+import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
+
+/**
+ * The test spec checks the Kafka datasource connector
+ */
/**
 * Integration tests for the Kafka data-source / data-sink connector.
 *
 * A dedicated Kafka (+ Zookeeper) cluster is started before the suite and
 * shut down afterwards. Each test submits the built-in "kafka" example job
 * and verifies the data flow from a source topic to a sink topic.
 */
class ConnectorKafkaSpec extends TestSpecBase {

  private lazy val kafkaCluster = new KafkaCluster(cluster.getNetworkGateway)
  private lazy val kafkaJar = cluster.queryBuiltInExampleJars("kafka-").head
  // Producer started by a test case, if any. Held as an Option rather than a
  // nullable var; afterEach always stops it so a failed test cannot leak a
  // running producer into the next one.
  private var producer: Option[NumericalDataProducer] = None

  override def beforeAll(): Unit = {
    super.beforeAll()
    kafkaCluster.start()
  }

  override def afterAll(): Unit = {
    kafkaCluster.shutDown()
    super.afterAll()
  }

  override def afterEach(test: TestData): Unit = {
    super.afterEach(test)
    producer.foreach(_.stop())
    producer = None
  }

  "KafkaSource and KafkaSink" should {
    "read from and write to kafka" in {
      // setup: pre-populate the source topic with a fixed number of messages
      val sourceTopic = "topic1"
      val sinkTopic = "topic2"
      val messageNum = 10000
      kafkaCluster.produceDataToKafka(sourceTopic, messageNum)

      // exercise: run the KafkaReadWrite example, piping sourceTopic to sinkTopic
      val args = Array(
        "org.apache.gearpump.streaming.examples.kafka.KafkaReadWrite",
        "-zookeeperConnect", kafkaCluster.getZookeeperConnectString,
        "-brokerList", kafkaCluster.getBrokerListConnectString,
        "-sourceTopic", sourceTopic,
        "-sinkTopic", sinkTopic).mkString(" ")
      val appId = restClient.getNextAvailableAppId()
      val success = restClient.submitApp(kafkaJar, cluster.getWorkerHosts.length, args)
      success shouldBe true

      // verify: every produced message eventually reaches the sink topic
      expectAppIsRunning(appId, "KafkaReadWrite")
      Util.retryUntil(() => kafkaCluster.getLatestOffset(sinkTopic) == messageNum,
        "kafka all message written")
    }
  }

  "Gearpump with Kafka" should {
    "support at-least-once message delivery" in {
      // setup
      val sourcePartitionNum = 2
      val sourceTopic = "topic3"
      val sinkTopic = "topic4"
      // Generate number sequence (1, 2, 3, ...) to the topic
      kafkaCluster.createTopic(sourceTopic, sourcePartitionNum)
      val dataProducer =
        new NumericalDataProducer(sourceTopic, kafkaCluster.getBrokerListConnectString)
      producer = Some(dataProducer)
      dataProducer.start()

      // exercise
      // NOTE(review): the option is "-source" here but "-sourceTask" in
      // MessageDeliverySpec (different example app) — confirm against the
      // KafkaReadWrite example's expected options.
      // .toString keeps the array a uniform Array[String] instead of
      // silently widening to Array[Any].
      val args = Array(
        "org.apache.gearpump.streaming.examples.kafka.KafkaReadWrite",
        "-zookeeperConnect", kafkaCluster.getZookeeperConnectString,
        "-brokerList", kafkaCluster.getBrokerListConnectString,
        "-sourceTopic", sourceTopic,
        "-sinkTopic", sinkTopic,
        "-source", sourcePartitionNum.toString).mkString(" ")
      val appId = restClient.getNextAvailableAppId()
      val success = restClient.submitApp(kafkaJar, cluster.getWorkerHosts.length, args)
      success shouldBe true

      // verify #1: the app is running and its clock advances
      expectAppIsRunning(appId, "KafkaReadWrite")
      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0,
        "app running")

      // verify #2: kill the newest executor and wait until a replacement
      // (higher executor id) shows up — this triggers message replay
      val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max
      restClient.killExecutor(appId, executorToKill) shouldBe true
      Util.retryUntil(() => restClient.queryExecutorBrief(appId)
        .map(_.executorId).max > executorToKill,
        s"executor $executorToKill killed")

      // verify #3: despite the replay, no message may be lost at the sink
      val detector = new MessageLossDetector(dataProducer.lastWriteNum)
      val kafkaReader = new SimpleKafkaReader(detector, sinkTopic,
        host = kafkaCluster.advertisedHost, port = kafkaCluster.advertisedPort)
      Util.retryUntil(() => {
        kafkaReader.read()
        detector.allReceived
      }, "kafka all message read")
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala
 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala
new file mode 100644
index 0000000..56b33c1
--- /dev/null
+++ 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.checklist
+
+import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
+import org.apache.gearpump.metrics.Metrics.Meter
+import org.apache.gearpump.streaming._
+import org.apache.gearpump.streaming.appmaster.ProcessorSummary
+
/**
 * Tests dynamic DAG modification: processors of a running application can be
 * replaced on the fly, and the application falls back to the last good DAG
 * version when a modification fails or the AppMaster is restarted.
 */
class DynamicDagSpec extends TestSpecBase {

  lazy val solJar = cluster.queryBuiltInExampleJars("sol-").head
  val splitTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Split"
  val sumTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Sum"
  val solName = "sol"

  "dynamic dag" should {
    "can retrieve a list of built-in partitioner classes" in {
      val partitioners = restClient.queryBuiltInPartitioners()
      partitioners.length should be > 0
      partitioners.foreach(clazz =>
        clazz should startWith("org.apache.gearpump.partitioner.")
      )
    }

    "can compose a wordcount application from scratch" in {
      // todo: blocked by #1450
    }

    "can replace down stream with wordcount's sum processor (new processor will have metrics)" in {
      // setup
      val appId = expectSolJarSubmittedWithAppId()

      // exercise: swap processor 1 for the wordcount Sum task
      val laterProcessors = replaceProcessorAndWait(appId, 1, sumTaskClass)

      // verify: the newly added processor (highest id) receives messages
      processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput")
    }

    "can replace up stream with wordcount's split processor (new processor will have metrics)" in {
      // setup
      val appId = expectSolJarSubmittedWithAppId()

      // exercise: swap processor 0 for the wordcount Split task
      val laterProcessors = replaceProcessorAndWait(appId, 0, splitTaskClass)

      // verify: the newly added processor (highest id) sends messages
      processorHasThroughput(appId, laterProcessors.keySet.max, "sendThroughput")
    }

    "fall back to last dag version when replacing a processor fails" in {
      // setup: one successful replacement first, so there is a known-good DAG
      val appId = expectSolJarSubmittedWithAppId()
      val laterProcessors = replaceProcessorAndWait(appId, 1, sumTaskClass)
      processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput")

      // exercise: replacing with a non-existent task class must fail and
      // leave the processor count unchanged
      val fakeTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Fake"
      replaceProcessor(appId, laterProcessors.keySet.max, fakeTaskClass)
      Util.retryUntil(() => {
        val processorsAfterFailure = restClient.queryStreamingAppDetail(appId).processors
        processorsAfterFailure.size == laterProcessors.size
      }, "dag fell back to the last version")

      // verify: the application still makes progress on the former DAG
      val currentClock = restClient.queryStreamingAppDetail(appId).clock
      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > currentClock,
        "app clock is advancing")
    }

    "fall back to last dag version when AppMaster HA triggered" in {
      // setup: one successful replacement first, so there is a known-good DAG
      val appId = expectSolJarSubmittedWithAppId()
      val formerAppMaster = restClient.queryApp(appId).appMasterPath
      val laterProcessors = replaceProcessorAndWait(appId, 1, sumTaskClass)
      processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput")

      // exercise: kill the AppMaster and wait for the recovered one
      restClient.killAppMaster(appId) shouldBe true
      Util.retryUntil(() => restClient.queryApp(appId).appMasterPath != formerAppMaster,
        "new AppMaster created")

      // verify: the recovered AppMaster restored the latest DAG
      val processors = restClient.queryStreamingAppDetail(appId).processors
      processors.size shouldEqual laterProcessors.size
    }
  }

  /** Submits the built-in "sol" example and waits until it is running. */
  private def expectSolJarSubmittedWithAppId(): Int = {
    val appId = restClient.getNextAvailableAppId()
    val success = restClient.submitApp(solJar, cluster.getWorkerHosts.length)
    success shouldBe true
    expectAppIsRunning(appId, solName)
    Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0,
      "app running")
    appId
  }

  /**
   * Replaces processor `formerProcessorId` with `newTaskClass` and blocks
   * until the DAG gains exactly one processor (the replacement).
   *
   * @return the processor map that includes the newly added processor
   */
  private def replaceProcessorAndWait(
      appId: Int,
      formerProcessorId: Int,
      newTaskClass: String): Map[ProcessorId, ProcessorSummary] = {
    val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
    replaceProcessor(appId, formerProcessorId, newTaskClass)
    var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
    Util.retryUntil(() => {
      laterProcessors = restClient.queryStreamingAppDetail(appId).processors
      laterProcessors.size == formerProcessors.size + 1
    }, "new processor added")
    laterProcessors
  }

  /**
   * Issues the REST call that replaces a processor with a new task class
   * taken from the wordcount jar; asserts the call itself was accepted.
   */
  private def replaceProcessor(
      appId: Int,
      formerProcessorId: Int,
      newTaskClass: String,
      newProcessorDescription: String = "",
      newParallelism: Int = 1): Unit = {
    val uploadedJar = restClient.uploadJar(wordCountJar)
    val replaceMe = new ProcessorDescription(formerProcessorId, newTaskClass,
      newParallelism, newProcessorDescription,
      jar = uploadedJar)

    // exercise
    val success = restClient.replaceStreamingAppProcessor(appId, replaceMe)
    success shouldBe true
  }

  /** Waits until the given processor reports a positive value for `metrics`. */
  private def processorHasThroughput(appId: Int, processorId: Int, metrics: String): Unit = {
    Util.retryUntil(() => {
      val actual = restClient.queryStreamingAppMetrics(appId, current = false,
        path = "processor" + processorId)
      val throughput = actual.metrics.filter(_.value.name.endsWith(metrics))
      // Plain Boolean predicate: a ScalaTest matcher here would throw inside
      // the retried closure instead of signalling "retry".
      throughput.nonEmpty && throughput.forall(_.value.asInstanceOf[Meter].count > 0L)
    }, "new processor has message received")
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ExampleSpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ExampleSpec.scala
 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ExampleSpec.scala
new file mode 100644
index 0000000..27e4665
--- /dev/null
+++ 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ExampleSpec.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.checklist
+
+import org.apache.log4j.Logger
+
+import org.apache.gearpump.integrationtest.{Docker, TestSpecBase, Util}
+import org.apache.gearpump.streaming._
+import org.apache.gearpump.streaming.appmaster.ProcessorSummary
+
+/**
+ * The test spec will perform destructive operations to check the stability
+ */
/**
 * Verifies the built-in example applications (distributed shell, wordcount,
 * wordcount-java, sol, complexdag) can be submitted and behave as expected.
 * The streaming examples share a common behavior checked by
 * [[streamingApplication]].
 */
class ExampleSpec extends TestSpecBase {

  private val LOG = Logger.getLogger(getClass)

  "distributed shell" should {
    "execute commands on machines where its executors are running" in {
      // setup: submit the DistributedShell app across all workers
      val distShellJar = cluster.queryBuiltInExampleJars("distributedshell-").head
      val mainClass = "org.apache.gearpump.examples.distributedshell.DistributedShell"
      val clientClass = "org.apache.gearpump.examples.distributedshell.DistributedShellClient"
      val appId = restClient.getNextAvailableAppId()
      val success = restClient.submitApp(distShellJar, cluster.getWorkerHosts.length, mainClass)
      success shouldBe true
      expectAppIsRunning(appId, "DistributedShell")
      val args = Array(
        clientClass,
        "-appid", appId.toString,
        "-command", "hostname"
      )

      val expectedHostNames = cluster.getWorkerHosts.map(Docker.getHostName(_))

      // Runs `hostname` through the shell client and checks that every worker
      // host reported back; log/empty lines are filtered from the output.
      def verify(): Boolean = {
        val workerNum = cluster.getWorkerHosts.length
        val result = commandLineClient.submitAppAndCaptureOutput(distShellJar,
          workerNum, args.mkString(" ")).split("\n").
          filterNot(line => line.startsWith("[INFO]") || line.isEmpty)
        expectedHostNames.forall(result.contains)
      }

      Util.retryUntil(() => verify(),
        s"executors started on all expected hosts ${expectedHostNames.mkString(", ")}")
    }
  }

  "wordcount" should {
    val wordCountJarNamePrefix = "wordcount-"
    behave like streamingApplication(wordCountJarNamePrefix, wordCountName)

    "can submit immediately after killing a former one" in {
      // setup: run a wordcount app until its clock advances, then kill it
      val formerAppId = restClient.getNextAvailableAppId()
      val formerSubmissionSuccess =
        restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
      formerSubmissionSuccess shouldBe true
      expectAppIsRunning(formerAppId, wordCountName)
      Util.retryUntil(() =>
        restClient.queryStreamingAppDetail(formerAppId).clock > 0, "app running")
      restClient.killApp(formerAppId)

      // exercise: re-submit right away
      // NOTE(review): assumes app ids are assigned sequentially, so the next
      // submission gets formerAppId + 1 — confirm this invariant holds.
      val appId = formerAppId + 1
      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
      success shouldBe true
      expectAppIsRunning(appId, wordCountName)
    }
  }

  "wordcount(java)" should {
    val wordCountJavaJarNamePrefix = "wordcountjava-"
    val wordCountJavaName = "wordcountJava"
    behave like streamingApplication(wordCountJavaJarNamePrefix, wordCountJavaName)
  }

  "sol" should {
    val solJarNamePrefix = "sol-"
    val solName = "sol"
    behave like streamingApplication(solJarNamePrefix, solName)
  }

  "complexdag" should {
    val dynamicDagJarNamePrefix = "complexdag-"
    val dynamicDagName = "dag"
    behave like streamingApplication(dynamicDagJarNamePrefix, dynamicDagName)
  }

  /**
   * Shared behavior for every streaming example: the app clock advances, and
   * a processor can be replaced with changed parallelism and description.
   */
  def streamingApplication(jarNamePrefix: String, appName: String): Unit = {
    lazy val jar = cluster.queryBuiltInExampleJars(jarNamePrefix).head

    "can obtain application clock and the clock will keep changing" in {
      // setup
      val appId = restClient.getNextAvailableAppId()
      val success = restClient.submitApp(jar, cluster.getWorkerHosts.length)
      success shouldBe true
      expectAppIsRunning(appId, appName)

      // exercise: sample the clock twice and require it to advance
      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0,
        "app submitted")
      val formerClock = restClient.queryStreamingAppDetail(appId).clock
      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > formerClock,
        "app clock is advancing")
    }

    "can change the parallelism and description of a processor" in {
      // setup
      val appId = restClient.getNextAvailableAppId()
      val formerSubmissionSuccess = restClient.submitApp(jar, cluster.getWorkerHosts.length)
      formerSubmissionSuccess shouldBe true
      expectAppIsRunning(appId, appName)
      val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
      // Map.apply instead of .get(...).get — same failure semantics on a
      // missing key, without the Option.get anti-pattern.
      val processor0 = formerProcessors(0)
      // The replacement processor is appended, so its id equals the old count.
      val expectedProcessorId = formerProcessors.size
      val expectedParallelism = processor0.parallelism + 1
      val expectedDescription = processor0.description + "new"
      val replaceMe = new ProcessorDescription(processor0.id, processor0.taskClass,
        expectedParallelism, description = expectedDescription)

      // exercise
      val success = restClient.replaceStreamingAppProcessor(appId, replaceMe)
      success shouldBe true
      var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
      Util.retryUntil(() => {
        laterProcessors = restClient.queryStreamingAppDetail(appId).processors
        laterProcessors.size == formerProcessors.size + 1
      }, "new processor added")
      val laterProcessor0 = laterProcessors(expectedProcessorId)
      laterProcessor0.parallelism shouldEqual expectedParallelism
      laterProcessor0.description shouldEqual expectedDescription
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
new file mode 100644
index 0000000..bb9982a
--- /dev/null
+++ 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.integrationtest.checklist
+
+import org.apache.log4j.Logger
+
+import org.apache.gearpump.integrationtest.hadoop.HadoopCluster._
+import org.apache.gearpump.integrationtest.kafka.KafkaCluster._
+import org.apache.gearpump.integrationtest.kafka.{ResultVerifier, 
SimpleKafkaReader}
+import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
+
+/**
+ * Checks message delivery consistency, like at-least-once, and exactly-once.
+ */
/**
 * Checks message delivery consistency, like at-least-once, and exactly-once.
 *
 * The exactly-once test runs MessageCountApp (which checkpoints to HDFS),
 * kills an executor to force a message replay, and then verifies the reported
 * message count still matches the number of messages produced — i.e. the
 * replay did not cause double counting.
 */
class MessageDeliverySpec extends TestSpecBase {

  private val LOG = Logger.getLogger(getClass)

  "Gearpump" should {
    "support exactly-once message delivery" in {
      withKafkaCluster(cluster) { kafkaCluster =>
        // setup
        val sourcePartitionNum = 1
        val sourceTopic = "topic1"
        val sinkTopic = "topic2"

        // Generate number sequence (1, 2, 3, ...) to the topic
        kafkaCluster.createTopic(sourceTopic, sourcePartitionNum)

        withDataProducer(sourceTopic, kafkaCluster.getBrokerListConnectString) { producer =>

          withHadoopCluster { hadoopCluster =>
            // exercise: MessageCountApp checkpoints its state to HDFS
            // (.toString keeps this a uniform Array[String] rather than
            // widening to Array[Any])
            val args = Array(
              "org.apache.gearpump.streaming.examples.state.MessageCountApp",
              "-defaultFS", hadoopCluster.getDefaultFS,
              "-zookeeperConnect", kafkaCluster.getZookeeperConnectString,
              "-brokerList", kafkaCluster.getBrokerListConnectString,
              "-sourceTopic", sourceTopic,
              "-sinkTopic", sinkTopic,
              "-sourceTask", sourcePartitionNum.toString).mkString(" ")
            val appId = restClient.getNextAvailableAppId()

            val stateJar = cluster.queryBuiltInExampleJars("state-").head
            val success = restClient.submitApp(stateJar, executorNum = 1, args = args)
            success shouldBe true

            // verify #1: the app is running and its clock advances
            expectAppIsRunning(appId, "MessageCount")
            Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0,
              "app is running")

            // wait for checkpoint to take place
            Thread.sleep(1000)

            // verify #2: killing the newest executor forces a message replay
            LOG.info("Trigger message replay by kill and restart the executors")
            val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max
            restClient.killExecutor(appId, executorToKill) shouldBe true
            Util.retryUntil(() => restClient.queryExecutorBrief(appId)
              .map(_.executorId).max > executorToKill, s"executor $executorToKill killed")

            producer.stop()
            val producedNumbers = producer.producedNumbers
            LOG.info(s"In total, numbers in range[${producedNumbers.start}" +
              s", ${producedNumbers.end - 1}] have been written to Kafka")

            // verify #3: everything produced actually reached the source topic
            val kafkaSourceOffset = kafkaCluster.getLatestOffset(sourceTopic)

            assert(producedNumbers.size == kafkaSourceOffset,
              "produced message should match Kafka queue size")

            LOG.info(s"The Kafka source topic $sourceTopic offset is " + kafkaSourceOffset)

            // The sink processor of this job (MessageCountApp) writes total message
            // count to Kafka Sink periodically (once every checkpoint interval).
            // The detector keep record of latest message count.
            val detector = new ResultVerifier {
              var latestMessageCount: Int = 0
              override def onNext(messageCount: Int): Unit = {
                this.latestMessageCount = messageCount
              }
            }

            // verify #4: the count reported after replay equals the number of
            // produced messages — exactly-once, no loss and no duplicates
            val kafkaReader = new SimpleKafkaReader(detector, sinkTopic,
              host = kafkaCluster.advertisedHost, port = kafkaCluster.advertisedPort)
            Util.retryUntil(() => {
              kafkaReader.read()
              LOG.info(s"Received message count: ${detector.latestMessageCount}, " +
                s"expect: ${producedNumbers.size}")
              detector.latestMessageCount == producedNumbers.size
            }, "MessageCountApp calculated message count matches " +
              "expected in case of message replay")
          }
        }
      }
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala
 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala
new file mode 100644
index 0000000..2f5bb64
--- /dev/null
+++ 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.checklist
+
+import scala.concurrent.duration._
+
+import org.apache.gearpump.cluster.MasterToAppMaster
+import org.apache.gearpump.cluster.master.MasterStatus
+import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
+import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
+
+/**
+ * The test spec checks REST service usage
+ */
+class RestServiceSpec extends TestSpecBase {
+
+  "query system version" should {
+    "retrieve the current version number" in {
+      restClient.queryVersion() should not be empty
+    }
+  }
+
+  "list applications" should {
+    "retrieve 0 application after cluster just started" in {
+      restClient.listRunningApps().length shouldEqual 0
+    }
+
+    "retrieve 1 application after the first application submission" in {
+      // exercise
+      val appId = restClient.getNextAvailableAppId()
+      val success = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length)
+      success shouldBe true
+      expectAppIsRunning(appId, wordCountName)
+      restClient.listRunningApps().length shouldEqual 1
+    }
+  }
+
+  "submit application (wordcount)" should {
+    "find a running application after submission" in {
+      // exercise
+      val appId = restClient.getNextAvailableAppId()
+      val success = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length)
+      success shouldBe true
+      expectAppIsRunning(appId, wordCountName)
+    }
+
+    "reject a repeated submission request while the application is running" in 
{
+      // setup
+      val appId = restClient.getNextAvailableAppId()
+      val formerSubmissionSuccess = restClient.submitApp(wordCountJar,
+        cluster.getWorkerHosts.length)
+      formerSubmissionSuccess shouldBe true
+      expectAppIsRunning(appId, wordCountName)
+
+      // exercise
+      val success = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length)
+      success shouldBe false
+    }
+
+    "reject an invalid submission (the jar file path is incorrect)" in {
+      // exercise
+      val success = restClient.submitApp(wordCountJar + ".missing", 
cluster.getWorkerHosts.length)
+      success shouldBe false
+    }
+
+    "submit a wordcount application with 4 split and 3 sum processors and 
expect " +
+      "parallelism of processors match the given number" in {
+      // setup
+      val splitNum = 4
+      val sumNum = 3
+      val appId = restClient.getNextAvailableAppId()
+
+      // exercise
+      val success = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length,
+        s"-split $splitNum -sum $sumNum")
+      success shouldBe true
+      expectAppIsRunning(appId, wordCountName)
+      val processors = restClient.queryStreamingAppDetail(appId).processors
+      processors.size shouldEqual 2
+      val splitProcessor = processors.get(0).get
+      splitProcessor.parallelism shouldEqual splitNum
+      val sumProcessor = processors.get(1).get
+      sumProcessor.parallelism shouldEqual sumNum
+    }
+
+    "can obtain application metrics and the metrics will keep changing" in {
+      // setup
+      val appId = restClient.getNextAvailableAppId()
+      val success = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length)
+      success shouldBe true
+      expectAppIsRunning(appId, wordCountName)
+
+      // exercise
+      expectMetricsAvailable(
+        restClient.queryStreamingAppMetrics(appId, current = 
true).metrics.nonEmpty,
+        "metrics available")
+      val actual = restClient.queryStreamingAppMetrics(appId, current = true)
+      actual.path shouldEqual s"app$appId.processor*"
+      actual.metrics.foreach(metric => {
+        metric.time should be > 0L
+        metric.value should not be null
+      })
+      val formerMetricsDump = actual.metrics.toString()
+
+      expectMetricsAvailable({
+        val laterMetrics = restClient.queryStreamingAppMetrics(appId, current 
= true).metrics
+        laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump
+      }, "metrics available")
+    }
+
+    "can obtain application corresponding executors' metrics and " +
+      "the metrics will keep changing" in {
+      // setup
+      val appId = restClient.getNextAvailableAppId()
+      val success = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length)
+      success shouldBe true
+      expectAppIsRunning(appId, wordCountName)
+
+      // exercise
+      expectMetricsAvailable(
+        restClient.queryExecutorMetrics(appId, current = 
true).metrics.nonEmpty,
+        "metrics available")
+      val actual = restClient.queryExecutorMetrics(appId, current = true)
+      actual.path shouldEqual s"app$appId.executor*"
+      actual.metrics.foreach(metric => {
+        metric.time should be > 0L
+        metric.value should not be null
+      })
+      val formerMetricsDump = actual.metrics.toString()
+
+      expectMetricsAvailable({
+        val laterMetrics = restClient.queryExecutorMetrics(appId, current = 
true).metrics
+        laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump
+      }, "metrics available")
+    }
+  }
+
+  "kill application" should {
+    "a running application should be killed" in {
+      // setup
+      val appId = restClient.getNextAvailableAppId()
+      val success = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length)
+      success shouldBe true
+      expectAppIsRunning(appId, wordCountName)
+
+      // exercise
+      killAppAndVerify(appId)
+    }
+
+    "should fail when attempting to kill a stopped application" in {
+      // setup
+      val appId = restClient.getNextAvailableAppId()
+      val submissionSucess = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length)
+      submissionSucess shouldBe true
+      expectAppIsRunning(appId, wordCountName)
+      killAppAndVerify(appId)
+
+      // exercise
+      val success = restClient.killApp(appId)
+      success shouldBe false
+    }
+
+    "should fail when attempting to kill a non-exist application" in {
+      // setup
+      val freeAppId = restClient.listApps().length + 1
+
+      // exercise
+      val success = restClient.killApp(freeAppId)
+      success shouldBe false
+    }
+  }
+
+  "cluster information" should {
+    "retrieve 1 master for a non-HA cluster" in {
+      // exercise
+      val masterSummary = restClient.queryMaster()
+      masterSummary.cluster.map(_.toTuple) shouldEqual 
cluster.getMastersAddresses
+      masterSummary.aliveFor should be > 0L
+      masterSummary.masterStatus shouldEqual MasterStatus.Synced
+    }
+
+    "retrieve the same number of workers as cluster has" in {
+      // setup
+      val expectedWorkersCount = cluster.getWorkerHosts.size
+
+      // exercise
+      var runningWorkers: Array[WorkerSummary] = Array.empty
+      Util.retryUntil(() => {
+        runningWorkers = restClient.listRunningWorkers()
+        runningWorkers.length == expectedWorkersCount
+      }, "all workers running")
+      runningWorkers.foreach { worker =>
+        worker.state shouldEqual MasterToAppMaster.AppMasterActive
+      }
+    }
+
+    "find a newly added worker instance" in {
+      // setup
+      restartClusterRequired = true
+      val formerWorkersCount = cluster.getWorkerHosts.length
+      Util.retryUntil(() => restClient.listRunningWorkers().length == 
formerWorkersCount,
+        "all workers running")
+      val workerName = "newWorker"
+
+      // exercise
+      cluster.addWorkerNode(workerName)
+      Util.retryUntil(() => restClient.listRunningWorkers().length > 
formerWorkersCount,
+        "new worker added")
+      cluster.getWorkerHosts.length shouldEqual formerWorkersCount + 1
+      restClient.listRunningWorkers().length shouldEqual formerWorkersCount + 1
+    }
+
+    "retrieve 0 worker, if cluster is started without any workers" in {
+      // setup
+      restartClusterRequired = true
+      cluster.shutDown()
+
+      // exercise
+      cluster.start(workerNum = 0)
+      cluster.getWorkerHosts.length shouldEqual 0
+      restClient.listRunningWorkers().length shouldEqual 0
+    }
+
+    "can obtain master's metrics and the metrics will keep changing" in {
+      // exercise
+      expectMetricsAvailable(
+        restClient.queryMasterMetrics(current = true).metrics.nonEmpty, 
"metrics available")
+      val actual = restClient.queryMasterMetrics(current = true)
+      actual.path shouldEqual s"master"
+      actual.metrics.foreach(metric => {
+        metric.time should be > 0L
+        metric.value should not be null
+      })
+      val formerMetricsDump = actual.metrics.toString()
+
+      expectMetricsAvailable({
+        val laterMetrics = restClient.queryMasterMetrics(current = 
true).metrics
+        laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump
+      }, "metrics available")
+    }
+
+    "can obtain workers' metrics and the metrics will keep changing" in {
+      // exercise
+      restClient.listRunningWorkers().foreach { worker =>
+        val workerId = worker.workerId
+        expectMetricsAvailable(
+          restClient.queryWorkerMetrics(workerId, current = 
true).metrics.nonEmpty,
+          "metrics available")
+        val actual = restClient.queryWorkerMetrics(workerId, current = true)
+        actual.path shouldEqual s"worker${WorkerId.render(workerId)}"
+        actual.metrics.foreach(metric => {
+          metric.time should be > 0L
+          metric.value should not be null
+        })
+        val formerMetricsDump = actual.metrics.toString()
+
+        expectMetricsAvailable({
+          val laterMetrics = restClient.queryWorkerMetrics(workerId, current = 
true).metrics
+          laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump
+        }, "metrics available")
+      }
+    }
+  }
+
+  "configuration" should {
+    "retrieve the configuration of master and match particular values" in {
+      // exercise
+      val actual = restClient.queryMasterConfig()
+      actual.hasPath("gearpump") shouldBe true
+      actual.hasPath("gearpump.cluster") shouldBe true
+      actual.getString("gearpump.hostname") shouldEqual 
cluster.getMasterHosts.mkString(",")
+    }
+
+    "retrieve the configuration of worker X and match particular values" in {
+      // exercise
+      restClient.listRunningWorkers().foreach { worker =>
+        val actual = restClient.queryWorkerConfig(worker.workerId)
+        actual.hasPath("gearpump") shouldBe true
+        actual.hasPath("gearpump.worker") shouldBe true
+      }
+    }
+
+    "retrieve the configuration of executor X and match particular values" in {
+      // setup
+      val appId = restClient.getNextAvailableAppId()
+
+      // exercise
+      val success = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length)
+      success shouldBe true
+      restClient.queryExecutorBrief(appId).foreach { executor =>
+        val executorId = executor.executorId
+        val actual = restClient.queryExecutorConfig(appId, executorId)
+        actual.hasPath("gearpump") shouldBe true
+        actual.hasPath("gearpump.executor") shouldBe true
+        actual.getInt("gearpump.applicationId") shouldEqual appId
+        actual.getInt("gearpump.executorId") shouldEqual executorId
+      }
+    }
+
+    "retrieve the configuration of application X and match particular values" 
in {
+      // setup
+      val appId = restClient.getNextAvailableAppId()
+
+      // exercise
+      val success = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length)
+      success shouldBe true
+      val actual = restClient.queryAppMasterConfig(appId)
+      actual.hasPath("gearpump") shouldBe true
+      actual.hasPath("gearpump.appmaster") shouldBe true
+    }
+  }
+
+  "application life-cycle" should {
+    "newly started application should be configured same as the previous one, 
after restart" in {
+      // setup
+      val originSplitNum = 4
+      val originSumNum = 3
+      val originAppId = restClient.getNextAvailableAppId()
+      val success = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length,
+        s"-split $originSplitNum -sum $originSumNum")
+      success shouldBe true
+      expectAppIsRunning(originAppId, wordCountName)
+      val originAppDetail = restClient.queryStreamingAppDetail(originAppId)
+
+      // exercise
+      Util.retryUntil(() => restClient.restartApp(originAppId), "app 
restarted")
+      val killedApp = restClient.queryApp(originAppId)
+      killedApp.appId shouldEqual originAppId
+      killedApp.status shouldEqual MasterToAppMaster.AppMasterInActive
+      val runningApps = restClient.listRunningApps()
+      runningApps.length shouldEqual 1
+      val newAppDetail = 
restClient.queryStreamingAppDetail(runningApps.head.appId)
+      newAppDetail.appName shouldEqual originAppDetail.appName
+      newAppDetail.processors.size shouldEqual originAppDetail.processors.size
+      newAppDetail.processors.get(0).get.parallelism shouldEqual originSplitNum
+      newAppDetail.processors.get(1).get.parallelism shouldEqual originSumNum
+    }
+  }
+
+  private def killAppAndVerify(appId: Int): Unit = {
+    val success = restClient.killApp(appId)
+    success shouldBe true
+
+    val actualApp = restClient.queryApp(appId)
+    actualApp.appId shouldEqual appId
+    actualApp.status shouldEqual MasterToAppMaster.AppMasterInActive
+  }
+
+  private def expectMetricsAvailable(condition: => Boolean, 
conditionDescription: String): Unit = {
+    val config = restClient.queryMasterConfig()
+    val reportInterval = 
Duration(config.getString("gearpump.metrics.report-interval-ms") + "ms")
+    Util.retryUntil(() => condition, conditionDescription, interval = 
reportInterval)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StabilitySpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StabilitySpec.scala
 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StabilitySpec.scala
new file mode 100644
index 0000000..4b15055
--- /dev/null
+++ 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StabilitySpec.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.checklist
+
+import scala.concurrent.duration.Duration
+
+import org.apache.gearpump.cluster.MasterToAppMaster
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
+import org.apache.gearpump.util.{Constants, LogUtil}
+
+/**
+ * The test spec will perform destructive operations to check the stability. 
These operations
+ * include shutting down the appmaster, an executor, or a worker, etc.
+ */
+class StabilitySpec extends TestSpecBase {
+
+  private val LOG = LogUtil.getLogger(getClass)
+
+  "kill appmaster" should {
+    "restart the whole application" in {
+      // setup
+      val appId = commandLineClient.submitApp(wordCountJar)
+      val formerAppMaster = restClient.queryApp(appId).appMasterPath
+      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 
0, "app running")
+      ensureClockStoredInMaster()
+
+      // exercise
+      restClient.killAppMaster(appId) shouldBe true
+      // TODO: how long until the master begins to recover, and how long does 
the recovery take?
+      Util.retryUntil(() => restClient.queryApp(appId).appMasterPath != 
formerAppMaster,
+        "appmaster killed and restarted")
+
+      // verify
+      val laterAppMaster = restClient.queryStreamingAppDetail(appId)
+      laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive
+      laterAppMaster.clock should be > 0L
+    }
+  }
+
+  "kill executor" should {
+    "will create a new executor and application will replay from the latest 
application clock" in {
+      // setup
+      val appId = commandLineClient.submitApp(wordCountJar)
+      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 
0, "app running")
+      val executorToKill = 
restClient.queryExecutorBrief(appId).map(_.executorId).max
+      ensureClockStoredInMaster()
+
+      // exercise
+      restClient.killExecutor(appId, executorToKill) shouldBe true
+      // TODO: how long until the appmaster begins to recover, and how long 
does the recovery take?
+      Util.retryUntil(() => restClient.queryExecutorBrief(appId)
+        .map(_.executorId).max > executorToKill,
+        s"executor $executorToKill killed and restarted")
+
+      // verify
+      val laterAppMaster = restClient.queryStreamingAppDetail(appId)
+      laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive
+      laterAppMaster.clock should be > 0L
+    }
+  }
+
+  private def hostName(workerId: WorkerId): String = {
+    val worker = restClient.listRunningWorkers().find(_.workerId == workerId)
+    // Parse hostname from JVM info (in format: PID@hostname)
+    val hostname = worker.get.jvmName.split("@")(1)
+    hostname
+  }
+
+  "kill worker" should {
+    "worker will not recover but all its executors will be migrated to other 
workers" in {
+      // setup
+      restartClusterRequired = true
+      val appId = commandLineClient.submitApp(wordCountJar)
+      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 
0, "app running")
+
+      val allexecutors = restClient.queryExecutorBrief(appId)
+      val maxExecutor = allexecutors.sortBy(_.executorId).last
+      ensureClockStoredInMaster()
+
+      val appMaster = allexecutors.find(_.executorId == 
Constants.APPMASTER_DEFAULT_EXECUTOR_ID)
+
+      LOG.info(s"Max executor Id is executor: ${maxExecutor.executorId}, " +
+        s"worker: ${maxExecutor.workerId}")
+      val executorsSharingSameWorker = allexecutors
+        .filter(_.workerId == maxExecutor.workerId).map(_.executorId)
+      LOG.info(s"These executors sharing the same worker Id 
${maxExecutor.workerId}," +
+        s" ${executorsSharingSameWorker.mkString(",")}")
+
+      // Kill the worker and expect all of its executors to be restarted on 
other workers.
+      val workerIdToKill = maxExecutor.workerId
+      cluster.removeWorkerNode(hostName(workerIdToKill))
+
+      val appMasterKilled = executorsSharingSameWorker
+        .exists(_ == Constants.APPMASTER_DEFAULT_EXECUTOR_ID)
+
+      def executorsMigrated(): Boolean = {
+        val executors = restClient.queryExecutorBrief(appId)
+        val newAppMaster = executors.find(_.executorId == 
Constants.APPMASTER_DEFAULT_EXECUTOR_ID)
+
+        if (appMasterKilled) {
+          newAppMaster.get.workerId != appMaster.get.workerId
+        } else {
+          // New executors will be started to replace killed executors.
+          // The new executors will be assigned a larger executor Id. We use 
this trick to detect
+          // whether the new executors have been started successfully.
+          executors.map(_.executorId).max > maxExecutor.executorId
+        }
+      }
+
+      Util.retryUntil(() => {
+        executorsMigrated()
+      }, s"new executor created with id > ${maxExecutor.executorId} when 
worker is killed")
+
+      // verify
+      val laterAppMaster = restClient.queryStreamingAppDetail(appId)
+      laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive
+      laterAppMaster.clock should be > 0L
+    }
+  }
+
+  "kill master" should {
+    "master will be down and all workers will attempt to reconnect and suicide 
after X seconds" in {
+      // setup
+      restartClusterRequired = true
+      val masters = cluster.getMasterHosts
+      val config = restClient.queryMasterConfig()
+      val shutDownTimeout = 
Duration(config.getString("akka.cluster.auto-down-unreachable-after"))
+
+      // exercise
+      masters.foreach(cluster.removeMasterNode)
+      info(s"will sleep ${shutDownTimeout.toSeconds}s and then check workers 
are down")
+      Thread.sleep(shutDownTimeout.toMillis)
+
+      // verify
+      val aliveWorkers = cluster.getWorkerHosts
+      Util.retryUntil(() => aliveWorkers.forall(worker => 
!cluster.nodeIsOnline(worker)),
+        "all workers down")
+    }
+  }
+
+  private def ensureClockStoredInMaster(): Unit = {
+    // TODO: 5000ms is a fixed sync period in the clock service.
+    // We wait for 5000ms to assume the clock has been stored.
+    Thread.sleep(5000)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
new file mode 100644
index 0000000..b327bf4
--- /dev/null
+++ 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.checklist
+
+import org.apache.gearpump.integrationtest.kafka.{KafkaCluster, 
MessageLossDetector, SimpleKafkaReader}
+import org.apache.gearpump.integrationtest.storm.StormClient
+import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
+
+/**
+ * The test spec checks the compatibility of running Storm applications
+ */
+class StormCompatibilitySpec extends TestSpecBase {
+
+  private lazy val stormClient = {
+    new StormClient(cluster, restClient)
+  }
+
+  val `version0.9` = "09"
+  val `version0.10` = "010"
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    stormClient.start()
+  }
+
+  override def afterAll(): Unit = {
+    stormClient.shutDown()
+    super.afterAll()
+  }
+
+  def withStorm(testCode: String => Unit): Unit = {
+    testCode(`version0.9`)
+    testCode(`version0.10`)
+  }
+
+  def getTopologyName(name: String, stormVersion: String): String = {
+    s"${name}_$stormVersion"
+  }
+
+  def getStormJar(stormVersion: String): String = {
+    cluster.queryBuiltInITJars(s"storm$stormVersion-").head
+  }
+
+  "Storm over Gearpump" should withStorm {
+    stormVersion =>
+      s"support basic topologies ($stormVersion)" in {
+        val stormJar = getStormJar(stormVersion)
+        val topologyName = getTopologyName("exclamation", stormVersion)
+
+        // exercise
+        val appId = stormClient.submitStormApp(
+          jar = stormJar,
+          mainClass = "storm.starter.ExclamationTopology",
+          args = topologyName,
+          appName = topologyName)
+
+        // verify
+        Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock 
> 0, "app running")
+      }
+
+      s"support to run a python version of wordcount ($stormVersion)" in {
+        val stormJar = getStormJar(stormVersion)
+        val topologyName = getTopologyName("wordcount", stormVersion)
+
+        // exercise
+        val appId = stormClient.submitStormApp(
+          jar = stormJar,
+          mainClass = "storm.starter.WordCountTopology",
+          args = topologyName,
+          appName = topologyName)
+
+        // verify
+        Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock 
> 0, "app running")
+      }
+
+      s"support DRPC ($stormVersion)" in {
+        // ReachTopology computes the reach of a Twitter URL among users and 
their followers
+        // using Storm's Distributed RPC feature.
+        // The input (user and follower) data is already prepared in memory.
+        val stormJar = getStormJar(stormVersion)
+        val topologyName = getTopologyName("reach", stormVersion)
+        stormClient.submitStormApp(
+          jar = stormJar,
+          mainClass = "storm.starter.ReachTopology",
+          args = topologyName,
+          appName = topologyName)
+        val drpcClient = stormClient.getDRPCClient(cluster.getNetworkGateway)
+
+        // verify
+        Util.retryUntil(() => {
+          drpcClient.execute("reach", "notaurl.com") == "0"
+        }, "drpc reach == 0")
+        drpcClient.execute("reach", "foo.com/blog/1") shouldEqual "16"
+        drpcClient.execute("reach", "engineering.twitter.com/blog/5") 
shouldEqual "14"
+      }
+
+      s"support tick tuple ($stormVersion)" in {
+        val stormJar = getStormJar(stormVersion)
+        val topologyName = getTopologyName("slidingWindowCounts", stormVersion)
+
+        // exercise
+        val appId = stormClient.submitStormApp(
+          jar = stormJar,
+          mainClass = "storm.starter.RollingTopWords",
+          args = s"$topologyName remote",
+          appName = topologyName)
+
+        // verify
+        Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock 
> 0, "app running")
+      }
+
+      s"support at-least-once semantics with Storm's Kafka connector 
($stormVersion)" in {
+
+        val stormJar = getStormJar(stormVersion)
+        val topologyName = getTopologyName("storm_kafka", stormVersion)
+        val stormKafkaTopology =
+          
s"org.apache.gearpump.integrationtest.storm.Storm${stormVersion}KafkaTopology"
+
+        import org.apache.gearpump.integrationtest.kafka.KafkaCluster._
+        withKafkaCluster(cluster) {
+          kafkaCluster =>
+            val sourcePartitionNum = 2
+            val sinkPartitionNum = 1
+            val zookeeper = kafkaCluster.getZookeeperConnectString
+            val brokerList = kafkaCluster.getBrokerListConnectString
+            val sourceTopic = "topic1"
+            val sinkTopic = "topic2"
+
+            val args = Array("-topologyName", topologyName, "-sourceTopic", 
sourceTopic,
+              "-sinkTopic", sinkTopic, "-zookeeperConnect", zookeeper, 
"-brokerList", brokerList,
+              "-spoutNum", s"$sourcePartitionNum", "-boltNum", 
s"$sinkPartitionNum"
+            )
+
+            kafkaCluster.createTopic(sourceTopic, sourcePartitionNum)
+
+            // Generate a number sequence (1, 2, 3, ...) and write it to the 
topic.
+            withDataProducer(sourceTopic, brokerList) { producer =>
+
+              val appId = stormClient.submitStormApp(
+                jar = stormJar,
+                mainClass = stormKafkaTopology,
+                args = args.mkString(" "),
+                appName = topologyName)
+
+              Util.retryUntil(() =>
+                restClient.queryStreamingAppDetail(appId).clock > 0, "app 
running")
+
+              // kill executor and verify at-least-once is guaranteed on 
application restart
+              val executorToKill = 
restClient.queryExecutorBrief(appId).map(_.executorId).max
+              restClient.killExecutor(appId, executorToKill) shouldBe true
+              Util.retryUntil(() =>
+                restClient.queryExecutorBrief(appId).map(_.executorId).max > 
executorToKill,
+                s"executor $executorToKill killed")
+
+              // verify no message loss
+              val detector = new
+                  MessageLossDetector(producer.lastWriteNum)
+              val kafkaReader = new
+                  SimpleKafkaReader(detector, sinkTopic, host = 
kafkaCluster.advertisedHost,
+                    port = kafkaCluster.advertisedPort)
+
+              Util.retryUntil(() => {
+                kafkaReader.read()
+                detector.allReceived
+              }, "all kafka message read")
+            }
+        }
+      }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/suites/StandaloneModeSuite.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/suites/StandaloneModeSuite.scala
 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/suites/StandaloneModeSuite.scala
new file mode 100644
index 0000000..7942308
--- /dev/null
+++ 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/suites/StandaloneModeSuite.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.suites
+
+import org.scalatest._
+
+import org.apache.gearpump.integrationtest.MiniClusterProvider
+import org.apache.gearpump.integrationtest.checklist._
+import org.apache.gearpump.integrationtest.minicluster.MiniCluster
+
+/**
+ * Launch a Gearpump cluster in standalone mode and run all test specs. To 
test a specific
+ * test spec, you need to comment out other lines.
+ */
+class StandaloneModeSuite extends Suites(
+  new CommandLineSpec,
+  new RestServiceSpec,
+  new ExampleSpec,
+  new DynamicDagSpec,
+  new StormCompatibilitySpec,
+  new StabilitySpec,
+  new ConnectorKafkaSpec,
+  new MessageDeliverySpec
+) with BeforeAndAfterAll {
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    MiniClusterProvider.managed = true
+    MiniClusterProvider.set(new MiniCluster).start()
+  }
+
+  override def afterAll(): Unit = {
+    MiniClusterProvider.get.shutDown()
+    super.afterAll()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala
deleted file mode 100644
index f315ad3..0000000
--- 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest
-
-import org.apache.log4j.Logger
-
-/**
- * The class is used to execute Docker commands.
- */
-object Docker {
-
-  private val LOG = Logger.getLogger(getClass)
-
-  /**
-   * @throws RuntimeException in case retval != 0
-   */
-  private def doExecute(container: String, command: String): String = {
-    ShellExec.execAndCaptureOutput(s"docker exec $container $command", s"EXEC 
$container")
-  }
-
-  private def doExecuteSilently(container: String, command: String): Boolean = 
{
-    ShellExec.exec(s"docker exec $container $command", s"EXEC $container")
-  }
-
-  /**
-   * @throws RuntimeException in case retval != 0
-   */
-  final def execute(container: String, command: String): String = {
-    trace(container, s"Execute $command") {
-      doExecute(container, command)
-    }
-  }
-
-  final def executeSilently(container: String, command: String): Boolean = {
-    trace(container, s"Execute silently $command") {
-      doExecuteSilently(container, command)
-    }
-  }
-
-  final def listContainers(): Seq[String] = {
-    trace("", s"Listing how many containers...") {
-      ShellExec.execAndCaptureOutput("docker ps -q -a", "LIST")
-        .split("\n").filter(_.nonEmpty)
-    }
-  }
-
-  final def containerIsRunning(name: String): Boolean = {
-    trace(name, s"Check container running or not...") {
-      ShellExec.execAndCaptureOutput(s"docker ps -q --filter name=$name", 
s"FIND $name").nonEmpty
-    }
-  }
-
-  final def getContainerIPAddr(name: String): String = {
-    trace(name, s"Get Ip Address") {
-      Docker.inspect(name, "--format={{.NetworkSettings.IPAddress}}")
-    }
-  }
-
-  final def containerExists(name: String): Boolean = {
-    trace(name, s"Check container existing or not...") {
-      ShellExec.execAndCaptureOutput(s"docker ps -q -a --filter name=$name", 
s"FIND $name").nonEmpty
-    }
-  }
-
-  /**
-   * @throws RuntimeException in case particular container is created already
-   */
-  final def createAndStartContainer(name: String, image: String, command: 
String,
-      environ: Map[String, String] = Map.empty, // key, value
-      volumes: Map[String, String] = Map.empty, // from, to
-      knownHosts: Set[String] = Set.empty,
-      tunnelPorts: Set[Int] = Set.empty): String = {
-
-    if (containerExists(name)) {
-      killAndRemoveContainer(name)
-    }
-
-    trace(name, s"Create and start $name ($image)...") {
-
-      val optsBuilder = new StringBuilder
-      optsBuilder.append("-d") // run in background
-      optsBuilder.append(" -h " + name) // use container name as hostname
-      optsBuilder.append(" -v /etc/localtime:/etc/localtime:ro") // 
synchronize timezone settings
-
-      environ.foreach { case (key, value) =>
-        optsBuilder.append(s" -e $key=$value")
-      }
-      volumes.foreach { case (from, to) =>
-        optsBuilder.append(s" -v $from:$to")
-      }
-      knownHosts.foreach(host =>
-        optsBuilder.append(" --link " + host)
-      )
-      tunnelPorts.foreach(port =>
-        optsBuilder.append(s" -p $port:$port")
-      )
-      createAndStartContainer(name, optsBuilder.toString(), command, image)
-    }
-  }
-
-  /**
-   * @throws RuntimeException in case particular container is created already
-   */
-  private def createAndStartContainer(
-      name: String, options: String, command: String, image: String): String = 
{
-    ShellExec.execAndCaptureOutput(s"docker run $options " +
-      s"--name $name $image $command", s"MAKE $name")
-  }
-
-  final def killAndRemoveContainer(name: String): Boolean = {
-    trace(name, s"kill and remove container") {
-      ShellExec.exec(s"docker rm -f $name", s"STOP $name")
-    }
-  }
-
-  final def killAndRemoveContainer(names: Array[String]): Boolean = {
-    assert(names.length > 0)
-    val args = names.mkString(" ")
-    trace(names.mkString(","), s"kill and remove containers") {
-      ShellExec.exec(s"docker rm -f $args", s"STOP $args.")
-    }
-  }
-
-  private def inspect(container: String, option: String): String = {
-    ShellExec.execAndCaptureOutput(s"docker inspect $option $container", 
s"EXEC $container")
-  }
-
-  final def curl(container: String, url: String, options: Array[String] = 
Array.empty[String])
-    : String = {
-    trace(container, s"curl $url") {
-      doExecute(container, s"curl -s ${options.mkString(" ")} $url")
-    }
-  }
-
-  final def getHostName(container: String): String = {
-    trace(container, s"Get hostname of container...") {
-      doExecute(container, "hostname")
-    }
-  }
-
-  final def getNetworkGateway(container: String): String = {
-    trace(container, s"Get gateway of container...") {
-      doExecute(container, "ip route").split("\\s+")(2)
-    }
-  }
-  final def killProcess(container: String, pid: Int, signal: String = 
"SIGKILL"): Boolean = {
-    trace(container, s"Kill process pid: $pid") {
-      doExecuteSilently(container, s"kill -$signal $pid")
-    }
-  }
-
-  final def findJars(container: String, folder: String): Array[String] = {
-    trace(container, s"Find jars under $folder") {
-      doExecute(container, s"find $folder")
-        .split("\n").filter(_.endsWith(".jar"))
-    }
-  }
-
-  private def trace[T](container: String, msg: String)(fun: => T): T = {
-    // scalastyle:off println
-    Console.println() // A empty line to let the output looks better.
-    // scalastyle:on println
-    LOG.debug(s"Container $container====>> $msg")
-    LOG.debug("INPUT==>>")
-    val response = fun
-    LOG.debug("<<==OUTPUT")
-
-    LOG.debug(brief(response))
-
-    LOG.debug(s"<<====Command END. Container $container, $msg \n")
-    response
-  }
-
-  private val PREVIEW_MAX_LENGTH = 1024
-
-  private def brief[T](input: T): String = {
-    val output = input match {
-      case true =>
-        "Success|True"
-      case false =>
-        "Failure|False"
-      case x: Array[Any] =>
-        "Success: [" + x.mkString(",") + "]"
-      case x =>
-        x.toString
-    }
-
-    val preview = if (output.length > PREVIEW_MAX_LENGTH) {
-      output.substring(0, PREVIEW_MAX_LENGTH) + "..."
-    }
-    else {
-      output
-    }
-    preview
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala
 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala
deleted file mode 100644
index 25d7ee3..0000000
--- 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest
-
-import scala.collection.JavaConverters._
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent._
-import scala.concurrent.duration._
-import scala.sys.process._
-
-import org.apache.log4j.Logger
-import org.apache.storm.shade.org.eclipse.jetty.util.QuotedStringTokenizer
-
-/**
- * The class is used to execute command in a shell
- */
-object ShellExec {
-
-  private val LOG = Logger.getLogger(getClass)
-  private val PROCESS_TIMEOUT = 2.minutes
-
-  /**
-   * The builtin command line parser by ProcessBuilder (implicit sys.process) 
don't
-   * respect the quote chars (' and ")
-   */
-  private def splitQuotedString(str: String): List[String] = {
-    val splitter = new QuotedStringTokenizer(str, " \t\n\r")
-    splitter.asInstanceOf[java.util.Enumeration[String]].asScala.toList
-  }
-
-  def exec(command: String, sender: String, timeout: Duration = 
PROCESS_TIMEOUT): Boolean = {
-    LOG.debug(s"$sender => `$command`")
-
-    val p = splitQuotedString(command).run()
-    val f = Future(blocking(p.exitValue())) // wrap in Future
-    val retval = {
-      try {
-        Await.result(f, timeout)
-      } catch {
-        case _: TimeoutException =>
-          LOG.error(s"timeout to execute command `$command`")
-          p.destroy()
-          p.exitValue()
-      }
-    }
-    LOG.debug(s"$sender <= exit $retval")
-    retval == 0
-  }
-
-  def execAndCaptureOutput(command: String, sender: String, timeout: Duration 
= PROCESS_TIMEOUT)
-    : String = {
-    LOG.debug(s"$sender => `$command`")
-
-    val buf = new StringBuilder
-    val processLogger = ProcessLogger((o: String) => 
buf.append(o).append("\n"),
-      (e: String) => buf.append(e).append("\n"))
-    val p = splitQuotedString(command).run(processLogger)
-    val f = Future(blocking(p.exitValue())) // wrap in Future
-    val retval = {
-      try {
-        Await.result(f, timeout)
-      } catch {
-        case _: TimeoutException =>
-          p.destroy()
-          p.exitValue()
-      }
-    }
-    val output = buf.toString().trim
-    val PREVIEW_MAX_LENGTH = 200
-    val preview = if (output.length > PREVIEW_MAX_LENGTH) {
-      output.substring(0, PREVIEW_MAX_LENGTH) + "..."
-    } else {
-      output
-    }
-
-    LOG.debug(s"$sender <= `$preview` exit $retval")
-    if (retval != 0) {
-      throw new RuntimeException(
-        s"exited ($retval) by executing `$command`")
-    }
-    output
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala
deleted file mode 100644
index 7e7085d..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest
-
-import scala.concurrent.duration._
-import scala.util.{Failure, Success, Try}
-
-import org.apache.log4j.Logger
-
-object Util {
-
-  private val LOG = Logger.getLogger(getClass)
-
-  def encodeUriComponent(s: String): String = {
-    try {
-      java.net.URLEncoder.encode(s, "UTF-8")
-        .replaceAll("\\+", "%20")
-        .replaceAll("\\%21", "!")
-        .replaceAll("\\%27", "'")
-        .replaceAll("\\%28", "(")
-        .replaceAll("\\%29", ")")
-        .replaceAll("\\%7E", "~")
-    } catch {
-      case ex: Throwable => s
-    }
-  }
-
-  def retryUntil(
-      condition: () => Boolean, conditionDescription: String, maxTries: Int = 
15,
-      interval: Duration = 10.seconds): Unit = {
-    var met = false
-    var tries = 0
-
-    while (!met && tries < maxTries) {
-
-      met = Try(condition()) match {
-        case Success(true) => true
-        case Success(false) => false
-        case Failure(ex) => false
-      }
-
-      tries += 1
-
-      if (!met) {
-        LOG.error(s"Failed due to (false == $conditionDescription), " +
-          s"retrying for the ${tries} times...")
-        Thread.sleep(interval.toMillis)
-      } else {
-        LOG.info(s"Success ($conditionDescription) after ${tries} retries")
-      }
-    }
-
-    if (!met) {
-      throw new Exception(s"Failed after ${tries} retries, 
($conditionDescription) == false")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala
 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala
deleted file mode 100644
index f836abd..0000000
--- 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.integrationtest.hadoop
-
-import org.apache.log4j.Logger
-
-import org.apache.gearpump.integrationtest.{Docker, Util}
-
-object HadoopCluster {
-
-  /** Starts a Hadoop cluster */
-  def withHadoopCluster(testCode: HadoopCluster => Unit): Unit = {
-    val hadoopCluster = new HadoopCluster
-    try {
-      hadoopCluster.start()
-      testCode(hadoopCluster)
-    } finally {
-      hadoopCluster.shutDown()
-    }
-  }
-}
-/**
- * This class maintains a single node Hadoop cluster
- */
-class HadoopCluster {
-
-  private val LOG = Logger.getLogger(getClass)
-  private val HADOOP_DOCKER_IMAGE = "sequenceiq/hadoop-docker:2.6.0"
-  private val HADOOP_HOST = "hadoop0"
-
-  def start(): Unit = {
-    Docker.createAndStartContainer(HADOOP_HOST, HADOOP_DOCKER_IMAGE, "")
-
-    Util.retryUntil(() => isAlive, "Hadoop cluster is alive")
-    LOG.info("Hadoop cluster is started.")
-  }
-
-  // Checks whether the cluster is alive by listing "/"
-  private def isAlive: Boolean = {
-    Docker.executeSilently(HADOOP_HOST, "/usr/local/hadoop/bin/hadoop fs -ls 
/")
-  }
-
-  def getDefaultFS: String = {
-    val hostIPAddr = Docker.getContainerIPAddr(HADOOP_HOST)
-    s"hdfs://$hostIPAddr:9000"
-  }
-
-  def shutDown(): Unit = {
-    Docker.killAndRemoveContainer(HADOOP_HOST)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala
 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala
deleted file mode 100644
index 15ba084..0000000
--- 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.kafka
-
-import org.apache.log4j.Logger
-
-import org.apache.gearpump.integrationtest.minicluster.MiniCluster
-import org.apache.gearpump.integrationtest.{Docker, Util}
-
-object KafkaCluster {
-
-  /** Starts a Kafka cluster */
-  def withKafkaCluster(cluster: MiniCluster)(testCode: KafkaCluster => Unit): 
Unit = {
-    val kafkaCluster = new KafkaCluster(cluster.getNetworkGateway, "kafka")
-    try {
-      kafkaCluster.start()
-      testCode(kafkaCluster)
-    } finally {
-      kafkaCluster.shutDown()
-    }
-  }
-
-  def withDataProducer(topic: String, brokerList: String)
-    (testCode: NumericalDataProducer => Unit): Unit = {
-    val producer = new NumericalDataProducer(topic, brokerList)
-    try {
-      producer.start()
-      testCode(producer)
-    } finally {
-      producer.stop()
-    }
-  }
-}
-
-/**
- * This class maintains a single node Kafka cluster with integrated Zookeeper.
- */
-class KafkaCluster(val advertisedHost: String, zkChroot: String = "") {
-
-  private val LOG = Logger.getLogger(getClass)
-  private val KAFKA_DOCKER_IMAGE = "spotify/kafka"
-  private val KAFKA_HOST = "kafka0"
-  private val KAFKA_HOME = "/opt/kafka_2.11-0.8.2.1/"
-  private val ZOOKEEPER_PORT = 2181
-  private val BROKER_PORT = 9092
-  val advertisedPort = BROKER_PORT
-
-  def start(): Unit = {
-    Docker.createAndStartContainer(KAFKA_HOST, KAFKA_DOCKER_IMAGE, "",
-      environ = Map(
-        "ADVERTISED_HOST" -> advertisedHost,
-        "ADVERTISED_PORT" -> BROKER_PORT.toString,
-        "ZK_CHROOT" -> zkChroot),
-      tunnelPorts = Set(ZOOKEEPER_PORT, BROKER_PORT)
-    )
-    Util.retryUntil(() => isAlive, "kafka cluster is alive")
-    LOG.debug("kafka cluster is started.")
-  }
-
-  def isAlive: Boolean = {
-    !listTopics().contains("Connection refused")
-  }
-
-  def shutDown(): Unit = {
-    Docker.killAndRemoveContainer(KAFKA_HOST)
-  }
-
-  private lazy val hostIPAddr = Docker.getContainerIPAddr(KAFKA_HOST)
-
-  def listTopics(): String = {
-    kafkaListTopics(KAFKA_HOST, KAFKA_HOME, getZookeeperConnectString)
-  }
-
-  def getZookeeperConnectString: String = {
-    s"$hostIPAddr:$ZOOKEEPER_PORT/$zkChroot"
-  }
-
-  def getBrokerListConnectString: String = {
-    s"$hostIPAddr:$BROKER_PORT"
-  }
-
-  def createTopic(topic: String, partitions: Int = 1): Unit = {
-    LOG.debug(s"|=> Create kafka topic $topic with $partitions partitions")
-
-    Docker.executeSilently(KAFKA_HOST,
-      s"$KAFKA_HOME/bin/kafka-topics.sh" +
-        s" --zookeeper $getZookeeperConnectString" +
-        s" --create --topic $topic --partitions $partitions 
--replication-factor 1")
-  }
-
-  def produceDataToKafka(topic: String, messageNum: Int): Unit = {
-    Docker.executeSilently(KAFKA_HOST,
-      s"$KAFKA_HOME/bin/kafka-topics.sh" +
-        s" --zookeeper $getZookeeperConnectString" +
-        s" --create --topic $topic --partitions 1 --replication-factor 1")
-
-    Docker.executeSilently(KAFKA_HOST,
-      s"$KAFKA_HOME/bin/kafka-producer-perf-test.sh" +
-        s" --broker-list $getBrokerListConnectString" +
-        s" --topic $topic --messages $messageNum")
-  }
-
-  def getLatestOffset(topic: String): Int = {
-    kafkaFetchLatestOffset(KAFKA_HOST, topic, KAFKA_HOME, 
getBrokerListConnectString)
-  }
-
-  private def kafkaListTopics(
-      container: String, kafkaHome: String, zookeeperConnectionString: 
String): String = {
-
-    LOG.debug(s"|=> Kafka list topics...")
-    Docker.execute(container,
-      s"$kafkaHome/bin/kafka-topics.sh" +
-        s" --zookeeper $zookeeperConnectionString -list")
-  }
-
-  private def kafkaFetchLatestOffset(
-      container: String, topic: String, kafkaHome: String, brokersList: 
String): Int = {
-    LOG.debug(s"|=> Get latest offset of topic $topic...")
-    val output = Docker.execute(container,
-      s"$kafkaHome/bin/kafka-run-class.sh kafka.tools.GetOffsetShell" +
-        s" --broker-list $brokersList " +
-        s" --topic $topic --time -1")
-    output.split(":")(2).toInt
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala
 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala
deleted file mode 100644
index 1cf3125..0000000
--- 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.kafka
-
-import java.util.Properties
-
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.apache.kafka.common.serialization.ByteArraySerializer
-import org.apache.log4j.Logger
-
-import org.apache.gearpump.streaming.serializer.ChillSerializer
-
-class NumericalDataProducer(topic: String, bootstrapServers: String) {
-
-  private val LOG = Logger.getLogger(getClass)
-  private val producer = createProducer
-  private val WRITE_SLEEP_NANOS = 10
-  private val serializer = new ChillSerializer[Int]
-  var lastWriteNum = 0
-
-  def start(): Unit = {
-    produceThread.start()
-  }
-
-  def stop(): Unit = {
-    if (produceThread.isAlive) {
-      produceThread.interrupt()
-      produceThread.join()
-    }
-    producer.close()
-  }
-
-  /** How many message we have written in total */
-  def producedNumbers: Range = {
-    Range(1, lastWriteNum + 1)
-  }
-
-  private def createProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
-    val properties = new Properties()
-    properties.setProperty("bootstrap.servers", bootstrapServers)
-    new KafkaProducer[Array[Byte], Array[Byte]](properties,
-      new ByteArraySerializer, new ByteArraySerializer)
-  }
-
-  private val produceThread = new Thread(new Runnable {
-    override def run(): Unit = {
-      try {
-        while (!Thread.currentThread.isInterrupted) {
-          lastWriteNum += 1
-          val msg = serializer.serialize(lastWriteNum)
-          val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, msg)
-          producer.send(record)
-          Thread.sleep(0, WRITE_SLEEP_NANOS)
-        }
-      } catch {
-        case ex: InterruptedException =>
-          LOG.error("message producing is stopped by an interrupt")
-      }
-    }
-  })
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala
 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala
deleted file mode 100644
index 1f773d3..0000000
--- 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.kafka
-
-import scala.collection.mutable
-
-trait ResultVerifier {
-  def onNext(num: Int): Unit
-}
-
-class MessageLossDetector(totalNum: Int) extends ResultVerifier {
-  private val bitSets = new mutable.BitSet(totalNum)
-  var result = List.empty[Int]
-
-  override def onNext(num: Int): Unit = {
-    bitSets.add(num)
-    result :+= num
-  }
-
-  def allReceived: Boolean = {
-    1.to(totalNum).forall(bitSets)
-  }
-
-  def received(num: Int): Boolean = {
-    bitSets(num)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
deleted file mode 100644
index 392ca86..0000000
--- 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.kafka
-
-import scala.util.{Failure, Success}
-
-import kafka.api.FetchRequestBuilder
-import kafka.consumer.SimpleConsumer
-import kafka.utils.Utils
-
-import org.apache.gearpump.streaming.serializer.ChillSerializer
-
-class SimpleKafkaReader(verifier: ResultVerifier, topic: String, partition: 
Int = 0,
-    host: String, port: Int) {
-
-  private val consumer = new SimpleConsumer(host, port, 100000, 64 * 1024, "")
-  private val serializer = new ChillSerializer[Int]
-  private var offset = 0L
-
-  def read(): Unit = {
-    val messageSet = consumer.fetch(
-      new FetchRequestBuilder().addFetch(topic, partition, offset, 
Int.MaxValue).build()
-    ).messageSet(topic, partition)
-
-    for (messageAndOffset <- messageSet) {
-      
serializer.deserialize(Utils.readBytes(messageAndOffset.message.payload)) match 
{
-        case Success(msg) =>
-          offset = messageAndOffset.nextOffset
-          verifier.onNext(msg)
-        case Failure(e) => throw e
-      }
-    }
-  }
-}
\ No newline at end of file


Reply via email to