Fix #1910: enlarge timeout in MasterClient
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/f7347a81 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/f7347a81 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/f7347a81 Branch: refs/heads/master Commit: f7347a812a78459ef1eedbf98d073d60edc19347 Parents: e8d224e Author: Weihua Jiang <[email protected]> Authored: Tue Feb 2 15:52:10 2016 +0800 Committer: Weihua Jiang <[email protected]> Committed: Tue Feb 2 15:52:10 2016 +0800 ---------------------------------------------------------------------- core/src/main/resources/geardefault.conf | 3 +++ .../io/gearpump/cluster/client/ClientContext.scala | 14 ++++++++++---- .../io/gearpump/cluster/client/MasterClient.scala | 5 +++-- core/src/main/scala/io/gearpump/util/Constants.scala | 1 + 4 files changed, 17 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f7347a81/core/src/main/resources/geardefault.conf ---------------------------------------------------------------------- diff --git a/core/src/main/resources/geardefault.conf b/core/src/main/resources/geardefault.conf index e5dc554..d775696 100644 --- a/core/src/main/resources/geardefault.conf +++ b/core/src/main/resources/geardefault.conf @@ -206,6 +206,9 @@ gearpump { ### the appmaster will shutdown itself. 
resource-allocation-timeout-seconds = 10 + ### client's timeout (in second) to connect to master and wait for the response + masterclient.timeout = 90 + ### Define where the submitted jar file will be stored at ### This path follows the hadoop path schema http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f7347a81/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala b/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala index 5b6d0fc..536d8c1 100644 --- a/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala +++ b/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala @@ -36,6 +36,7 @@ import scala.collection.JavaConversions._ import scala.concurrent.Await import scala.concurrent.duration.Duration import scala.concurrent.Future +import scala.util.Try /** @@ -92,7 +93,7 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) { } private def submit(app : AppDescription, jarPath: String) : Int = { - val client = new MasterClient(master) + val client = getMasterClient val appName = checkAndAddNamePrefix(app.name, System.getProperty(GEARPUMP_APP_NAME_PREFIX)) val updatedApp = AppDescription(appName, app.appMaster, app.userConfig, app.clusterConfig) if (jarPath == null) { @@ -115,17 +116,17 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) { } def listApps: AppMastersData = { - val client = new MasterClient(master) + val client = getMasterClient client.listApplications } def shutdown(appId : Int) : Unit = { - val client = new MasterClient(master) + val client = getMasterClient client.shutdownApplication(appId) } def resolveAppID(appId: Int) : ActorRef = { - val client = new MasterClient(master) + val client = getMasterClient client.resolveAppId(appId) } @@ -156,6 +157,11 @@ class ClientContext(config: Config, sys: 
ActorSystem, _master: ActorRef) { } fullName } + + private def getMasterClient: MasterClient = { + val timeout = Try(config.getInt(Constants.GEARPUMP_MASTERCLIENT_TIMEOUT)).getOrElse(90) + new MasterClient(master, akka.util.Timeout(timeout, TimeUnit.SECONDS)) + } } object ClientContext { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f7347a81/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala b/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala index 4fa586b..5800e8d 100644 --- a/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala +++ b/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala @@ -20,6 +20,7 @@ package io.gearpump.cluster.client import akka.actor.ActorRef import akka.pattern.ask +import akka.util.Timeout import io.gearpump.cluster.ClientToMaster._ import io.gearpump.cluster.MasterToAppMaster.{AppMastersData, AppMastersDataRequest, ReplayFromTimestampWindowTrailingEdge} import io.gearpump.cluster.MasterToClient.{ReplayApplicationResult, ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult} @@ -34,8 +35,8 @@ import scala.util.{Failure, Success} * Client to Master node. * Stateless, thread safe */ -class MasterClient(master : ActorRef) { - implicit val timeout = Constants.FUTURE_TIMEOUT +class MasterClient(master : ActorRef, timeout: Timeout) { + implicit val masterClientTimeout = timeout def submitApplication(app : AppDescription, appJar: Option[AppJar]) : Int = { val result = Await.result( (master ? 
SubmitApplication(app, appJar)).asInstanceOf[Future[SubmitApplicationResult]], Duration.Inf) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f7347a81/core/src/main/scala/io/gearpump/util/Constants.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/Constants.scala b/core/src/main/scala/io/gearpump/util/Constants.scala index aa43b00..5e340e3 100644 --- a/core/src/main/scala/io/gearpump/util/Constants.scala +++ b/core/src/main/scala/io/gearpump/util/Constants.scala @@ -43,6 +43,7 @@ object Constants { val GEARPUMP_SERIALIZERS = "gearpump.serializers" val GEARPUMP_TASK_DISPATCHER = "gearpump.task-dispatcher" val GEARPUMP_CLUSTER_MASTERS = "gearpump.cluster.masters" + val GEARPUMP_MASTERCLIENT_TIMEOUT = "gearpump.masterclient.timeout" val GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS = "gearpump.worker.executor-share-same-jvm-as-worker" val GEARPUMP_HOME = "gearpump.home"
