fix GEARPUMP-150 correct the integration test file structure

Author: huafengw <[email protected]>

Closes #26 from huafengw/name.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/e9ea6e2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/e9ea6e2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/e9ea6e2f

Branch: refs/heads/master
Commit: e9ea6e2f3366ede2aa350204e8b3854a6d0081ca
Parents: c80c069
Author: huafengw <[email protected]>
Authored: Fri May 27 19:38:40 2016 +0800
Committer: manuzhang <[email protected]>
Committed: Fri May 27 19:38:40 2016 +0800

----------------------------------------------------------------------
 .../integrationtest/MiniClusterProvider.scala   |  41 ---
 .../gearpump/integrationtest/TestSpecBase.scala |  92 -----
 .../checklist/CommandLineSpec.scala             | 133 -------
 .../checklist/ConnectorKafkaSpec.scala          | 120 ------
 .../checklist/DynamicDagSpec.scala              | 157 --------
 .../integrationtest/checklist/ExampleSpec.scala | 148 --------
 .../checklist/MessageDeliverySpec.scala         | 122 ------
 .../checklist/RestServiceSpec.scala             | 369 -------------------
 .../checklist/StabilitySpec.scala               | 162 --------
 .../checklist/StormCompatibilitySpec.scala      | 185 ----------
 .../suites/StandaloneModeSuite.scala            |  51 ---
 .../integrationtest/MiniClusterProvider.scala   |  41 +++
 .../gearpump/integrationtest/TestSpecBase.scala |  92 +++++
 .../checklist/CommandLineSpec.scala             | 133 +++++++
 .../checklist/ConnectorKafkaSpec.scala          | 120 ++++++
 .../checklist/DynamicDagSpec.scala              | 157 ++++++++
 .../integrationtest/checklist/ExampleSpec.scala | 148 ++++++++
 .../checklist/MessageDeliverySpec.scala         | 122 ++++++
 .../checklist/RestServiceSpec.scala             | 369 +++++++++++++++++++
 .../checklist/StabilitySpec.scala               | 162 ++++++++
 .../checklist/StormCompatibilitySpec.scala      | 185 ++++++++++
 .../suites/StandaloneModeSuite.scala            |  51 +++
 .../io/gearpump/integrationtest/Docker.scala    | 211 -----------
 .../io/gearpump/integrationtest/ShellExec.scala |  98 -----
 .../io/gearpump/integrationtest/Util.scala      |  72 ----
 .../integrationtest/hadoop/HadoopCluster.scala  |  67 ----
 .../integrationtest/kafka/KafkaCluster.scala    | 140 -------
 .../kafka/NumericalDataProducer.scala           |  76 ----
 .../integrationtest/kafka/ResultVerifier.scala  |  42 ---
 .../kafka/SimpleKafkaReader.scala               |  49 ---
 .../minicluster/BaseContainer.scala             |  60 ---
 .../minicluster/CommandLineClient.scala         |  84 -----
 .../minicluster/MiniCluster.scala               | 189 ----------
 .../minicluster/RestClient.scala                | 268 --------------
 .../integrationtest/storm/StormClient.scala     |  91 -----
 .../gearpump/integrationtest/Docker.scala       | 211 +++++++++++
 .../gearpump/integrationtest/ShellExec.scala    |  98 +++++
 .../apache/gearpump/integrationtest/Util.scala  |  72 ++++
 .../integrationtest/hadoop/HadoopCluster.scala  |  67 ++++
 .../integrationtest/kafka/KafkaCluster.scala    | 140 +++++++
 .../kafka/NumericalDataProducer.scala           |  76 ++++
 .../integrationtest/kafka/ResultVerifier.scala  |  42 +++
 .../kafka/SimpleKafkaReader.scala               |  49 +++
 .../minicluster/BaseContainer.scala             |  60 +++
 .../minicluster/CommandLineClient.scala         |  84 +++++
 .../minicluster/MiniCluster.scala               | 189 ++++++++++
 .../minicluster/RestClient.scala                | 268 ++++++++++++++
 .../integrationtest/storm/StormClient.scala     |  91 +++++
 .../integrationtest/storm/Adaptor.scala         |  38 --
 .../storm/Storm010KafkaTopology.scala           |  95 -----
 .../integrationtest/storm/Adaptor.scala         |  38 ++
 .../storm/Storm010KafkaTopology.scala           |  95 +++++
 .../integrationtest/storm/Adaptor.scala         |  38 --
 .../storm/Storm09KafkaTopology.scala            |  95 -----
 .../integrationtest/storm/Adaptor.scala         |  38 ++
 .../storm/Storm09KafkaTopology.scala            |  95 +++++
 56 files changed, 3293 insertions(+), 3293 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala
 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala
