Repository: flink
Updated Branches:
  refs/heads/release-1.3 6e40223b5 -> f783e529c


[FLINK-6629] Use HAServices to find connecting address for ClusterClient's ActorSystem

The ClusterClient starts its ActorSystem lazily. In order to find out the
address to which to bind, the ClusterClient tries to connect to the JobManager.
In order to find out the JobManager's address, it is important to use the
HighAvailabilityServices instead of retrieving the address information from the
configuration, because otherwise it conflicts with HA mode.

This closes #3949.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f783e529
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f783e529
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f783e529

Branch: refs/heads/release-1.3
Commit: f783e529c558ac3df68b4e69fb931f2d55b55db7
Parents: 77c02fe
Author: Till Rohrmann <[email protected]>
Authored: Fri May 19 14:31:19 2017 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Mon May 22 11:21:20 2017 +0200

----------------------------------------------------------------------
 flink-clients/pom.xml                           |   8 ++
 .../flink/client/program/ClusterClient.java     | 110 +++++++++++++------
 .../client/program/ClientConnectionTest.java    |  52 +++++++++
 .../runtime/util/LeaderRetrievalUtils.java      |   4 +-
 .../RemoteEnvironmentITCase.java                |   8 +-
 .../apache/flink/yarn/YarnClusterClient.java    |  28 +++--
 6 files changed, 165 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f783e529/flink-clients/pom.xml
----------------------------------------------------------------------
diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml
index 0e0c146..205c3d8 100644
--- a/flink-clients/pom.xml
+++ b/flink-clients/pom.xml
@@ -76,6 +76,14 @@ under the License.
                        <version>${project.version}</version>
                        <scope>test</scope>
                </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
                
                <dependency>
                        <groupId>com.data-artisans</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/f783e529/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index e09a0b6..e7314eb 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.optimizer.CompilerException;
@@ -54,7 +54,6 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import 
org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
 import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
 import 
org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
-import org.apache.flink.runtime.net.ConnectionUtils;
 import org.apache.flink.runtime.util.LeaderConnectionInfo;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.util.FlinkException;
@@ -62,13 +61,12 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Some;
+import scala.Option;
 import scala.Tuple2;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
@@ -151,7 +149,11 @@ public abstract class ClusterClient {
                this.timeout = AkkaUtils.getClientTimeout(flinkConfig);
                this.lookupTimeout = AkkaUtils.getLookupTimeout(flinkConfig);
 
-               this.actorSystemLoader = new LazyActorSystemLoader(flinkConfig, 
LOG);
+               this.actorSystemLoader = new LazyActorSystemLoader(
+                       highAvailabilityServices,
+                       Time.milliseconds(lookupTimeout.toMillis()),
+                       flinkConfig,
+                       LOG);
 
                this.highAvailabilityServices = 
Preconditions.checkNotNull(highAvailabilityServices);
        }
