Repository: incubator-gearpump Updated Branches: refs/heads/master ae3204930 -> 97bf112ab
fix GEARPUMP-155, integration test failure Changes include: 1. fix integration test 2. `DistShellAppMaster` and `DistServiceAppMaster` send `ActivateAppMaster(appId)` message to master when all the executor systems have been launched 3. fix warning Author: manuzhang <[email protected]> Closes #36 from manuzhang/GERRPUMP-155. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/97bf112a Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/97bf112a Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/97bf112a Branch: refs/heads/master Commit: 97bf112abbfa5efb1511b682accd05ca6dc7487e Parents: ae32049 Author: manuzhang <[email protected]> Authored: Tue Jun 7 15:21:46 2016 +0800 Committer: manuzhang <[email protected]> Committed: Tue Jun 7 15:21:46 2016 +0800 ---------------------------------------------------------------------- .../scala/org/apache/gearpump/util/ActorUtil.scala | 11 ++++++++++- .../distributedshell/DistShellAppMaster.scala | 11 +++++++++-- .../distributeservice/DistServiceAppMaster.scala | 7 +++++++ .../integrationtest/checklist/CommandLineSpec.scala | 14 ++++++++------ .../integrationtest/checklist/ExampleSpec.scala | 2 +- .../gearpump/streaming/appmaster/AppMaster.scala | 6 +++--- .../streaming/appmaster/TaskManagerSpec.scala | 2 +- 7 files changed, 39 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/97bf112a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala b/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala index b10163f..749cdf2 100644 --- a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala +++ b/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala @@ -18,6 +18,7 @@ package org.apache.gearpump.util +import org.apache.gearpump.cluster.AppMasterContext import org.apache.gearpump.cluster.worker.WorkerId import scala.concurrent.{ExecutionContext, Future} @@ -27,7 +28,7 @@ import akka.actor._ import akka.pattern.ask import org.slf4j.Logger -import org.apache.gearpump.cluster.AppMasterToMaster.GetAllWorkers +import org.apache.gearpump.cluster.AppMasterToMaster.{ActivateAppMaster, GetAllWorkers} import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ResolveWorkerId} import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ResolveWorkerIdResult} @@ -93,10 +94,18 @@ object ActorUtil { workerId => ResourceRequest(Resource(1), workerId, relaxation = Relaxation.SPECIFICWORKER) }.toArray + sender ! list master.tell(StartExecutorSystems(resources, executorJvmConfig), sender) } } + def tellMasterIfApplicationReady(workerNum: Option[Int], executorSystemNum: Int, + appContext: AppMasterContext): Unit = { + if (workerNum.contains(executorSystemNum)) { + appContext.masterProxy ! ActivateAppMaster(appContext.appId) + } + } + def askAppMaster[T](master: ActorRef, appId: Int, msg: Any)(implicit ex: ExecutionContext) : Future[T] = { implicit val timeout = Constants.FUTURE_TIMEOUT http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/97bf112a/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMaster.scala ---------------------------------------------------------------------- diff --git a/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMaster.scala b/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMaster.scala index cd0b18b..4559b9b 100644 --- a/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMaster.scala +++ b/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMaster.scala @@ -17,6 +17,8 @@ */ package org.apache.gearpump.examples.distributedshell +import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList + import scala.concurrent.Future import akka.actor.{Deploy, Props} @@ -27,7 +29,7 @@ import org.slf4j.Logger import org.apache.gearpump.cluster.ClientToMaster.ShutdownApplication import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, ExecutorSystemStarted, StartExecutorSystemTimeout} -import org.apache.gearpump.cluster.{AppDescription, AppMasterContext, ApplicationMaster, ExecutorContext} +import org.apache.gearpump.cluster._ import org.apache.gearpump.examples.distributedshell.DistShellAppMaster._ import org.apache.gearpump.util.{ActorUtil, Constants, LogUtil, Util} @@ -39,6 +41,7 @@ class DistShellAppMaster(appContext: AppMasterContext, app: AppDescription) implicit val timeout = Constants.FUTURE_TIMEOUT private val LOG: Logger = LogUtil.getLogger(getClass, app = appId) protected var currentExecutorId = 0 + private var workerNum: Option[Int] = None override def preStart(): Unit = { LOG.info(s"Distributed Shell AppMaster started") @@ -55,6 +58,10 @@ class DistShellAppMaster(appContext: AppMasterContext, app: AppDescription) .withDeploy(Deploy(scope = RemoteScope(address))), currentExecutorId.toString) executorSystem.bindLifeCycleWith(executor) currentExecutorId += 1 + ActorUtil.tellMasterIfApplicationReady(workerNum, currentExecutorId, appContext) + case WorkerList(workers) => + workerNum = Some(workers.length) + ActorUtil.tellMasterIfApplicationReady(workerNum, currentExecutorId, appContext) case StartExecutorSystemTimeout => LOG.error(s"Failed to allocate resource in time") masterProxy ! ShutdownApplication(appId) @@ -90,6 +97,6 @@ object DistShellAppMaster { this } - override def toString(): String = result.toString() + override def toString: String = result.toString() } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/97bf112a/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMaster.scala ---------------------------------------------------------------------- diff --git a/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMaster.scala b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMaster.scala index ca0ab49..0bdbe77 100644 --- a/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMaster.scala +++ b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMaster.scala @@ -18,6 +18,8 @@ package org.apache.gearpump.experiments.distributeservice import java.io.File +import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList + import scala.concurrent.Future import akka.actor.{Deploy, Props} @@ -39,6 +41,7 @@ class DistServiceAppMaster(appContext: AppMasterContext, app: AppDescription) implicit val timeout = Constants.FUTURE_TIMEOUT private val LOG: Logger = LogUtil.getLogger(getClass, app = appId) private var currentExecutorId = 0 + private var workerNum: Option[Int] = None private var fileServerPort = -1 val rootDirectory = new File("/") @@ -63,6 +66,10 @@ class DistServiceAppMaster(appContext: AppMasterContext, app: AppDescription) Deploy(scope = RemoteScope(address))), currentExecutorId.toString) executorSystem.bindLifeCycleWith(executor) currentExecutorId += 1 + ActorUtil.tellMasterIfApplicationReady(workerNum, currentExecutorId, appContext) + case WorkerList(workers) => + workerNum = Some(workers.length) + ActorUtil.tellMasterIfApplicationReady(workerNum, currentExecutorId, appContext) case StartExecutorSystemTimeout => LOG.error(s"Failed to allocate resource in time") masterProxy ! ShutdownApplication(appId) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/97bf112a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala index eabc684..f4e463e 100644 --- a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala +++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala @@ -18,7 +18,7 @@ package org.apache.gearpump.integrationtest.checklist import org.apache.gearpump.cluster.MasterToAppMaster -import org.apache.gearpump.integrationtest.TestSpecBase +import org.apache.gearpump.integrationtest.{Util, TestSpecBase} /** * The test spec checks the command-line usage @@ -86,7 +86,7 @@ class CommandLineSpec extends TestSpecBase { success shouldBe false } - "the EmbededCluster can be used as embedded cluster in process" in { + "the EmbeddedCluster can be used as embedded cluster in process" in { // setup val args = "-debug true -sleep 10" val appId = expectSubmitAppSuccess(wordCountJar, args) @@ -125,9 +125,11 @@ class CommandLineSpec extends TestSpecBase { } private def expectAppIsRunningByParsingOutput(appId: Int, expectedName: String): Unit = { - val actual = commandLineClient.queryApp(appId) - actual should include(s"application: $appId, ") - actual should include(s"name: $expectedName, ") - actual should include(s"status: ${MasterToAppMaster.AppMasterActive}") + Util.retryUntil(() => { + val actual = commandLineClient.queryApp(appId) + actual.contains(s"application: $appId, ") && + actual.contains(s"name: $expectedName, ") && + actual.contains(s"status: ${MasterToAppMaster.AppMasterActive}") + }, "application is running") } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/97bf112a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ExampleSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ExampleSpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ExampleSpec.scala index 27e4665..5554a26 100644 --- a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ExampleSpec.scala +++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ExampleSpec.scala @@ -45,7 +45,7 @@ class ExampleSpec extends TestSpecBase { "-command", "hostname" ) - val expectedHostNames = cluster.getWorkerHosts.map(Docker.getHostName(_)) + val expectedHostNames = cluster.getWorkerHosts.map(Docker.getHostName) def verify(): Boolean = { val workerNum = cluster.getWorkerHosts.length http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/97bf112a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala index 24c6da8..d0cfc80 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala @@ -36,7 +36,7 @@ import org.apache.gearpump.streaming._ import org.apache.gearpump.streaming.appmaster.AppMaster._ import org.apache.gearpump.streaming.appmaster.DagManager.{GetLatestDAG, LatestDAG, ReplaceProcessor} import org.apache.gearpump.streaming.appmaster.ExecutorManager.{ExecutorInfo, GetExecutorInfo} -import org.apache.gearpump.streaming.appmaster.TaskManager.{ApplicationReady, FailedToRecover, GetTaskList, TaskList} +import org.apache.gearpump.streaming.appmaster.TaskManager.{ApplicationReady, GetTaskList, TaskList, FailedToRecover} import org.apache.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig} import org.apache.gearpump.streaming.storage.InMemoryAppStoreOnMaster import org.apache.gearpump.streaming.task._ @@ -286,8 +286,8 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli def ready: Receive = { case ApplicationReady => masterProxy ! ActivateAppMaster(appId) - case AppMasterActivated(appId) => - LOG.info(s"AppMaster for app$appId is activated") + case AppMasterActivated(id) => + LOG.info(s"AppMaster for app$id is activated") } /** Error handling */ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/97bf112a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala index 89f9c1d..dac5a5c 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala @@ -116,7 +116,7 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { executorManager.expectMsg(BroadCast(RestartTasks(dagVersion))) } - import org.apache.gearpump.streaming.appmaster.TaskManager.TaskChangeRegistry + import TaskManager.TaskChangeRegistry "TaskChangeRegistry" should "track all modified task registration" in { val tasks = List(TaskId(0, 0), TaskId(0, 1)) val registry = new TaskChangeRegistry(tasks)