deleted file mode 100644
index 4a161c7..0000000
--- 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest
-
-import org.apache.gearpump.integrationtest.minicluster.MiniCluster
-
-/**
- * Provides a min cluster of Gearpump, which contains one or more masters, and 
workers.
- */
-object MiniClusterProvider {
-
-  private var instance = new MiniCluster
-
-  def get: MiniCluster = instance
-
-  def set(instance: MiniCluster): MiniCluster = {
-    this.instance = instance
-    instance
-  }
-
-  /**
-   * Indicates whether test suite should create particular cluster. In case of 
false, every
-   * test spec will be responsible for cluster creation.
-   */
-  var managed = false
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala
 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala
deleted file mode 100644
index dabcc71..0000000
--- 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest
-
-import org.scalatest._
-
-import org.apache.gearpump.cluster.MasterToAppMaster
-import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterData
-import org.apache.gearpump.util.LogUtil
-
-/**
- * The abstract test spec
- */
-trait TestSpecBase
-  extends WordSpec with Matchers with BeforeAndAfterEachTestData with 
BeforeAndAfterAll {
-
-  private def LOGGER = LogUtil.getLogger(getClass)
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    if (!MiniClusterProvider.managed) {
-      LOGGER.info("Will test with a default standalone mini cluster")
-      MiniClusterProvider.get.start()
-    }
-  }
-
-  override def afterAll(): Unit = {
-    if (!MiniClusterProvider.managed) {
-      LOGGER.info("Will shutdown the default mini cluster")
-      MiniClusterProvider.get.shutDown()
-    }
-    super.afterAll()
-  }
-
-  lazy val cluster = MiniClusterProvider.get
-  lazy val commandLineClient = cluster.commandLineClient
-  lazy val restClient = cluster.restClient
-
-  lazy val wordCountJar = cluster.queryBuiltInExampleJars("wordcount-").head
-  lazy val wordCountName = "wordCount"
-
-  var restartClusterRequired: Boolean = false
-
-  override def beforeEach(td: TestData): Unit = {
-
-    LOGGER.debug(s">### 
=============================================================")
-    LOGGER.debug(s">###1 Prepare test: ${td.name}\n")
-
-    assert(cluster != null, "Configure MiniCluster properly in suite spec")
-    cluster.isAlive shouldBe true
-    restClient.listRunningApps().isEmpty shouldBe true
-    LOGGER.debug(s">### 
=============================================================")
-    LOGGER.debug(s">###2 Start test: ${td.name}\n")
-  }
-
-  override def afterEach(td: TestData): Unit = {
-    LOGGER.debug(s"<### 
=============================================================")
-    LOGGER.debug(s"<###3 End test: ${td.name}\n")
-
-    if (restartClusterRequired || !cluster.isAlive) {
-      restartClusterRequired = false
-      LOGGER.info("Will restart the cluster for next test case")
-      cluster.restart()
-    } else {
-      restClient.listRunningApps().foreach(app => {
-        commandLineClient.killApp(app.appId) shouldBe true
-      })
-    }
-  }
-
-  def expectAppIsRunning(appId: Int, expectedAppName: String): AppMasterData = 
{
-    val app = restClient.queryApp(appId)
-    app.status shouldEqual MasterToAppMaster.AppMasterActive
-    app.appName shouldEqual expectedAppName
-    app
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala
 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala
deleted file mode 100644
index eabc684..0000000
--- 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.checklist
-
-import org.apache.gearpump.cluster.MasterToAppMaster
-import org.apache.gearpump.integrationtest.TestSpecBase
-
-/**
- * The test spec checks the command-line usage
- */
-class CommandLineSpec extends TestSpecBase {
-
-  "use `gear info` to list applications" should {
-    "retrieve 0 application after cluster just started" in {
-      // exercise
-      getRunningAppCount shouldEqual 0
-    }
-
-    "retrieve 1 application after the first application submission" in {
-      // setup
-      val appId = expectSubmitAppSuccess(wordCountJar)
-      expectAppIsRunningByParsingOutput(appId, wordCountName)
-
-      // exercise
-      getRunningAppCount shouldEqual 1
-    }
-  }
-
-  "use `gear app` to submit application" should {
-    "find a running application after submission" in {
-      // exercise
-      val appId = expectSubmitAppSuccess(wordCountJar)
-      expectAppIsRunningByParsingOutput(appId, wordCountName)
-    }
-
-    "reject a repeated submission request while the application is running" in 
{
-      // setup
-      val appId = expectSubmitAppSuccess(wordCountJar)
-      expectAppIsRunningByParsingOutput(appId, wordCountName)
-
-      // exercise
-      val actualAppId = commandLineClient.submitApp(wordCountJar)
-      actualAppId shouldEqual -1
-    }
-
-    "reject an invalid submission (the jar file path is incorrect)" in {
-      // exercise
-      val actualAppId = commandLineClient.submitApp(wordCountJar + ".missing")
-      actualAppId shouldEqual -1
-    }
-  }
-
-  "use `gear kill` to kill application" should {
-    "a running application should be killed" in {
-      // setup
-      val appId = expectSubmitAppSuccess(wordCountJar)
-
-      // exercise
-      val success = commandLineClient.killApp(appId)
-      success shouldBe true
-    }
-
-    "should fail when attempting to kill a stopped application" in {
-      // setup
-      val appId = expectSubmitAppSuccess(wordCountJar)
-      var success = commandLineClient.killApp(appId)
-      success shouldBe true
-
-      // exercise
-      success = commandLineClient.killApp(appId)
-      success shouldBe false
-    }
-
-    "the EmbededCluster can be used as embedded cluster in process" in {
-      // setup
-      val args = "-debug true -sleep 10"
-      val appId = expectSubmitAppSuccess(wordCountJar, args)
-      var success = commandLineClient.killApp(appId)
-      success shouldBe true
-    }
-
-    "should fail when attempting to kill a non-exist application" in {
-      // setup
-      val freeAppId = getNextAvailableAppId
-
-      // exercise
-      val success = commandLineClient.killApp(freeAppId)
-      success shouldBe false
-    }
-  }
-
-  "use `gear replay` to replay the application from current min clock" should {
-    "todo: description" in {
-      // todo: test code
-    }
-  }
-
-  private def getRunningAppCount: Int = {
-    commandLineClient.listRunningApps().length
-  }
-
-  private def getNextAvailableAppId: Int = {
-    commandLineClient.listApps().length + 1
-  }
-
-  private def expectSubmitAppSuccess(jar: String, args: String = ""): Int = {
-    val appId = commandLineClient.submitApp(jar)
-    appId should not equal -1
-    appId
-  }
-
-  private def expectAppIsRunningByParsingOutput(appId: Int, expectedName: 
String): Unit = {
-    val actual = commandLineClient.queryApp(appId)
-    actual should include(s"application: $appId, ")
-    actual should include(s"name: $expectedName, ")
-    actual should include(s"status: ${MasterToAppMaster.AppMasterActive}")
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
deleted file mode 100644
index d8bdc1e..0000000
--- 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.checklist
-
-import org.scalatest.TestData
-
-import org.apache.gearpump.integrationtest.kafka._
-import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
-
-/**
- * The test spec checks the Kafka datasource connector
- */
-class ConnectorKafkaSpec extends TestSpecBase {
-
-  private lazy val kafkaCluster = new KafkaCluster(cluster.getNetworkGateway)
-  private lazy val kafkaJar = cluster.queryBuiltInExampleJars("kafka-").head
-  private var producer: NumericalDataProducer = null
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    kafkaCluster.start()
-  }
-
-  override def afterAll(): Unit = {
-    kafkaCluster.shutDown()
-    super.afterAll()
-  }
-
-  override def afterEach(test: TestData): Unit = {
-    super.afterEach(test)
-    if (producer != null) {
-      producer.stop()
-      producer = null
-    }
-  }
-
-  "KafkaSource and KafkaSink" should {
-    "read from and write to kafka" in {
-      // setup
-      val sourceTopic = "topic1"
-      val sinkTopic = "topic2"
-      val messageNum = 10000
-      kafkaCluster.produceDataToKafka(sourceTopic, messageNum)
-
-      // exercise
-      val args = 
Array("org.apache.gearpump.streaming.examples.kafka.KafkaReadWrite",
-        "-zookeeperConnect", kafkaCluster.getZookeeperConnectString,
-        "-brokerList", kafkaCluster.getBrokerListConnectString,
-        "-sourceTopic", sourceTopic,
-        "-sinkTopic", sinkTopic).mkString(" ")
-      val appId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(kafkaJar, 
cluster.getWorkerHosts.length, args)
-      success shouldBe true
-
-      // verify
-      expectAppIsRunning(appId, "KafkaReadWrite")
-      Util.retryUntil(() => kafkaCluster.getLatestOffset(sinkTopic) == 
messageNum,
-        "kafka all message written")
-    }
-  }
-
-  "Gearpump with Kafka" should {
-    "support at-least-once message delivery" in {
-      // setup
-      val sourcePartitionNum = 2
-      val sourceTopic = "topic3"
-      val sinkTopic = "topic4"
-      // Generate number sequence (1, 2, 3, ...) to the topic
-      kafkaCluster.createTopic(sourceTopic, sourcePartitionNum)
-      producer = new NumericalDataProducer(sourceTopic, 
kafkaCluster.getBrokerListConnectString)
-      producer.start()
-
-      // exercise
-      val args = 
Array("org.apache.gearpump.streaming.examples.kafka.KafkaReadWrite",
-        "-zookeeperConnect", kafkaCluster.getZookeeperConnectString,
-        "-brokerList", kafkaCluster.getBrokerListConnectString,
-        "-sourceTopic", sourceTopic,
-        "-sinkTopic", sinkTopic,
-        "-source", sourcePartitionNum).mkString(" ")
-      val appId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(kafkaJar, 
cluster.getWorkerHosts.length, args)
-      success shouldBe true
-
-      // verify #1
-      expectAppIsRunning(appId, "KafkaReadWrite")
-      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 
0, "app running")
-
-      // verify #2
-      val executorToKill = 
restClient.queryExecutorBrief(appId).map(_.executorId).max
-      restClient.killExecutor(appId, executorToKill) shouldBe true
-      Util.retryUntil(() => restClient.queryExecutorBrief(appId)
-        .map(_.executorId).max > executorToKill,
-        s"executor $executorToKill killed")
-
-      // verify #3
-      val detector = new MessageLossDetector(producer.lastWriteNum)
-      val kafkaReader = new SimpleKafkaReader(detector, sinkTopic,
-        host = kafkaCluster.advertisedHost, port = kafkaCluster.advertisedPort)
-      Util.retryUntil(() => {
-        kafkaReader.read()
-        detector.allReceived
-      }, "kafka all message read")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala
 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala
deleted file mode 100644
index 56b33c1..0000000
--- 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.checklist
-
-import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
-import org.apache.gearpump.metrics.Metrics.Meter
-import org.apache.gearpump.streaming._
-import org.apache.gearpump.streaming.appmaster.ProcessorSummary
-
-class DynamicDagSpec extends TestSpecBase {
-
-  lazy val solJar = cluster.queryBuiltInExampleJars("sol-").head
-  val splitTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Split"
-  val sumTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Sum"
-  val solName = "sol"
-
-  "dynamic dag" should {
-    "can retrieve a list of built-in partitioner classes" in {
-      val partitioners = restClient.queryBuiltInPartitioners()
-      partitioners.length should be > 0
-      partitioners.foreach(clazz =>
-        clazz should startWith("org.apache.gearpump.partitioner.")
-      )
-    }
-
-    "can compose a wordcount application from scratch" in {
-      // todo: blocked by #1450
-    }
-
-    "can replace down stream with wordcount's sum processor (new processor 
will have metrics)" in {
-      // setup
-      val appId = expectSolJarSubmittedWithAppId()
-
-      // exercise
-      val formerProcessors = 
restClient.queryStreamingAppDetail(appId).processors
-      replaceProcessor(appId, 1, sumTaskClass)
-      var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
-      Util.retryUntil(() => {
-        laterProcessors = restClient.queryStreamingAppDetail(appId).processors
-        laterProcessors.size == formerProcessors.size + 1
-      }, "new processor successfully added")
-      processorHasThroughput(appId, laterProcessors.keySet.max, 
"receiveThroughput")
-    }
-
-    "can replace up stream with wordcount's split processor (new processor 
will have metrics)" in {
-      // setup
-      val appId = expectSolJarSubmittedWithAppId()
-
-      // exercise
-      val formerProcessors = 
restClient.queryStreamingAppDetail(appId).processors
-      replaceProcessor(appId, 0, splitTaskClass)
-      var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
-      Util.retryUntil(() => {
-        laterProcessors = restClient.queryStreamingAppDetail(appId).processors
-        laterProcessors.size == formerProcessors.size + 1
-      }, "new processor added")
-      processorHasThroughput(appId, laterProcessors.keySet.max, 
"sendThroughput")
-    }
-
-    "fall back to last dag version when replacing a processor failid" in {
-      // setup
-      val appId = expectSolJarSubmittedWithAppId()
-
-      // exercise
-      val formerProcessors = 
restClient.queryStreamingAppDetail(appId).processors
-      replaceProcessor(appId, 1, sumTaskClass)
-      var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
-      Util.retryUntil(() => {
-        laterProcessors = restClient.queryStreamingAppDetail(appId).processors
-        laterProcessors.size == formerProcessors.size + 1
-      }, "new processor added")
-      processorHasThroughput(appId, laterProcessors.keySet.max, 
"receiveThroughput")
-
-      val fakeTaskClass = 
"org.apache.gearpump.streaming.examples.wordcount.Fake"
-      replaceProcessor(appId, laterProcessors.keySet.max, fakeTaskClass)
-      Util.retryUntil(() => {
-        val processorsAfterFailure = 
restClient.queryStreamingAppDetail(appId).processors
-        processorsAfterFailure.size == laterProcessors.size
-      }, "new processor added")
-      val currentClock = restClient.queryStreamingAppDetail(appId).clock
-      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 
currentClock,
-        "app clock is advancing")
-    }
-
-    "fall back to last dag version when AppMaster HA triggered" in {
-      // setup
-      val appId = expectSolJarSubmittedWithAppId()
-
-      // exercise
-      val formerAppMaster = restClient.queryApp(appId).appMasterPath
-      val formerProcessors = 
restClient.queryStreamingAppDetail(appId).processors
-      replaceProcessor(appId, 1, sumTaskClass)
-      var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
-      Util.retryUntil(() => {
-        laterProcessors = restClient.queryStreamingAppDetail(appId).processors
-        laterProcessors.size == formerProcessors.size + 1
-      }, "new processor added")
-      processorHasThroughput(appId, laterProcessors.keySet.max, 
"receiveThroughput")
-
-      restClient.killAppMaster(appId) shouldBe true
-      Util.retryUntil(() => restClient.queryApp(appId).appMasterPath != 
formerAppMaster,
-        "new AppMaster created")
-      val processors = restClient.queryStreamingAppDetail(appId).processors
-      processors.size shouldEqual laterProcessors.size
-    }
-  }
-
-  private def expectSolJarSubmittedWithAppId(): Int = {
-    val appId = restClient.getNextAvailableAppId()
-    val success = restClient.submitApp(solJar, cluster.getWorkerHosts.length)
-    success shouldBe true
-    expectAppIsRunning(appId, solName)
-    Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, 
"app running")
-    appId
-  }
-
-  private def replaceProcessor(
-      appId: Int,
-      formerProcessorId: Int,
-      newTaskClass: String,
-      newProcessorDescription: String = "",
-      newParallelism: Int = 1): Unit = {
-    val uploadedJar = restClient.uploadJar(wordCountJar)
-    val replaceMe = new ProcessorDescription(formerProcessorId, newTaskClass,
-      newParallelism, newProcessorDescription,
-      jar = uploadedJar)
-
-    // exercise
-    val success = restClient.replaceStreamingAppProcessor(appId, replaceMe)
-    success shouldBe true
-  }
-
-  private def processorHasThroughput(appId: Int, processorId: Int, metrics: 
String): Unit = {
-    Util.retryUntil(() => {
-      val actual = restClient.queryStreamingAppMetrics(appId, current = false,
-        path = "processor" + processorId)
-      val throughput = actual.metrics.filter(_.value.name.endsWith(metrics))
-      throughput.size should be > 0
-      throughput.forall(_.value.asInstanceOf[Meter].count > 0L)
-    }, "new processor has message received")
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala
 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala
deleted file mode 100644
index 27e4665..0000000
--- 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.checklist
-
-import org.apache.log4j.Logger
-
-import org.apache.gearpump.integrationtest.{Docker, TestSpecBase, Util}
-import org.apache.gearpump.streaming._
-import org.apache.gearpump.streaming.appmaster.ProcessorSummary
-
-/**
- * The test spec will perform destructive operations to check the stability
- */
-class ExampleSpec extends TestSpecBase {
-
-  private val LOG = Logger.getLogger(getClass)
-
-  "distributed shell" should {
-    "execute commands on machines where its executors are running" in {
-      val distShellJar = 
cluster.queryBuiltInExampleJars("distributedshell-").head
-      val mainClass = 
"org.apache.gearpump.examples.distributedshell.DistributedShell"
-      val clientClass = 
"org.apache.gearpump.examples.distributedshell.DistributedShellClient"
-      val appId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(distShellJar, 
cluster.getWorkerHosts.length, mainClass)
-      success shouldBe true
-      expectAppIsRunning(appId, "DistributedShell")
-      val args = Array(
-        clientClass,
-        "-appid", appId.toString,
-        "-command", "hostname"
-      )
-
-      val expectedHostNames = cluster.getWorkerHosts.map(Docker.getHostName(_))
-
-      def verify(): Boolean = {
-        val workerNum = cluster.getWorkerHosts.length
-        val result = commandLineClient.submitAppAndCaptureOutput(distShellJar,
-          workerNum, args.mkString(" ")).split("\n").
-          filterNot(line => line.startsWith("[INFO]") || line.isEmpty)
-        expectedHostNames.forall(result.contains)
-      }
-
-      Util.retryUntil(() => verify(),
-        s"executors started on all expected hosts 
${expectedHostNames.mkString(", ")}")
-    }
-  }
-
-  "wordcount" should {
-    val wordCountJarNamePrefix = "wordcount-"
-    behave like streamingApplication(wordCountJarNamePrefix, wordCountName)
-
-    "can submit immediately after killing a former one" in {
-      // setup
-      val formerAppId = restClient.getNextAvailableAppId()
-      val formerSubmissionSuccess =
-        restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
-      formerSubmissionSuccess shouldBe true
-      expectAppIsRunning(formerAppId, wordCountName)
-      Util.retryUntil(() =>
-        restClient.queryStreamingAppDetail(formerAppId).clock > 0, "app 
running")
-      restClient.killApp(formerAppId)
-
-      // exercise
-      val appId = formerAppId + 1
-      val success = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length)
-      success shouldBe true
-      expectAppIsRunning(appId, wordCountName)
-    }
-  }
-
-  "wordcount(java)" should {
-    val wordCountJavaJarNamePrefix = "wordcountjava-"
-    val wordCountJavaName = "wordcountJava"
-    behave like streamingApplication(wordCountJavaJarNamePrefix, 
wordCountJavaName)
-  }
-
-  "sol" should {
-    val solJarNamePrefix = "sol-"
-    val solName = "sol"
-    behave like streamingApplication(solJarNamePrefix, solName)
-  }
-
-  "complexdag" should {
-    val dynamicDagJarNamePrefix = "complexdag-"
-    val dynamicDagName = "dag"
-    behave like streamingApplication(dynamicDagJarNamePrefix, dynamicDagName)
-  }
-
-  def streamingApplication(jarNamePrefix: String, appName: String): Unit = {
-    lazy val jar = cluster.queryBuiltInExampleJars(jarNamePrefix).head
-
-    "can obtain application clock and the clock will keep changing" in {
-      // setup
-      val appId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(jar, cluster.getWorkerHosts.length)
-      success shouldBe true
-      expectAppIsRunning(appId, appName)
-
-      // exercise
-      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 
0, "app submitted")
-      val formerClock = restClient.queryStreamingAppDetail(appId).clock
-      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 
formerClock,
-        "app clock is advancing")
-    }
-
-    "can change the parallelism and description of a processor" in {
-      // setup
-      val appId = restClient.getNextAvailableAppId()
-      val formerSubmissionSuccess = restClient.submitApp(jar, 
cluster.getWorkerHosts.length)
-      formerSubmissionSuccess shouldBe true
-      expectAppIsRunning(appId, appName)
-      val formerProcessors = 
restClient.queryStreamingAppDetail(appId).processors
-      val processor0 = formerProcessors.get(0).get
-      val expectedProcessorId = formerProcessors.size
-      val expectedParallelism = processor0.parallelism + 1
-      val expectedDescription = processor0.description + "new"
-      val replaceMe = new ProcessorDescription(processor0.id, 
processor0.taskClass,
-        expectedParallelism, description = expectedDescription)
-
-      // exercise
-      val success = restClient.replaceStreamingAppProcessor(appId, replaceMe)
-      success shouldBe true
-      var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
-      Util.retryUntil(() => {
-        laterProcessors = restClient.queryStreamingAppDetail(appId).processors
-        laterProcessors.size == formerProcessors.size + 1
-      }, "new process added")
-      val laterProcessor0 = laterProcessors.get(expectedProcessorId).get
-      laterProcessor0.parallelism shouldEqual expectedParallelism
-      laterProcessor0.description shouldEqual expectedDescription
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
deleted file mode 100644
index bb9982a..0000000
--- 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.integrationtest.checklist
-
-import org.apache.log4j.Logger
-
-import org.apache.gearpump.integrationtest.hadoop.HadoopCluster._
-import org.apache.gearpump.integrationtest.kafka.KafkaCluster._
-import org.apache.gearpump.integrationtest.kafka.{ResultVerifier, 
SimpleKafkaReader}
-import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
-
-/**
- * Checks message delivery consistency, like at-least-once, and exactly-once.
- */
-class MessageDeliverySpec extends TestSpecBase {
-
-  private val LOG = Logger.getLogger(getClass)
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-  }
-
-  override def afterAll(): Unit = {
-    super.afterAll()
-  }
-
-  "Gearpump" should {
-    "support exactly-once message delivery" in {
-      withKafkaCluster(cluster) { kafkaCluster =>
-        // setup
-        val sourcePartitionNum = 1
-        val sourceTopic = "topic1"
-        val sinkTopic = "topic2"
-
-        // Generate number sequence (1, 2, 3, ...) to the topic
-        kafkaCluster.createTopic(sourceTopic, sourcePartitionNum)
-
-        withDataProducer(sourceTopic, kafkaCluster.getBrokerListConnectString) 
{ producer =>
-
-          withHadoopCluster { hadoopCluster =>
-            // exercise
-            val args = 
Array("org.apache.gearpump.streaming.examples.state.MessageCountApp",
-              "-defaultFS", hadoopCluster.getDefaultFS,
-              "-zookeeperConnect", kafkaCluster.getZookeeperConnectString,
-              "-brokerList", kafkaCluster.getBrokerListConnectString,
-              "-sourceTopic", sourceTopic,
-              "-sinkTopic", sinkTopic,
-              "-sourceTask", sourcePartitionNum).mkString(" ")
-            val appId = restClient.getNextAvailableAppId()
-
-            val stateJar = cluster.queryBuiltInExampleJars("state-").head
-            val success = restClient.submitApp(stateJar, executorNum = 1, args 
= args)
-            success shouldBe true
-
-            // verify #1
-            expectAppIsRunning(appId, "MessageCount")
-            Util.retryUntil(() => 
restClient.queryStreamingAppDetail(appId).clock > 0,
-              "app is running")
-
-            // wait for checkpoint to take place
-            Thread.sleep(1000)
-
-            LOG.info("Trigger message replay by kill and restart the 
executors")
-            val executorToKill = 
restClient.queryExecutorBrief(appId).map(_.executorId).max
-            restClient.killExecutor(appId, executorToKill) shouldBe true
-            Util.retryUntil(() => restClient.queryExecutorBrief(appId)
-              .map(_.executorId).max > executorToKill, s"executor 
$executorToKill killed")
-
-            producer.stop()
-            val producedNumbers = producer.producedNumbers
-            LOG.info(s"In total, numbers in range[${producedNumbers.start}" +
-              s", ${producedNumbers.end - 1}] have been written to Kafka")
-
-            // verify #3
-            val kafkaSourceOffset = kafkaCluster.getLatestOffset(sourceTopic)
-
-            assert(producedNumbers.size == kafkaSourceOffset,
-              "produced message should match Kafka queue size")
-
-            LOG.info(s"The Kafka source topic $sourceTopic offset is " + 
kafkaSourceOffset)
-
-            // The sink processor of this job (MessageCountApp) writes total 
message
-            // count to Kafka Sink periodically (once every checkpoint 
interval).
-            // The detector keep record of latest message count.
-            val detector = new ResultVerifier {
-              var latestMessageCount: Int = 0
-              override def onNext(messageCount: Int): Unit = {
-                this.latestMessageCount = messageCount
-              }
-            }
-
-            val kafkaReader = new SimpleKafkaReader(detector, sinkTopic,
-              host = kafkaCluster.advertisedHost, port = 
kafkaCluster.advertisedPort)
-            Util.retryUntil(() => {
-              kafkaReader.read()
-              LOG.info(s"Received message count: 
${detector.latestMessageCount}, " +
-                s"expect: ${producedNumbers.size}")
-              detector.latestMessageCount == producedNumbers.size
-            }, "MessageCountApp calculated message count matches " +
-              "expected in case of message replay")
-          }
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
deleted file mode 100644
index 2f5bb64..0000000
--- 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
+++ /dev/null
@@ -1,369 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.checklist
-
-import scala.concurrent.duration._
-
-import org.apache.gearpump.cluster.MasterToAppMaster
-import org.apache.gearpump.cluster.master.MasterStatus
-import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
-import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
-
-/**
- * The test spec checks REST service usage
- */
-class RestServiceSpec extends TestSpecBase {
-
-  "query system version" should {
-    "retrieve the current version number" in {
-      restClient.queryVersion() should not be empty
-    }
-  }
-
-  "list applications" should {
-    "retrieve 0 application after cluster just started" in {
-      restClient.listRunningApps().length shouldEqual 0
-    }
-
-    "retrieve 1 application after the first application submission" in {
-      // exercise
-      val appId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length)
-      success shouldBe true
-      expectAppIsRunning(appId, wordCountName)
-      restClient.listRunningApps().length shouldEqual 1
-    }
-  }
-
-  "submit application (wordcount)" should {
-    "find a running application after submission" in {
-      // exercise
-      val appId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length)
-      success shouldBe true
-      expectAppIsRunning(appId, wordCountName)
-    }
-
-    "reject a repeated submission request while the application is running" in 
{
-      // setup
-      val appId = restClient.getNextAvailableAppId()
-      val formerSubmissionSuccess = restClient.submitApp(wordCountJar,
-        cluster.getWorkerHosts.length)
-      formerSubmissionSuccess shouldBe true
-      expectAppIsRunning(appId, wordCountName)
-
-      // exercise
-      val success = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length)
-      success shouldBe false
-    }
-
-    "reject an invalid submission (the jar file path is incorrect)" in {
-      // exercise
-      val success = restClient.submitApp(wordCountJar + ".missing", 
cluster.getWorkerHosts.length)
-      success shouldBe false
-    }
-
-    "submit a wordcount application with 4 split and 3 sum processors and 
expect " +
-      "parallelism of processors match the given number" in {
-      // setup
-      val splitNum = 4
-      val sumNum = 3
-      val appId = restClient.getNextAvailableAppId()
-
-      // exercise
-      val success = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length,
-        s"-split $splitNum -sum $sumNum")
-      success shouldBe true
-      expectAppIsRunning(appId, wordCountName)
-      val processors = restClient.queryStreamingAppDetail(appId).processors
-      processors.size shouldEqual 2
-      val splitProcessor = processors.get(0).get
-      splitProcessor.parallelism shouldEqual splitNum
-      val sumProcessor = processors.get(1).get
-      sumProcessor.parallelism shouldEqual sumNum
-    }
-
-    "can obtain application metrics and the metrics will keep changing" in {
-      // setup
-      val appId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length)
-      success shouldBe true
-      expectAppIsRunning(appId, wordCountName)
-
-      // exercise
-      expectMetricsAvailable(
-        restClient.queryStreamingAppMetrics(appId, current = 
true).metrics.nonEmpty,
-        "metrics available")
-      val actual = restClient.queryStreamingAppMetrics(appId, current = true)
-      actual.path shouldEqual s"app$appId.processor*"
-      actual.metrics.foreach(metric => {
-        metric.time should be > 0L
-        metric.value should not be null
-      })
-      val formerMetricsDump = actual.metrics.toString()
-
-      expectMetricsAvailable({
-        val laterMetrics = restClient.queryStreamingAppMetrics(appId, current 
= true).metrics
-        laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump
-      }, "metrics available")
-    }
-
-    "can obtain application corresponding executors' metrics and " +
-      "the metrics will keep changing" in {
-      // setup
-      val appId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length)
-      success shouldBe true
-      expectAppIsRunning(appId, wordCountName)
-
-      // exercise
-      expectMetricsAvailable(
-        restClient.queryExecutorMetrics(appId, current = 
true).metrics.nonEmpty,
-        "metrics available")
-      val actual = restClient.queryExecutorMetrics(appId, current = true)
-      actual.path shouldEqual s"app$appId.executor*"
-      actual.metrics.foreach(metric => {
-        metric.time should be > 0L
-        metric.value should not be null
-      })
-      val formerMetricsDump = actual.metrics.toString()
-
-      expectMetricsAvailable({
-        val laterMetrics = restClient.queryExecutorMetrics(appId, current = 
true).metrics
-        laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump
-      }, "metrics available")
-    }
-  }
-
-  "kill application" should {
-    "a running application should be killed" in {
-      // setup
-      val appId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length)
-      success shouldBe true
-      expectAppIsRunning(appId, wordCountName)
-
-      // exercise
-      killAppAndVerify(appId)
-    }
-
-    "should fail when attempting to kill a stopped application" in {
-      // setup
-      val appId = restClient.getNextAvailableAppId()
-      val submissionSucess = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length)
-      submissionSucess shouldBe true
-      expectAppIsRunning(appId, wordCountName)
-      killAppAndVerify(appId)
-
-      // exercise
-      val success = restClient.killApp(appId)
-      success shouldBe false
-    }
-
-    "should fail when attempting to kill a non-exist application" in {
-      // setup
-      val freeAppId = restClient.listApps().length + 1
-
-      // exercise
-      val success = restClient.killApp(freeAppId)
-      success shouldBe false
-    }
-  }
-
-  "cluster information" should {
-    "retrieve 1 master for a non-HA cluster" in {
-      // exercise
-      val masterSummary = restClient.queryMaster()
-      masterSummary.cluster.map(_.toTuple) shouldEqual 
cluster.getMastersAddresses
-      masterSummary.aliveFor should be > 0L
-      masterSummary.masterStatus shouldEqual MasterStatus.Synced
-    }
-
-    "retrieve the same number of workers as cluster has" in {
-      // setup
-      val expectedWorkersCount = cluster.getWorkerHosts.size
-
-      // exercise
-      var runningWorkers: Array[WorkerSummary] = Array.empty
-      Util.retryUntil(() => {
-        runningWorkers = restClient.listRunningWorkers()
-        runningWorkers.length == expectedWorkersCount
-      }, "all workers running")
-      runningWorkers.foreach { worker =>
-        worker.state shouldEqual MasterToAppMaster.AppMasterActive
-      }
-    }
-
-    "find a newly added worker instance" in {
-      // setup
-      restartClusterRequired = true
-      val formerWorkersCount = cluster.getWorkerHosts.length
-      Util.retryUntil(() => restClient.listRunningWorkers().length == 
formerWorkersCount,
-        "all workers running")
-      val workerName = "newWorker"
-
-      // exercise
-      cluster.addWorkerNode(workerName)
-      Util.retryUntil(() => restClient.listRunningWorkers().length > 
formerWorkersCount,
-        "new worker added")
-      cluster.getWorkerHosts.length shouldEqual formerWorkersCount + 1
-      restClient.listRunningWorkers().length shouldEqual formerWorkersCount + 1
-    }
-
-    "retrieve 0 worker, if cluster is started without any workers" in {
-      // setup
-      restartClusterRequired = true
-      cluster.shutDown()
-
-      // exercise
-      cluster.start(workerNum = 0)
-      cluster.getWorkerHosts.length shouldEqual 0
-      restClient.listRunningWorkers().length shouldEqual 0
-    }
-
-    "can obtain master's metrics and the metrics will keep changing" in {
-      // exercise
-      expectMetricsAvailable(
-        restClient.queryMasterMetrics(current = true).metrics.nonEmpty, 
"metrics available")
-      val actual = restClient.queryMasterMetrics(current = true)
-      actual.path shouldEqual s"master"
-      actual.metrics.foreach(metric => {
-        metric.time should be > 0L
-        metric.value should not be null
-      })
-      val formerMetricsDump = actual.metrics.toString()
-
-      expectMetricsAvailable({
-        val laterMetrics = restClient.queryMasterMetrics(current = 
true).metrics
-        laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump
-      }, "metrics available")
-    }
-
-    "can obtain workers' metrics and the metrics will keep changing" in {
-      // exercise
-      restClient.listRunningWorkers().foreach { worker =>
-        val workerId = worker.workerId
-        expectMetricsAvailable(
-          restClient.queryWorkerMetrics(workerId, current = 
true).metrics.nonEmpty,
-          "metrics available")
-        val actual = restClient.queryWorkerMetrics(workerId, current = true)
-        actual.path shouldEqual s"worker${WorkerId.render(workerId)}"
-        actual.metrics.foreach(metric => {
-          metric.time should be > 0L
-          metric.value should not be null
-        })
-        val formerMetricsDump = actual.metrics.toString()
-
-        expectMetricsAvailable({
-          val laterMetrics = restClient.queryWorkerMetrics(workerId, current = 
true).metrics
-          laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump
-        }, "metrics available")
-      }
-    }
-  }
-
-  "configuration" should {
-    "retrieve the configuration of master and match particular values" in {
-      // exercise
-      val actual = restClient.queryMasterConfig()
-      actual.hasPath("gearpump") shouldBe true
-      actual.hasPath("gearpump.cluster") shouldBe true
-      actual.getString("gearpump.hostname") shouldEqual 
cluster.getMasterHosts.mkString(",")
-    }
-
-    "retrieve the configuration of worker X and match particular values" in {
-      // exercise
-      restClient.listRunningWorkers().foreach { worker =>
-        val actual = restClient.queryWorkerConfig(worker.workerId)
-        actual.hasPath("gearpump") shouldBe true
-        actual.hasPath("gearpump.worker") shouldBe true
-      }
-    }
-
-    "retrieve the configuration of executor X and match particular values" in {
-      // setup
-      val appId = restClient.getNextAvailableAppId()
-
-      // exercise
-      val success = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length)
-      success shouldBe true
-      restClient.queryExecutorBrief(appId).foreach { executor =>
-        val executorId = executor.executorId
-        val actual = restClient.queryExecutorConfig(appId, executorId)
-        actual.hasPath("gearpump") shouldBe true
-        actual.hasPath("gearpump.executor") shouldBe true
-        actual.getInt("gearpump.applicationId") shouldEqual appId
-        actual.getInt("gearpump.executorId") shouldEqual executorId
-      }
-    }
-
-    "retrieve the configuration of application X and match particular values" 
in {
-      // setup
-      val appId = restClient.getNextAvailableAppId()
-
-      // exercise
-      val success = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length)
-      success shouldBe true
-      val actual = restClient.queryAppMasterConfig(appId)
-      actual.hasPath("gearpump") shouldBe true
-      actual.hasPath("gearpump.appmaster") shouldBe true
-    }
-  }
-
-  "application life-cycle" should {
-    "newly started application should be configured same as the previous one, 
after restart" in {
-      // setup
-      val originSplitNum = 4
-      val originSumNum = 3
-      val originAppId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length,
-        s"-split $originSplitNum -sum $originSumNum")
-      success shouldBe true
-      expectAppIsRunning(originAppId, wordCountName)
-      val originAppDetail = restClient.queryStreamingAppDetail(originAppId)
-
-      // exercise
-      Util.retryUntil(() => restClient.restartApp(originAppId), "app 
restarted")
-      val killedApp = restClient.queryApp(originAppId)
-      killedApp.appId shouldEqual originAppId
-      killedApp.status shouldEqual MasterToAppMaster.AppMasterInActive
-      val runningApps = restClient.listRunningApps()
-      runningApps.length shouldEqual 1
-      val newAppDetail = 
restClient.queryStreamingAppDetail(runningApps.head.appId)
-      newAppDetail.appName shouldEqual originAppDetail.appName
-      newAppDetail.processors.size shouldEqual originAppDetail.processors.size
-      newAppDetail.processors.get(0).get.parallelism shouldEqual originSplitNum
-      newAppDetail.processors.get(1).get.parallelism shouldEqual originSumNum
-    }
-  }
-
-  private def killAppAndVerify(appId: Int): Unit = {
-    val success = restClient.killApp(appId)
-    success shouldBe true
-
-    val actualApp = restClient.queryApp(appId)
-    actualApp.appId shouldEqual appId
-    actualApp.status shouldEqual MasterToAppMaster.AppMasterInActive
-  }
-
-  private def expectMetricsAvailable(condition: => Boolean, 
conditionDescription: String): Unit = {
-    val config = restClient.queryMasterConfig()
-    val reportInterval = 
Duration(config.getString("gearpump.metrics.report-interval-ms") + "ms")
-    Util.retryUntil(() => condition, conditionDescription, interval = 
reportInterval)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala
 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala
deleted file mode 100644
index 4b15055..0000000
--- 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.checklist
-
-import scala.concurrent.duration.Duration
-
-import org.apache.gearpump.cluster.MasterToAppMaster
-import org.apache.gearpump.cluster.worker.WorkerId
-import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
-import org.apache.gearpump.util.{Constants, LogUtil}
-
-/**
- * The test spec will perform destructive operations to check the stability. 
Operations
- * contains shutting-down appmaster, executor, or worker, and etc..
- */
-class StabilitySpec extends TestSpecBase {
-
-  private val LOG = LogUtil.getLogger(getClass)
-
-  "kill appmaster" should {
-    "restart the whole application" in {
-      // setup
-      val appId = commandLineClient.submitApp(wordCountJar)
-      val formerAppMaster = restClient.queryApp(appId).appMasterPath
-      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 
0, "app running")
-      ensureClockStoredInMaster()
-
-      // exercise
-      restClient.killAppMaster(appId) shouldBe true
-      // todo: how long master will begin to recover and how much time for the 
recovering?
-      Util.retryUntil(() => restClient.queryApp(appId).appMasterPath != 
formerAppMaster,
-        "appmaster killed and restarted")
-
-      // verify
-      val laterAppMaster = restClient.queryStreamingAppDetail(appId)
-      laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive
-      laterAppMaster.clock should be > 0L
-    }
-  }
-
-  "kill executor" should {
-    "will create a new executor and application will replay from the latest 
application clock" in {
-      // setup
-      val appId = commandLineClient.submitApp(wordCountJar)
-      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 
0, "app running")
-      val executorToKill = 
restClient.queryExecutorBrief(appId).map(_.executorId).max
-      ensureClockStoredInMaster()
-
-      // exercise
-      restClient.killExecutor(appId, executorToKill) shouldBe true
-      // todo: how long appmaster will begin to recover and how much time for 
the recovering?
-      Util.retryUntil(() => restClient.queryExecutorBrief(appId)
-        .map(_.executorId).max > executorToKill,
-        s"executor $executorToKill killed and restarted")
-
-      // verify
-      val laterAppMaster = restClient.queryStreamingAppDetail(appId)
-      laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive
-      laterAppMaster.clock should be > 0L
-    }
-  }
-
-  private def hostName(workerId: WorkerId): String = {
-    val worker = restClient.listRunningWorkers().find(_.workerId == workerId)
-    // Parse hostname from JVM info (in format: PID@hostname)
-    val hostname = worker.get.jvmName.split("@")(1)
-    hostname
-  }
-
-  "kill worker" should {
-    "worker will not recover but all its executors will be migrated to other 
workers" in {
-      // setup
-      restartClusterRequired = true
-      val appId = commandLineClient.submitApp(wordCountJar)
-      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 
0, "app running")
-
-      val allexecutors = restClient.queryExecutorBrief(appId)
-      val maxExecutor = allexecutors.sortBy(_.executorId).last
-      ensureClockStoredInMaster()
-
-      val appMaster = allexecutors.find(_.executorId == 
Constants.APPMASTER_DEFAULT_EXECUTOR_ID)
-
-      LOG.info(s"Max executor Id is executor: ${maxExecutor.executorId}, " +
-        s"worker: ${maxExecutor.workerId}")
-      val executorsSharingSameWorker = allexecutors
-        .filter(_.workerId == maxExecutor.workerId).map(_.executorId)
-      LOG.info(s"These executors sharing the same worker Id 
${maxExecutor.workerId}," +
-        s" ${executorsSharingSameWorker.mkString(",")}")
-
-      // kill the worker and expect restarting all killed executors on other 
workers.
-      val workerIdToKill = maxExecutor.workerId
-      cluster.removeWorkerNode(hostName(workerIdToKill))
-
-      val appMasterKilled = executorsSharingSameWorker
-        .exists(_ == Constants.APPMASTER_DEFAULT_EXECUTOR_ID)
-
-      def executorsMigrated(): Boolean = {
-        val executors = restClient.queryExecutorBrief(appId)
-        val newAppMaster = executors.find(_.executorId == 
Constants.APPMASTER_DEFAULT_EXECUTOR_ID)
-
-        if (appMasterKilled) {
-          newAppMaster.get.workerId != appMaster.get.workerId
-        } else {
-          // New executors will be started to replace killed executors.
-          // The new executors will be assigned larger executor Id. We use 
this trick to detect
-          // Whether new executors have been started successfully.
-          executors.map(_.executorId).max > maxExecutor.executorId
-        }
-      }
-
-      Util.retryUntil(() => {
-        executorsMigrated()
-      }, s"new executor created with id > ${maxExecutor.executorId} when 
worker is killed")
-
-      // verify
-      val laterAppMaster = restClient.queryStreamingAppDetail(appId)
-      laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive
-      laterAppMaster.clock should be > 0L
-    }
-  }
-
-  "kill master" should {
-    "master will be down and all workers will attempt to reconnect and suicide 
after X seconds" in {
-      // setup
-      restartClusterRequired = true
-      val masters = cluster.getMasterHosts
-      val config = restClient.queryMasterConfig()
-      val shutDownTimeout = 
Duration(config.getString("akka.cluster.auto-down-unreachable-after"))
-
-      // exercise
-      masters.foreach(cluster.removeMasterNode)
-      info(s"will sleep ${shutDownTimeout.toSeconds}s and then check workers 
are down")
-      Thread.sleep(shutDownTimeout.toMillis)
-
-      // verify
-      val aliveWorkers = cluster.getWorkerHosts
-      Util.retryUntil(() => aliveWorkers.forall(worker => 
!cluster.nodeIsOnline(worker)),
-        "all workers down")
-    }
-  }
-
-  private def ensureClockStoredInMaster(): Unit = {
-    // TODO: 5000ms is a fixed sync period in clock service.
-    // we wait for 5000ms to assume the clock is stored
-    Thread.sleep(5000)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
deleted file mode 100644
index b327bf4..0000000
--- 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.checklist
-
-import org.apache.gearpump.integrationtest.kafka.{KafkaCluster, 
MessageLossDetector, SimpleKafkaReader}
-import org.apache.gearpump.integrationtest.storm.StormClient
-import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
-
-/**
- * The test spec checks the compatibility of running Storm applications
- */
-class StormCompatibilitySpec extends TestSpecBase {
-
-  private lazy val stormClient = {
-    new StormClient(cluster, restClient)
-  }
-
-  val `version0.9` = "09"
-  val `version0.10` = "010"
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    stormClient.start()
-  }
-
-  override def afterAll(): Unit = {
-    stormClient.shutDown()
-    super.afterAll()
-  }
-
-  def withStorm(testCode: String => Unit): Unit = {
-    testCode(`version0.9`)
-    testCode(`version0.10`)
-  }
-
-  def getTopologyName(name: String, stormVersion: String): String = {
-    s"${name}_$stormVersion"
-  }
-
-  def getStormJar(stormVersion: String): String = {
-    cluster.queryBuiltInITJars(s"storm$stormVersion-").head
-  }
-
-  "Storm over Gearpump" should withStorm {
-    stormVersion =>
-      s"support basic topologies ($stormVersion)" in {
-        val stormJar = getStormJar(stormVersion)
-        val topologyName = getTopologyName("exclamation", stormVersion)
-
-        // exercise
-        val appId = stormClient.submitStormApp(
-          jar = stormJar,
-          mainClass = "storm.starter.ExclamationTopology",
-          args = topologyName,
-          appName = topologyName)
-
-        // verify
-        Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock 
> 0, "app running")
-      }
-
-      s"support to run a python version of wordcount ($stormVersion)" in {
-        val stormJar = getStormJar(stormVersion)
-        val topologyName = getTopologyName("wordcount", stormVersion)
-
-        // exercise
-        val appId = stormClient.submitStormApp(
-          jar = stormJar,
-          mainClass = "storm.starter.WordCountTopology",
-          args = topologyName,
-          appName = topologyName)
-
-        // verify
-        Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock 
> 0, "app running")
-      }
-
-      s"support DRPC ($stormVersion)" in {
-        // ReachTopology computes the Twitter url reached by users and their 
followers
-        // using Storm Distributed RPC feature
-        // input (user and follower) data are already prepared in memory
-        val stormJar = getStormJar(stormVersion)
-        val topologyName = getTopologyName("reach", stormVersion)
-        stormClient.submitStormApp(
-          jar = stormJar,
-          mainClass = "storm.starter.ReachTopology",
-          args = topologyName,
-          appName = topologyName)
-        val drpcClient = stormClient.getDRPCClient(cluster.getNetworkGateway)
-
-        // verify
-        Util.retryUntil(() => {
-          drpcClient.execute("reach", "notaurl.com") == "0"
-        }, "drpc reach == 0")
-        drpcClient.execute("reach", "foo.com/blog/1") shouldEqual "16"
-        drpcClient.execute("reach", "engineering.twitter.com/blog/5") 
shouldEqual "14"
-      }
-
-      s"support tick tuple ($stormVersion)" in {
-        val stormJar = getStormJar(stormVersion)
-        val topologyName = getTopologyName("slidingWindowCounts", stormVersion)
-
-        // exercise
-        val appId = stormClient.submitStormApp(
-          jar = stormJar,
-          mainClass = "storm.starter.RollingTopWords",
-          args = s"$topologyName remote",
-          appName = topologyName)
-
-        // verify
-        Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock 
> 0, "app running")
-      }
-
-      s"support at-least-once semantics with Storm's Kafka connector 
($stormVersion)" in {
-
-        val stormJar = getStormJar(stormVersion)
-        val topologyName = getTopologyName("storm_kafka", stormVersion)
-        val stormKafkaTopology =
-          
s"org.apache.gearpump.integrationtest.storm.Storm${stormVersion}KafkaTopology"
-
-        import org.apache.gearpump.integrationtest.kafka.KafkaCluster._
-        withKafkaCluster(cluster) {
-          kafkaCluster =>
-            val sourcePartitionNum = 2
-            val sinkPartitionNum = 1
-            val zookeeper = kafkaCluster.getZookeeperConnectString
-            val brokerList = kafkaCluster.getBrokerListConnectString
-            val sourceTopic = "topic1"
-            val sinkTopic = "topic2"
-
-            val args = Array("-topologyName", topologyName, "-sourceTopic", 
sourceTopic,
-              "-sinkTopic", sinkTopic, "-zookeeperConnect", zookeeper, 
"-brokerList", brokerList,
-              "-spoutNum", s"$sourcePartitionNum", "-boltNum", 
s"$sinkPartitionNum"
-            )
-
-            kafkaCluster.createTopic(sourceTopic, sourcePartitionNum)
-
-            // generate number sequence (1, 2, 3, ...) to the topic
-            withDataProducer(sourceTopic, brokerList) { producer =>
-
-              val appId = stormClient.submitStormApp(
-                jar = stormJar,
-                mainClass = stormKafkaTopology,
-                args = args.mkString(" "),
-                appName = topologyName)
-
-              Util.retryUntil(() =>
-                restClient.queryStreamingAppDetail(appId).clock > 0, "app 
running")
-
-              // kill executor and verify at-least-once is guaranteed on 
application restart
-              val executorToKill = 
restClient.queryExecutorBrief(appId).map(_.executorId).max
-              restClient.killExecutor(appId, executorToKill) shouldBe true
-              Util.retryUntil(() =>
-                restClient.queryExecutorBrief(appId).map(_.executorId).max > 
executorToKill,
-                s"executor $executorToKill killed")
-
-              // verify no message loss
-              val detector = new
-                  MessageLossDetector(producer.lastWriteNum)
-              val kafkaReader = new
-                  SimpleKafkaReader(detector, sinkTopic, host = 
kafkaCluster.advertisedHost,
-                    port = kafkaCluster.advertisedPort)
-
-              Util.retryUntil(() => {
-                kafkaReader.read()
-                detector.allReceived
-              }, "all kafka message read")
-            }
-        }
-      }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala
 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala
