Repository: incubator-gearpump
Updated Branches:
  refs/heads/master ae3204930 -> 97bf112ab


fix GEARPUMP-155, integration test failure

Changes include:

1. fix integration test
2. `DistShellAppMaster` and `DistServiceAppMaster` send an
`ActivateAppMaster(appId)` message to the master once all the executor systems
have been launched
3. fix warning

Author: manuzhang <[email protected]>

Closes #36 from manuzhang/GERRPUMP-155.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/97bf112a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/97bf112a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/97bf112a

Branch: refs/heads/master
Commit: 97bf112abbfa5efb1511b682accd05ca6dc7487e
Parents: ae32049
Author: manuzhang <[email protected]>
Authored: Tue Jun 7 15:21:46 2016 +0800
Committer: manuzhang <[email protected]>
Committed: Tue Jun 7 15:21:46 2016 +0800

----------------------------------------------------------------------
 .../scala/org/apache/gearpump/util/ActorUtil.scala    | 11 ++++++++++-
 .../distributedshell/DistShellAppMaster.scala         | 11 +++++++++--
 .../distributeservice/DistServiceAppMaster.scala      |  7 +++++++
 .../integrationtest/checklist/CommandLineSpec.scala   | 14 ++++++++------
 .../integrationtest/checklist/ExampleSpec.scala       |  2 +-
 .../gearpump/streaming/appmaster/AppMaster.scala      |  6 +++---
 .../streaming/appmaster/TaskManagerSpec.scala         |  2 +-
 7 files changed, 39 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/97bf112a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala 
b/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala
index b10163f..749cdf2 100644
--- a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala
+++ b/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala
@@ -18,6 +18,7 @@
 
 package org.apache.gearpump.util
 
+import org.apache.gearpump.cluster.AppMasterContext
 import org.apache.gearpump.cluster.worker.WorkerId
 
 import scala.concurrent.{ExecutionContext, Future}
@@ -27,7 +28,7 @@ import akka.actor._
 import akka.pattern.ask
 import org.slf4j.Logger
 
-import org.apache.gearpump.cluster.AppMasterToMaster.GetAllWorkers
+import org.apache.gearpump.cluster.AppMasterToMaster.{ActivateAppMaster, 
GetAllWorkers}
 import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, 
ResolveWorkerId}
 import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList
 import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, 
ResolveWorkerIdResult}
@@ -93,10 +94,18 @@ object ActorUtil {
         workerId => ResourceRequest(Resource(1), workerId, relaxation = 
Relaxation.SPECIFICWORKER)
       }.toArray
 
+      sender ! list
       master.tell(StartExecutorSystems(resources, executorJvmConfig), sender)
     }
   }
 
+  def tellMasterIfApplicationReady(workerNum: Option[Int], executorSystemNum: 
Int,
+      appContext: AppMasterContext): Unit = {
+    if (workerNum.contains(executorSystemNum)) {
+      appContext.masterProxy ! ActivateAppMaster(appContext.appId)
+    }
+  }
+
   def askAppMaster[T](master: ActorRef, appId: Int, msg: Any)(implicit ex: 
ExecutionContext)
     : Future[T] = {
     implicit val timeout = Constants.FUTURE_TIMEOUT

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/97bf112a/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMaster.scala
----------------------------------------------------------------------
diff --git 
a/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMaster.scala
 
b/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMaster.scala
index cd0b18b..4559b9b 100644
--- 
a/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMaster.scala
+++ 
b/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMaster.scala
@@ -17,6 +17,8 @@
  */
 package org.apache.gearpump.examples.distributedshell
 
+import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList
+
 import scala.concurrent.Future
 
 import akka.actor.{Deploy, Props}
@@ -27,7 +29,7 @@ import org.slf4j.Logger
 
 import org.apache.gearpump.cluster.ClientToMaster.ShutdownApplication
 import 
org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig,
 ExecutorSystemStarted, StartExecutorSystemTimeout}
