fix #1910 enlarge timeout in MasterClient

Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/f7347a81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/f7347a81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/f7347a81

Branch: refs/heads/master
Commit: f7347a812a78459ef1eedbf98d073d60edc19347
Parents: e8d224e
Author: Weihua Jiang <[email protected]>
Authored: Tue Feb 2 15:52:10 2016 +0800
Committer: Weihua Jiang <[email protected]>
Committed: Tue Feb 2 15:52:10 2016 +0800

----------------------------------------------------------------------
 core/src/main/resources/geardefault.conf              |  3 +++
 .../io/gearpump/cluster/client/ClientContext.scala    | 14 ++++++++++----
 .../io/gearpump/cluster/client/MasterClient.scala     |  5 +++--
 core/src/main/scala/io/gearpump/util/Constants.scala  |  1 +
 4 files changed, 17 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f7347a81/core/src/main/resources/geardefault.conf
----------------------------------------------------------------------
diff --git a/core/src/main/resources/geardefault.conf 
b/core/src/main/resources/geardefault.conf
index e5dc554..d775696 100644
--- a/core/src/main/resources/geardefault.conf
+++ b/core/src/main/resources/geardefault.conf
@@ -206,6 +206,9 @@ gearpump {
   ### the appmaster will shutdown itself.
   resource-allocation-timeout-seconds = 10
 
+  ### client's timeout (in second) to connect to master and wait for the 
response
+  masterclient.timeout = 90
+
   ### Define where the submitted jar file will be stored at
 
   ### This path follows the hadoop path schema

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f7347a81/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala 
b/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala
index 5b6d0fc..536d8c1 100644
--- a/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala
+++ b/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala
@@ -36,6 +36,7 @@ import scala.collection.JavaConversions._
 import scala.concurrent.Await
 import scala.concurrent.duration.Duration
 import scala.concurrent.Future
+import scala.util.Try
 
 
 /**
@@ -92,7 +93,7 @@ class ClientContext(config: Config, sys: ActorSystem, 
_master: ActorRef) {
   }
 
   private def submit(app : AppDescription, jarPath: String) : Int = {
-    val client = new MasterClient(master)
+    val client = getMasterClient
     val appName = checkAndAddNamePrefix(app.name, 
System.getProperty(GEARPUMP_APP_NAME_PREFIX))
     val updatedApp = AppDescription(appName, app.appMaster, app.userConfig, 
app.clusterConfig)
     if (jarPath == null) {
@@ -115,17 +116,17 @@ class ClientContext(config: Config, sys: ActorSystem, 
_master: ActorRef) {
   }
 
   def listApps: AppMastersData = {
-    val client = new MasterClient(master)
+    val client = getMasterClient
     client.listApplications
   }
 
   def shutdown(appId : Int) : Unit = {
-    val client = new MasterClient(master)
+    val client = getMasterClient
     client.shutdownApplication(appId)
   }
 
   def resolveAppID(appId: Int) : ActorRef = {
-    val client = new MasterClient(master)
+    val client = getMasterClient
     client.resolveAppId(appId)
   }
 
@@ -156,6 +157,11 @@ class ClientContext(config: Config, sys: ActorSystem, 
_master: ActorRef) {
     }
     fullName
   }
+
+  private def getMasterClient: MasterClient = {
+    val timeout = 
Try(config.getInt(Constants.GEARPUMP_MASTERCLIENT_TIMEOUT)).getOrElse(90)
+    new MasterClient(master, akka.util.Timeout(timeout, TimeUnit.SECONDS))
+  }
 }
 
 object ClientContext {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f7347a81/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala 
b/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala
index 4fa586b..5800e8d 100644
--- a/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala
+++ b/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala
@@ -20,6 +20,7 @@ package io.gearpump.cluster.client
 
 import akka.actor.ActorRef
 import akka.pattern.ask
+import akka.util.Timeout
 import io.gearpump.cluster.ClientToMaster._
 import io.gearpump.cluster.MasterToAppMaster.{AppMastersData, 
AppMastersDataRequest, ReplayFromTimestampWindowTrailingEdge}
 import io.gearpump.cluster.MasterToClient.{ReplayApplicationResult, 
ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult}
@@ -34,8 +35,8 @@ import scala.util.{Failure, Success}
  * Client to Master node.
  * Stateless, thread safe
  */
-class MasterClient(master : ActorRef) {
-  implicit val timeout = Constants.FUTURE_TIMEOUT
+class MasterClient(master : ActorRef, timeout: Timeout) {
+  implicit val masterClientTimeout = timeout
 
   def submitApplication(app : AppDescription, appJar: Option[AppJar]) : Int = {
     val result = Await.result( (master ? SubmitApplication(app, 
appJar)).asInstanceOf[Future[SubmitApplicationResult]], Duration.Inf)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f7347a81/core/src/main/scala/io/gearpump/util/Constants.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/Constants.scala 
b/core/src/main/scala/io/gearpump/util/Constants.scala
index aa43b00..5e340e3 100644
--- a/core/src/main/scala/io/gearpump/util/Constants.scala
+++ b/core/src/main/scala/io/gearpump/util/Constants.scala
@@ -43,6 +43,7 @@ object Constants {
   val GEARPUMP_SERIALIZERS = "gearpump.serializers"
   val GEARPUMP_TASK_DISPATCHER = "gearpump.task-dispatcher"
   val GEARPUMP_CLUSTER_MASTERS = "gearpump.cluster.masters"
+  val GEARPUMP_MASTERCLIENT_TIMEOUT = "gearpump.masterclient.timeout"
   val GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS = 
"gearpump.worker.executor-share-same-jvm-as-worker"
 
   val GEARPUMP_HOME = "gearpump.home"

Reply via email to