Repository: flink
Updated Branches:
  refs/heads/master 52d9806ba -> 46e052619
[FLINK-1495][yarn] Make Akka timeout configurable in YARN client. This closes #377 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/46e05261 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/46e05261 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/46e05261 Branch: refs/heads/master Commit: 46e05261968d073dd7faf1c0089aa39e8277959d Parents: 52d9806 Author: Robert Metzger <rmetz...@apache.org> Authored: Mon Feb 9 14:45:56 2015 +0100 Committer: Robert Metzger <rmetz...@apache.org> Committed: Mon Feb 9 19:06:20 2015 +0100 ---------------------------------------------------------------------- .../org/apache/flink/yarn/FlinkYarnClient.java | 2 +- .../org/apache/flink/yarn/FlinkYarnCluster.java | 40 +++++++++----------- 2 files changed, 19 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/46e05261/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java index 23ef523..9536e22 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java @@ -586,7 +586,7 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient { Thread.sleep(1000); } // the Flink cluster is deployed in YARN. 
Represent cluster - return new FlinkYarnCluster(yarnClient, appId, conf, sessionFilesDir); + return new FlinkYarnCluster(yarnClient, appId, conf, flinkConfiguration, sessionFilesDir); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/46e05261/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java index 3f2e72e..1794d36 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java @@ -24,7 +24,6 @@ import static akka.pattern.Patterns.ask; import akka.actor.Props; import akka.util.Timeout; -import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.AkkaUtils$; import org.apache.flink.runtime.net.NetUtils; @@ -46,9 +45,7 @@ import scala.None$; import scala.Some; import scala.Tuple2; import scala.concurrent.Await; -import scala.concurrent.Awaitable; import scala.concurrent.Future; -import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import java.io.IOException; @@ -56,7 +53,6 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -69,21 +65,24 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster { private Thread actorRunner; private Thread clientShutdownHook = new ClientShutdownHook(); private PollingThread pollingRunner; - private Configuration hadoopConfig; + private final Configuration hadoopConfig; // (HDFS) location of the files required to run on YARN. Needed here to delete them on shutdown. 
- private Path sessionFilesDir; - private InetSocketAddress jobManagerAddress; + private final Path sessionFilesDir; + private final InetSocketAddress jobManagerAddress; //---------- Class internal fields ------------------- private ActorSystem actorSystem; private ActorRef applicationClient; private ApplicationReport intialAppReport; - private static FiniteDuration akkaDuration = Duration.apply(5, TimeUnit.SECONDS); - private static Timeout akkaTimeout = Timeout.durationToTimeout(akkaDuration); - - public FlinkYarnCluster(final YarnClient yarnClient, final ApplicationId appId, - Configuration hadoopConfig, Path sessionFilesDir) throws IOException, YarnException { + private final FiniteDuration akkaDuration; + private final Timeout akkaTimeout; + + public FlinkYarnCluster(final YarnClient yarnClient, final ApplicationId appId, Configuration hadoopConfig, + org.apache.flink.configuration.Configuration flinkConfig, + Path sessionFilesDir) throws IOException, YarnException { + this.akkaDuration = AkkaUtils.getTimeout(flinkConfig); + this.akkaTimeout = Timeout.durationToTimeout(akkaDuration); this.yarnClient = yarnClient; this.hadoopConfig = hadoopConfig; this.sessionFilesDir = sessionFilesDir; @@ -97,7 +96,7 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster { // start actor system LOG.info("Start actor system."); InetAddress ownHostname = NetUtils.resolveAddress(jobManagerAddress); // find name of own public interface, able to connect to the JM - actorSystem = AkkaUtils.createActorSystem(GlobalConfiguration.getConfiguration(), + actorSystem = AkkaUtils.createActorSystem(flinkConfig, new Some(new Tuple2<String, Integer>(ownHostname.getCanonicalHostName(), 0))); // start application client @@ -166,7 +165,12 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster { throw new RuntimeException("The FlinkYarnCluster has alread been stopped"); } Future<Object> clusterStatusOption = ask(applicationClient, Messages.LocalGetYarnClusterStatus$.MODULE$, 
akkaTimeout); - Object clusterStatus = awaitUtil(clusterStatusOption, "Unable to get Cluster status from Application Client"); + Object clusterStatus; + try { + clusterStatus = Await.result(clusterStatusOption, akkaDuration); + } catch (Exception e) { + throw new RuntimeException("Unable to get Cluster status from Application Client", e); + } if(clusterStatus instanceof None$) { return null; } else if(clusterStatus instanceof Some) { @@ -234,14 +238,6 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster { return ret; } - private static <T> T awaitUtil(Awaitable<T> awaitable, String message) { - try { - return Await.result(awaitable, akkaDuration); - } catch (Exception e) { - throw new RuntimeException(message, e); - } - } - // -------------------------- Shutdown handling ------------------------ private AtomicBoolean hasBeenShutDown = new AtomicBoolean(false);