Repository: flink
Updated Branches:
  refs/heads/master 52d9806ba -> 46e052619


[FLINK-1495][yarn] Make Akka timeout configurable in YARN client.

This closes #377


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/46e05261
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/46e05261
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/46e05261

Branch: refs/heads/master
Commit: 46e05261968d073dd7faf1c0089aa39e8277959d
Parents: 52d9806
Author: Robert Metzger <rmetz...@apache.org>
Authored: Mon Feb 9 14:45:56 2015 +0100
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Mon Feb 9 19:06:20 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/yarn/FlinkYarnClient.java  |  2 +-
 .../org/apache/flink/yarn/FlinkYarnCluster.java | 40 +++++++++-----------
 2 files changed, 19 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/46e05261/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
index 23ef523..9536e22 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
@@ -586,7 +586,7 @@ public class FlinkYarnClient extends 
AbstractFlinkYarnClient {
                        Thread.sleep(1000);
                }
                // the Flink cluster is deployed in YARN. Represent cluster
-               return new FlinkYarnCluster(yarnClient, appId, conf, 
sessionFilesDir);
+               return new FlinkYarnCluster(yarnClient, appId, conf, 
flinkConfiguration, sessionFilesDir);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/46e05261/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
index 3f2e72e..1794d36 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
@@ -24,7 +24,6 @@ import static akka.pattern.Patterns.ask;
 
 import akka.actor.Props;
 import akka.util.Timeout;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.AkkaUtils$;
 import org.apache.flink.runtime.net.NetUtils;
@@ -46,9 +45,7 @@ import scala.None$;
 import scala.Some;
 import scala.Tuple2;
 import scala.concurrent.Await;
-import scala.concurrent.Awaitable;
 import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
@@ -56,7 +53,6 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 
@@ -69,21 +65,24 @@ public class FlinkYarnCluster extends 
AbstractFlinkYarnCluster {
        private Thread actorRunner;
        private Thread clientShutdownHook = new ClientShutdownHook();
        private PollingThread pollingRunner;
-       private Configuration hadoopConfig;
+       private final Configuration hadoopConfig;
        // (HDFS) location of the files required to run on YARN. Needed here to 
delete them on shutdown.
-       private Path sessionFilesDir;
-       private InetSocketAddress jobManagerAddress;
+       private final Path sessionFilesDir;
+       private final InetSocketAddress jobManagerAddress;
 
        //---------- Class internal fields -------------------
 
        private ActorSystem actorSystem;
        private ActorRef applicationClient;
        private ApplicationReport intialAppReport;
-       private static FiniteDuration akkaDuration = Duration.apply(5, 
TimeUnit.SECONDS);
-       private static Timeout akkaTimeout = 
Timeout.durationToTimeout(akkaDuration);
-
-       public FlinkYarnCluster(final YarnClient yarnClient, final 
ApplicationId appId,
-                                                       Configuration 
hadoopConfig, Path sessionFilesDir) throws IOException, YarnException {
+       private final FiniteDuration akkaDuration;
+       private final Timeout akkaTimeout;
+
+       public FlinkYarnCluster(final YarnClient yarnClient, final 
ApplicationId appId, Configuration hadoopConfig,
+                                                       
org.apache.flink.configuration.Configuration flinkConfig,
+                                                       Path sessionFilesDir) 
throws IOException, YarnException {
+               this.akkaDuration = AkkaUtils.getTimeout(flinkConfig);
+               this.akkaTimeout = Timeout.durationToTimeout(akkaDuration);
                this.yarnClient = yarnClient;
                this.hadoopConfig = hadoopConfig;
                this.sessionFilesDir = sessionFilesDir;
@@ -97,7 +96,7 @@ public class FlinkYarnCluster extends 
AbstractFlinkYarnCluster {
                // start actor system
                LOG.info("Start actor system.");
                InetAddress ownHostname = 
NetUtils.resolveAddress(jobManagerAddress); // find name of own public 
interface, able to connect to the JM
-               actorSystem = 
AkkaUtils.createActorSystem(GlobalConfiguration.getConfiguration(),
+               actorSystem = AkkaUtils.createActorSystem(flinkConfig,
                                new Some(new Tuple2<String, 
Integer>(ownHostname.getCanonicalHostName(), 0)));
 
                // start application client
@@ -166,7 +165,12 @@ public class FlinkYarnCluster extends 
AbstractFlinkYarnCluster {
                        throw new RuntimeException("The FlinkYarnCluster has 
alread been stopped");
                }
                Future<Object> clusterStatusOption = ask(applicationClient, 
Messages.LocalGetYarnClusterStatus$.MODULE$, akkaTimeout);
-               Object clusterStatus = awaitUtil(clusterStatusOption, "Unable 
to get Cluster status from Application Client");
+               Object clusterStatus;
+               try {
+                       clusterStatus = Await.result(clusterStatusOption, 
akkaDuration);
+               } catch (Exception e) {
+                       throw new RuntimeException("Unable to get Cluster 
status from Application Client", e);
+               }
                if(clusterStatus instanceof None$) {
                        return null;
                } else if(clusterStatus instanceof Some) {
@@ -234,14 +238,6 @@ public class FlinkYarnCluster extends 
AbstractFlinkYarnCluster {
                return ret;
        }
 
-       private static <T> T awaitUtil(Awaitable<T> awaitable, String message) {
-               try {
-                       return Await.result(awaitable, akkaDuration);
-               } catch (Exception e) {
-                       throw new RuntimeException(message, e);
-               }
-       }
-
        // -------------------------- Shutdown handling ------------------------
 
        private AtomicBoolean hasBeenShutDown = new AtomicBoolean(false);

Reply via email to