deleted file mode 100644
index 7942308..0000000
--- 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.suites
-
-import org.scalatest._
-
-import org.apache.gearpump.integrationtest.MiniClusterProvider
-import org.apache.gearpump.integrationtest.checklist._
-import org.apache.gearpump.integrationtest.minicluster.MiniCluster
-
-/**
- * Launch a Gearpump cluster in standalone mode and run all test specs. To 
test a specific
- * test spec, you need to comment out other lines.
- */
-class StandaloneModeSuite extends Suites(
-  new CommandLineSpec,
-  new RestServiceSpec,
-  new ExampleSpec,
-  new DynamicDagSpec,
-  new StormCompatibilitySpec,
-  new StabilitySpec,
-  new ConnectorKafkaSpec,
-  new MessageDeliverySpec
-) with BeforeAndAfterAll {
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    MiniClusterProvider.managed = true
-    MiniClusterProvider.set(new MiniCluster).start()
-  }
-
-  override def afterAll(): Unit = {
-    MiniClusterProvider.get.shutDown()
-    super.afterAll()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/MiniClusterProvider.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/MiniClusterProvider.scala
 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/MiniClusterProvider.scala
new file mode 100644
index 0000000..4a161c7
--- /dev/null
+++ 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/MiniClusterProvider.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest
+
+import org.apache.gearpump.integrationtest.minicluster.MiniCluster
+
+/**
+ * Provides a min cluster of Gearpump, which contains one or more masters, and 
workers.
+ */
+object MiniClusterProvider {
+
+  private var instance = new MiniCluster
+
+  def get: MiniCluster = instance
+
+  def set(instance: MiniCluster): MiniCluster = {
+    this.instance = instance
+    instance
+  }
+
+  /**
+   * Indicates whether test suite should create particular cluster. In case of 
false, every
+   * test spec will be responsible for cluster creation.
+   */
+  var managed = false
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala
 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala
new file mode 100644
index 0000000..dabcc71
--- /dev/null
+++ 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest
+
+import org.scalatest._
+
+import org.apache.gearpump.cluster.MasterToAppMaster
+import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterData
+import org.apache.gearpump.util.LogUtil
+
+/**
+ * The abstract test spec
+ */
+trait TestSpecBase
+  extends WordSpec with Matchers with BeforeAndAfterEachTestData with 
BeforeAndAfterAll {
+
+  private def LOGGER = LogUtil.getLogger(getClass)
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    if (!MiniClusterProvider.managed) {
+      LOGGER.info("Will test with a default standalone mini cluster")
+      MiniClusterProvider.get.start()
+    }
+  }
+
+  override def afterAll(): Unit = {
+    if (!MiniClusterProvider.managed) {
+      LOGGER.info("Will shutdown the default mini cluster")
+      MiniClusterProvider.get.shutDown()
+    }
+    super.afterAll()
+  }
+
+  lazy val cluster = MiniClusterProvider.get
+  lazy val commandLineClient = cluster.commandLineClient
+  lazy val restClient = cluster.restClient
+
+  lazy val wordCountJar = cluster.queryBuiltInExampleJars("wordcount-").head
+  lazy val wordCountName = "wordCount"
+
+  var restartClusterRequired: Boolean = false
+
+  override def beforeEach(td: TestData): Unit = {
+
+    LOGGER.debug(s">### 
=============================================================")
+    LOGGER.debug(s">###1 Prepare test: ${td.name}\n")
+
+    assert(cluster != null, "Configure MiniCluster properly in suite spec")
+    cluster.isAlive shouldBe true
+    restClient.listRunningApps().isEmpty shouldBe true
+    LOGGER.debug(s">### 
=============================================================")
+    LOGGER.debug(s">###2 Start test: ${td.name}\n")
+  }
+
+  override def afterEach(td: TestData): Unit = {
+    LOGGER.debug(s"<### 
=============================================================")
+    LOGGER.debug(s"<###3 End test: ${td.name}\n")
+
+    if (restartClusterRequired || !cluster.isAlive) {
+      restartClusterRequired = false
+      LOGGER.info("Will restart the cluster for next test case")
+      cluster.restart()
+    } else {
+      restClient.listRunningApps().foreach(app => {
+        commandLineClient.killApp(app.appId) shouldBe true
+      })
+    }
+  }
+
+  def expectAppIsRunning(appId: Int, expectedAppName: String): AppMasterData = 
{
+    val app = restClient.queryApp(appId)
+    app.status shouldEqual MasterToAppMaster.AppMasterActive
+    app.appName shouldEqual expectedAppName
+    app
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala
 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala
new file mode 100644
index 0000000..eabc684
--- /dev/null
+++ 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.checklist
+
+import org.apache.gearpump.cluster.MasterToAppMaster
+import org.apache.gearpump.integrationtest.TestSpecBase
+
+/**
+ * The test spec checks the command-line usage
+ */
+class CommandLineSpec extends TestSpecBase {
+
+  "use `gear info` to list applications" should {
+    "retrieve 0 application after cluster just started" in {
+      // exercise
+      getRunningAppCount shouldEqual 0
+    }
+
+    "retrieve 1 application after the first application submission" in {
+      // setup
+      val appId = expectSubmitAppSuccess(wordCountJar)
+      expectAppIsRunningByParsingOutput(appId, wordCountName)
+
+      // exercise
+      getRunningAppCount shouldEqual 1
+    }
+  }
+
+  "use `gear app` to submit application" should {
+    "find a running application after submission" in {
+      // exercise
+      val appId = expectSubmitAppSuccess(wordCountJar)
+      expectAppIsRunningByParsingOutput(appId, wordCountName)
+    }
+
+    "reject a repeated submission request while the application is running" in 
{
+      // setup
+      val appId = expectSubmitAppSuccess(wordCountJar)
+      expectAppIsRunningByParsingOutput(appId, wordCountName)
+
+      // exercise
+      val actualAppId = commandLineClient.submitApp(wordCountJar)
+      actualAppId shouldEqual -1
+    }
+
+    "reject an invalid submission (the jar file path is incorrect)" in {
+      // exercise
+      val actualAppId = commandLineClient.submitApp(wordCountJar + ".missing")
+      actualAppId shouldEqual -1
+    }
+  }
+
+  "use `gear kill` to kill application" should {
+    "a running application should be killed" in {
+      // setup
+      val appId = expectSubmitAppSuccess(wordCountJar)
+
+      // exercise
+      val success = commandLineClient.killApp(appId)
+      success shouldBe true
+    }
+
+    "should fail when attempting to kill a stopped application" in {
+      // setup
+      val appId = expectSubmitAppSuccess(wordCountJar)
+      var success = commandLineClient.killApp(appId)
+      success shouldBe true
+
+      // exercise
+      success = commandLineClient.killApp(appId)
+      success shouldBe false
+    }
+
+    "the EmbededCluster can be used as embedded cluster in process" in {
+      // setup
+      val args = "-debug true -sleep 10"
+      val appId = expectSubmitAppSuccess(wordCountJar, args)
+      var success = commandLineClient.killApp(appId)
+      success shouldBe true
+    }
+
+    "should fail when attempting to kill a non-exist application" in {
+      // setup
+      val freeAppId = getNextAvailableAppId
+
+      // exercise
+      val success = commandLineClient.killApp(freeAppId)
+      success shouldBe false
+    }
+  }
+
+  "use `gear replay` to replay the application from current min clock" should {
+    "todo: description" in {
+      // todo: test code
+    }
+  }
+
+  private def getRunningAppCount: Int = {
+    commandLineClient.listRunningApps().length
+  }
+
+  private def getNextAvailableAppId: Int = {
+    commandLineClient.listApps().length + 1
+  }
+
+  private def expectSubmitAppSuccess(jar: String, args: String = ""): Int = {
+    val appId = commandLineClient.submitApp(jar)
+    appId should not equal -1
+    appId
+  }
+
+  private def expectAppIsRunningByParsingOutput(appId: Int, expectedName: 
String): Unit = {
+    val actual = commandLineClient.queryApp(appId)
+    actual should include(s"application: $appId, ")
+    actual should include(s"name: $expectedName, ")
+    actual should include(s"status: ${MasterToAppMaster.AppMasterActive}")
+  }
+}

Reply via email to