[FLINK-9156][REST][CLI] Update --jobmanager option logic for REST client This closes #5838.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47909f46 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47909f46 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47909f46 Branch: refs/heads/release-1.5 Commit: 47909f466b9c9ee1f4caf94e9f6862a21b628817 Parents: 50504ce Author: zentol <ches...@apache.org> Authored: Wed Apr 11 12:48:51 2018 +0200 Committer: zentol <ches...@apache.org> Committed: Mon Apr 16 21:18:32 2018 +0200 ---------------------------------------------------------------------- .../apache/flink/client/cli/CliFrontend.java | 3 ++ .../client/program/rest/RestClusterClient.java | 3 +- .../program/rest/RestClusterClientTest.java | 35 ++++++++++++++++++++ 3 files changed, 40 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/47909f46/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index ce6556b..65f470b 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -37,6 +37,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.Optimizer; @@ -1141,6 +1142,8 @@ public class CliFrontend { public static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) { config.setString(JobManagerOptions.ADDRESS, address.getHostString()); config.setInteger(JobManagerOptions.PORT, address.getPort()); + config.setString(RestOptions.REST_ADDRESS, address.getHostString()); + config.setInteger(RestOptions.REST_PORT, address.getPort()); } public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration configuration, String configurationDirectory) { http://git-wip-us.apache.org/repos/asf/flink/blob/47909f46/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index a6f676e..3d50e93 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -719,7 +719,8 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster .orElse(false); } - private CompletableFuture<URL> getWebMonitorBaseUrl() { + @VisibleForTesting + CompletableFuture<URL> getWebMonitorBaseUrl() { return FutureUtils.orTimeout( webMonitorLeaderRetriever.getLeaderFuture(), restClusterClientConfiguration.getAwaitLeaderTimeout(), http://git-wip-us.apache.org/repos/asf/flink/blob/47909f46/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index e7f9bf9..e2daad6 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -23,6 +23,8 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.client.cli.DefaultCLI; +import org.apache.flink.client.deployment.StandaloneClusterDescriptor; import org.apache.flink.client.deployment.StandaloneClusterId; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.Configuration; @@ -100,6 +102,7 @@ import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.commons.cli.CommandLine; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -111,6 +114,7 @@ import org.mockito.MockitoAnnotations; import javax.annotation.Nonnull; import java.io.IOException; +import java.net.URL; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; @@ -677,6 +681,37 @@ public class RestClusterClientTest extends TestLogger { } } + /** + * Tests that command line options override the configuration settings. + */ + @Test + public void testRESTManualConfigurationOverride() throws Exception { + final String configuredHostname = "localhost"; + final int configuredPort = 1234; + final Configuration configuration = new Configuration(); + + configuration.setString(JobManagerOptions.ADDRESS, configuredHostname); + configuration.setInteger(JobManagerOptions.PORT, configuredPort); + configuration.setString(RestOptions.REST_ADDRESS, configuredHostname); + configuration.setInteger(RestOptions.REST_PORT, configuredPort); + + final DefaultCLI defaultCLI = new DefaultCLI(configuration); + + final String manualHostname = "123.123.123.123"; + final int manualPort = 4321; + final String[] args = {"-m", manualHostname + ':' + manualPort}; + + CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false); + + final StandaloneClusterDescriptor clusterDescriptor = defaultCLI.createClusterDescriptor(commandLine); + + final RestClusterClient<?> clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(commandLine)); + + URL webMonitorBaseUrl = clusterClient.getWebMonitorBaseUrl().get(); + assertThat(webMonitorBaseUrl.getHost(), equalTo(manualHostname)); + assertThat(webMonitorBaseUrl.getPort(), equalTo(manualPort)); + } + private class TestAccumulatorHandler extends TestHandler<EmptyRequestBody, JobAccumulatorsInfo, JobAccumulatorsMessageParameters> { public TestAccumulatorHandler() {