-import org.apache.gearpump.cluster.{AppDescription, AppMasterContext, 
ApplicationMaster, ExecutorContext}
+import org.apache.gearpump.cluster._
 import org.apache.gearpump.examples.distributedshell.DistShellAppMaster._
 import org.apache.gearpump.util.{ActorUtil, Constants, LogUtil, Util}
 
@@ -39,6 +41,7 @@ class DistShellAppMaster(appContext: AppMasterContext, app: 
AppDescription)
   implicit val timeout = Constants.FUTURE_TIMEOUT
   private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
   protected var currentExecutorId = 0
+  private var workerNum: Option[Int] = None
 
   override def preStart(): Unit = {
     LOG.info(s"Distributed Shell AppMaster started")
@@ -55,6 +58,10 @@ class DistShellAppMaster(appContext: AppMasterContext, app: 
AppDescription)
         .withDeploy(Deploy(scope = RemoteScope(address))), 
currentExecutorId.toString)
       executorSystem.bindLifeCycleWith(executor)
       currentExecutorId += 1
+      ActorUtil.tellMasterIfApplicationReady(workerNum, currentExecutorId, 
appContext)
+    case WorkerList(workers) =>
+      workerNum = Some(workers.length)
+      ActorUtil.tellMasterIfApplicationReady(workerNum, currentExecutorId, 
appContext)
     case StartExecutorSystemTimeout =>
       LOG.error(s"Failed to allocate resource in time")
       masterProxy ! ShutdownApplication(appId)
@@ -90,6 +97,6 @@ object DistShellAppMaster {
       this
     }
 
-    override def toString(): String = result.toString()
+    override def toString: String = result.toString()
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/97bf112a/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMaster.scala
----------------------------------------------------------------------
diff --git 
a/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMaster.scala
 
b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMaster.scala
index ca0ab49..0bdbe77 100644
--- 
a/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMaster.scala
+++ 
b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMaster.scala
@@ -18,6 +18,8 @@
 package org.apache.gearpump.experiments.distributeservice
 
 import java.io.File
+import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList
+
 import scala.concurrent.Future
 
 import akka.actor.{Deploy, Props}
@@ -39,6 +41,7 @@ class DistServiceAppMaster(appContext: AppMasterContext, app: 
AppDescription)
   implicit val timeout = Constants.FUTURE_TIMEOUT
   private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
   private var currentExecutorId = 0
+  private var workerNum: Option[Int] = None
   private var fileServerPort = -1
 
   val rootDirectory = new File("/")
@@ -63,6 +66,10 @@ class DistServiceAppMaster(appContext: AppMasterContext, 
app: AppDescription)
         Deploy(scope = RemoteScope(address))), currentExecutorId.toString)
       executorSystem.bindLifeCycleWith(executor)
       currentExecutorId += 1
+      ActorUtil.tellMasterIfApplicationReady(workerNum, currentExecutorId, 
appContext)
+    case WorkerList(workers) =>
+      workerNum = Some(workers.length)
+      ActorUtil.tellMasterIfApplicationReady(workerNum, currentExecutorId, 
appContext)
     case StartExecutorSystemTimeout =>
       LOG.error(s"Failed to allocate resource in time")
       masterProxy ! ShutdownApplication(appId)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/97bf112a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala
 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala
index eabc684..f4e463e 100644
--- 
a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala
+++ 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala
@@ -18,7 +18,7 @@
 package org.apache.gearpump.integrationtest.checklist
 
 import org.apache.gearpump.cluster.MasterToAppMaster