@@ -164,13 +166,23 @@ public abstract class ClusterClient {
 
                private final Logger LOG;
 
-               private final Configuration flinkConfig;
+               private final HighAvailabilityServices highAvailabilityServices;
+
+               private final Time timeout;
+
+               private final Configuration configuration;
 
                private ActorSystem actorSystem;
 
-               private LazyActorSystemLoader(Configuration flinkConfig, Logger 
LOG) {
-                       this.flinkConfig = flinkConfig;
-                       this.LOG = LOG;
+               private LazyActorSystemLoader(
+                               HighAvailabilityServices 
highAvailabilityServices,
+                               Time timeout,
+                               Configuration configuration,
+                               Logger LOG) {
+                       this.highAvailabilityServices = 
Preconditions.checkNotNull(highAvailabilityServices);
+                       this.timeout = Preconditions.checkNotNull(timeout);
+                       this.configuration = 
Preconditions.checkNotNull(configuration);
+                       this.LOG = Preconditions.checkNotNull(LOG);
                }
 
                /**
@@ -192,30 +204,31 @@ public abstract class ClusterClient {
                /**
                 * Creates a new ActorSystem or returns an existing one.
                 * @return ActorSystem
+                * @throws Exception if the ActorSystem could not be created
                 */
-               public ActorSystem get() {
+               public ActorSystem get() throws FlinkException {
 
                        if (!isLoaded()) {
                                // start actor system
                                LOG.info("Starting client actor system.");
 
-                               String hostName = 
flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
-                               int port = 
flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
-                               if (hostName == null || port == -1) {
-                                       throw new RuntimeException("The initial 
JobManager address has not been set correctly.");
+                               final InetAddress ownHostname;
+                               try {
+                                       ownHostname = 
LeaderRetrievalUtils.findConnectingAddress(
+                                               
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+                                               timeout);
+                               } catch (LeaderRetrievalException lre) {
+                                       throw new FlinkException("Could not 
find out our own hostname by connecting to the " +
+                                               "leading JobManager. Please 
make sure that the Flink cluster has been started.", lre);
                                }
-                               InetSocketAddress initialJobManagerAddress = 
new InetSocketAddress(hostName, port);
 
-                               // find name of own public interface, able to 
connect to the JM
-                               // try to find address for 2 seconds. log after 
400 ms.
-                               InetAddress ownHostname;
                                try {
-                                       ownHostname = 
ConnectionUtils.findConnectingAddress(initialJobManagerAddress, 2000, 400);
-                               } catch (IOException e) {
-                                       throw new RuntimeException("Failed to 
resolve JobManager address at " + initialJobManagerAddress, e);
+                                       actorSystem = 
AkkaUtils.createActorSystem(
+                                               configuration,
+                                               Option.apply(new Tuple2<String, 
Object>(ownHostname.getCanonicalHostName(), 0)));
+                               } catch (Exception e) {
+                                       throw new FlinkException("Could not 
start the ActorSystem lazily.", e);
                                }
-                               actorSystem = 
AkkaUtils.createActorSystem(flinkConfig,
-                                       new Some<>(new Tuple2<String, 
Object>(ownHostname.getCanonicalHostName(), 0)));
                        }
 
                        return actorSystem;
@@ -440,10 +453,19 @@ public abstract class ClusterClient {
 
                waitForClusterToBeReady();
 
+               final ActorSystem actorSystem;
+
+               try {
+                       actorSystem = actorSystemLoader.get();
+               } catch (FlinkException fe) {
+                       throw new ProgramInvocationException("Could not start 
the ActorSystem needed to talk to the " +
+                               "JobManager.", fe);
+               }
+
                try {
                        logAndSysout("Submitting job with JobID: " + 
jobGraph.getJobID() + ". Waiting for job completion.");
                        this.lastJobExecutionResult = 
JobClient.submitJobAndWait(
-                               actorSystemLoader.get(),
+                               actorSystem,
                                flinkConfig,
                                highAvailabilityServices,
                                jobGraph,
@@ -451,7 +473,7 @@ public abstract class ClusterClient {
                                printStatusDuringExecution,
                                classLoader);
 
-                       return this.lastJobExecutionResult;
+                       return lastJobExecutionResult;
                } catch (JobExecutionException e) {
                        throw new ProgramInvocationException("The program 
execution failed: " + e.getMessage(), e);
                }
@@ -491,6 +513,17 @@ public abstract class ClusterClient {
         * @throws JobExecutionException if an error occurs during monitoring 
the job execution
         */
        public JobExecutionResult retrieveJob(JobID jobID) throws 
JobExecutionException {
+               final ActorSystem actorSystem;
+
+               try {
+                       actorSystem = actorSystemLoader.get();
+               } catch (FlinkException fe) {
+                       throw new JobExecutionException(
+                               jobID,
+                               "Could not start the ActorSystem needed to talk 
to the JobManager.",
+                               fe);
+               }
+
                ActorGateway jobManagerGateway;
                try {
                        jobManagerGateway = getJobManagerGateway();
@@ -502,7 +535,7 @@ public abstract class ClusterClient {
                        jobID,
                        jobManagerGateway,
                        flinkConfig,
-                       actorSystemLoader.get(),
+                       actorSystem,
                        highAvailabilityServices,
                        timeout,
                        printStatusDuringExecution);
@@ -518,6 +551,17 @@ public abstract class ClusterClient {
         * @throws JobExecutionException if an error occurs during monitoring 
the job execution
         */
        public JobListeningContext connectToJob(JobID jobID) throws 
JobExecutionException {
+               final ActorSystem actorSystem;
+
+               try {
+                       actorSystem = actorSystemLoader.get();
+               } catch (FlinkException fe) {
+                       throw new JobExecutionException(
+                               jobID,
+                               "Could not start the ActorSystem needed to talk 
to the JobManager.",
+                               fe);
+               }
+
                ActorGateway jobManagerGateway;
                try {
                        jobManagerGateway = getJobManagerGateway();
@@ -526,13 +570,13 @@ public abstract class ClusterClient {
                }
 
                return JobClient.attachToRunningJob(
-                               jobID,
-                               jobManagerGateway,
-                               flinkConfig,
-                               actorSystemLoader.get(),
-                               highAvailabilityServices,
-                               timeout,
-                               printStatusDuringExecution);
+                       jobID,
+                       jobManagerGateway,
+                       flinkConfig,
+                       actorSystem,
+                       highAvailabilityServices,
+                       timeout,
+                       printStatusDuringExecution);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/f783e529/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index 3bfaa95..246a75c 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -18,16 +18,27 @@
 
 package org.apache.flink.client.program;
 
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobClientActorTest;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.UUID;
 
 import static org.junit.Assert.*;
 
@@ -98,4 +109,45 @@ public class ClientConnectionTest extends TestLogger {
                        assertTrue(CommonTestUtils.containsCause(e, 
LeaderRetrievalException.class));
                }
        }
+
+       /**
+        * FLINK-6629
+        *
+        * Tests that the {@link HighAvailabilityServices} are respected when 
initializing the ClusterClient's
+        * {@link ActorSystem} and retrieving the leading JobManager.
+        */
+       @Test
+       public void testJobManagerRetrievalWithHAServices() throws Exception {
+               final Configuration configuration = new Configuration();
+               final TestingHighAvailabilityServices highAvailabilityServices 
= new TestingHighAvailabilityServices();
+               final ActorSystem actorSystem = 
AkkaUtils.createDefaultActorSystem();
+               ActorRef actorRef = null;
+               final UUID leaderId = UUID.randomUUID();
+
+               try {
+                       actorRef = actorSystem.actorOf(
+                               Props.create(
+                                       JobClientActorTest.PlainActor.class,
+                                       leaderId));
+
+                       final String expectedAddress = 
AkkaUtils.getAkkaURL(actorSystem, actorRef);
+
+                       final TestingLeaderRetrievalService 
testingLeaderRetrievalService = new 
TestingLeaderRetrievalService(expectedAddress, leaderId);
+
+                       
highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID,
 testingLeaderRetrievalService);
+
+                       ClusterClient client = new 
StandaloneClusterClient(configuration, highAvailabilityServices);
+
+                       ActorGateway gateway = client.getJobManagerGateway();
+
+                       assertEquals(expectedAddress, gateway.path());
+                       assertEquals(leaderId, gateway.leaderSessionID());
+               } finally {
+                       if (actorRef != null) {
+                               TestingUtils.stopActorGracefully(actorRef);
+                       }
+
+                       actorSystem.shutdown();
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f783e529/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
index 009bec6..6b861a3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
@@ -137,8 +137,8 @@ public class LeaderRetrievalUtils {
        }
 
        public static InetAddress findConnectingAddress(
-               LeaderRetrievalService leaderRetrievalService,
-               Time timeout) throws LeaderRetrievalException {
+                       LeaderRetrievalService leaderRetrievalService,
+                       Time timeout) throws LeaderRetrievalException {
                return findConnectingAddress(leaderRetrievalService, new 
FiniteDuration(timeout.getSize(), timeout.getUnit()));
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f783e529/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
index 0091571..7c6f73a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
@@ -23,11 +23,13 @@ import 
org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.runtime.minicluster.StandaloneMiniCluster;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -73,7 +75,7 @@ public class RemoteEnvironmentITCase extends TestLogger {
        /**
         * Ensure that that Akka configuration parameters can be set.
         */
-       @Test(expected=IllegalArgumentException.class)
+       @Test(expected=FlinkException.class)
        public void testInvalidAkkaConfiguration() throws Throwable {
                Configuration config = new Configuration();
                config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, 
INVALID_STARTUP_TIMEOUT);
@@ -86,11 +88,11 @@ public class RemoteEnvironmentITCase extends TestLogger {
                env.getConfig().disableSysoutLogging();
 
                DataSet<String> result = env.createInput(new 
TestNonRichInputFormat());
-               result.output(new LocalCollectionOutputFormat<String>(new 
ArrayList<String>()));
+               result.output(new LocalCollectionOutputFormat<>(new 
ArrayList<String>()));
                try {
                        env.execute();
                        Assert.fail("Program should not run successfully, cause 
of invalid akka settings.");
-               } catch (IOException ex) {
+               } catch (ProgramInvocationException ex) {
                        throw ex.getCause();
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f783e529/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index e70af09..8f47b18 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -19,6 +19,7 @@ package org.apache.flink.yarn;
 
 import akka.actor.ActorRef;
 
+import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
@@ -34,6 +35,7 @@ import 
org.apache.flink.runtime.clusterframework.messages.InfoMessage;
 import 
org.apache.flink.runtime.clusterframework.messages.ShutdownClusterAfterJob;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.hadoop.conf.Configuration;
@@ -569,17 +571,29 @@ public class YarnClusterClient extends ClusterClient {
                 * Creates a new ApplicationClient actor or returns an existing 
one. May start an ActorSystem.
                 * @return ActorSystem
                 */
-               public ActorRef get() {
+               public ActorRef get() throws FlinkException {
                        if (applicationClient == null) {
                                // start application client
                                LOG.info("Start application client.");
 
-                               applicationClient = 
actorSystemLoader.get().actorOf(
-                                       Props.create(
-                                               ApplicationClient.class,
-                                               flinkConfig,
-                                               
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID)),
-                                       "applicationClient");
+                               final ActorSystem actorSystem;
+
+                               try {
+                                       actorSystem = actorSystemLoader.get();
+                               } catch (FlinkException fle) {
+                                       throw new FlinkException("Could not 
start the ClusterClient's ActorSystem.", fle);
+                               }
+
+                               try {
+                                       applicationClient = actorSystem.actorOf(
+                                               Props.create(
+                                                       ApplicationClient.class,
+                                                       flinkConfig,
+                                                       
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID)),
+                                               "applicationClient");
+                               } catch (Exception e) {
+                                       throw new FlinkException("Could not 
start the ApplicationClient.", e);
+                               }
                        }
 
                        return applicationClient;

Reply via email to