[FLINK-9156][REST][CLI] Update --jobmanager option logic for REST client

This closes #5838.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47909f46
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47909f46
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47909f46

Branch: refs/heads/release-1.5
Commit: 47909f466b9c9ee1f4caf94e9f6862a21b628817
Parents: 50504ce
Author: zentol <ches...@apache.org>
Authored: Wed Apr 11 12:48:51 2018 +0200
Committer: zentol <ches...@apache.org>
Committed: Mon Apr 16 21:18:32 2018 +0200

----------------------------------------------------------------------
 .../apache/flink/client/cli/CliFrontend.java    |  3 ++
 .../client/program/rest/RestClusterClient.java  |  3 +-
 .../program/rest/RestClusterClientTest.java     | 35 ++++++++++++++++++++
 3 files changed, 40 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/47909f46/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index ce6556b..65f470b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -37,6 +37,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
@@ -1141,6 +1142,8 @@ public class CliFrontend {
        public static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
                config.setString(JobManagerOptions.ADDRESS, address.getHostString());
                config.setInteger(JobManagerOptions.PORT, address.getPort());
+               config.setString(RestOptions.REST_ADDRESS, address.getHostString());
+               config.setInteger(RestOptions.REST_PORT, address.getPort());
        }
 
        public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {

http://git-wip-us.apache.org/repos/asf/flink/blob/47909f46/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index a6f676e..3d50e93 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -719,7 +719,8 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
                                .orElse(false);
        }
 
-       private CompletableFuture<URL> getWebMonitorBaseUrl() {
+       @VisibleForTesting
+       CompletableFuture<URL> getWebMonitorBaseUrl() {
                return FutureUtils.orTimeout(
                                webMonitorLeaderRetriever.getLeaderFuture(),
                                restClusterClientConfiguration.getAwaitLeaderTimeout(),

http://git-wip-us.apache.org/repos/asf/flink/blob/47909f46/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index e7f9bf9..e2daad6 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.cli.DefaultCLI;
+import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
 import org.apache.flink.client.deployment.StandaloneClusterId;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
@@ -100,6 +102,7 @@ import org.apache.flink.util.TestLogger;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import org.apache.commons.cli.CommandLine;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -111,6 +114,7 @@ import org.mockito.MockitoAnnotations;
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
+import java.net.URL;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -677,6 +681,37 @@ public class RestClusterClientTest extends TestLogger {
                }
        }
 
+       /**
+        * Tests that command line options override the configuration settings.
+        */
+       @Test
+       public void testRESTManualConfigurationOverride() throws Exception {
+               final String configuredHostname = "localhost";
+               final int configuredPort = 1234;
+               final Configuration configuration = new Configuration();
+
+               configuration.setString(JobManagerOptions.ADDRESS, configuredHostname);
+               configuration.setInteger(JobManagerOptions.PORT, configuredPort);
+               configuration.setString(RestOptions.REST_ADDRESS, configuredHostname);
+               configuration.setInteger(RestOptions.REST_PORT, configuredPort);
+
+               final DefaultCLI defaultCLI = new DefaultCLI(configuration);
+
+               final String manualHostname = "123.123.123.123";
+               final int manualPort = 4321;
+               final String[] args = {"-m", manualHostname + ':' + manualPort};
+
+               CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false);
+
+               final StandaloneClusterDescriptor clusterDescriptor = defaultCLI.createClusterDescriptor(commandLine);
+
+               final RestClusterClient<?> clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(commandLine));
+
+               URL webMonitorBaseUrl = clusterClient.getWebMonitorBaseUrl().get();
+               assertThat(webMonitorBaseUrl.getHost(), equalTo(manualHostname));
+               assertThat(webMonitorBaseUrl.getPort(), equalTo(manualPort));
+       }
+
        private class TestAccumulatorHandler extends TestHandler<EmptyRequestBody, JobAccumulatorsInfo, JobAccumulatorsMessageParameters> {
 
                public TestAccumulatorHandler() {

Reply via email to