-import org.apache.gearpump.integrationtest.TestSpecBase
+import org.apache.gearpump.integrationtest.{Util, TestSpecBase}
 
 /**
  * The test spec checks the command-line usage
@@ -86,7 +86,7 @@ class CommandLineSpec extends TestSpecBase {
       success shouldBe false
     }
 
-    "the EmbededCluster can be used as embedded cluster in process" in {
+    "the EmbeddedCluster can be used as embedded cluster in process" in {
       // setup
       val args = "-debug true -sleep 10"
       val appId = expectSubmitAppSuccess(wordCountJar, args)
@@ -125,9 +125,11 @@ class CommandLineSpec extends TestSpecBase {
   }
 
   private def expectAppIsRunningByParsingOutput(appId: Int, expectedName: 
String): Unit = {
-    val actual = commandLineClient.queryApp(appId)
-    actual should include(s"application: $appId, ")
-    actual should include(s"name: $expectedName, ")
-    actual should include(s"status: ${MasterToAppMaster.AppMasterActive}")
+    Util.retryUntil(() => {
+      val actual = commandLineClient.queryApp(appId)
+      actual.contains(s"application: $appId, ") &&
+        actual.contains(s"name: $expectedName, ") &&
+        actual.contains(s"status: ${MasterToAppMaster.AppMasterActive}")
+    }, "application is running")
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/97bf112a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ExampleSpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ExampleSpec.scala
 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ExampleSpec.scala
index 27e4665..5554a26 100644
--- 
a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ExampleSpec.scala
+++ 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ExampleSpec.scala
@@ -45,7 +45,7 @@ class ExampleSpec extends TestSpecBase {
         "-command", "hostname"
       )
 
-      val expectedHostNames = cluster.getWorkerHosts.map(Docker.getHostName(_))
+      val expectedHostNames = cluster.getWorkerHosts.map(Docker.getHostName)
 
       def verify(): Boolean = {
         val workerNum = cluster.getWorkerHosts.length

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/97bf112a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
index 24c6da8..d0cfc80 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
@@ -36,7 +36,7 @@ import org.apache.gearpump.streaming._
 import org.apache.gearpump.streaming.appmaster.AppMaster._
 import org.apache.gearpump.streaming.appmaster.DagManager.{GetLatestDAG, 
LatestDAG, ReplaceProcessor}
 import org.apache.gearpump.streaming.appmaster.ExecutorManager.{ExecutorInfo, 
GetExecutorInfo}
-import org.apache.gearpump.streaming.appmaster.TaskManager.{ApplicationReady, 
FailedToRecover, GetTaskList, TaskList}
+import org.apache.gearpump.streaming.appmaster.TaskManager.{ApplicationReady, 
GetTaskList, TaskList, FailedToRecover}
 import org.apache.gearpump.streaming.executor.Executor.{ExecutorConfig, 
ExecutorSummary, GetExecutorSummary, QueryExecutorConfig}
 import org.apache.gearpump.streaming.storage.InMemoryAppStoreOnMaster
 import org.apache.gearpump.streaming.task._
@@ -286,8 +286,8 @@ class AppMaster(appContext: AppMasterContext, app: 
AppDescription) extends Appli
   def ready: Receive = {
     case ApplicationReady =>
       masterProxy ! ActivateAppMaster(appId)
-    case AppMasterActivated(appId) =>
-      LOG.info(s"AppMaster for app$appId is activated")
+    case AppMasterActivated(id) =>
+      LOG.info(s"AppMaster for app$id is activated")
   }
 
   /** Error handling */

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/97bf112a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
index 89f9c1d..dac5a5c 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
@@ -116,7 +116,7 @@ class TaskManagerSpec extends FlatSpec with Matchers with 
BeforeAndAfterEach {
     executorManager.expectMsg(BroadCast(RestartTasks(dagVersion)))
   }
 
-  import org.apache.gearpump.streaming.appmaster.TaskManager.TaskChangeRegistry
+  import TaskManager.TaskChangeRegistry
   "TaskChangeRegistry" should "track all modified task registration" in {
     val tasks = List(TaskId(0, 0), TaskId(0, 1))
     val registry = new TaskChangeRegistry(tasks)

Reply via email to