This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5682472e402 PyFlink remote execution should support URLs with paths 
and https scheme
5682472e402 is described below

commit 5682472e4029c25d4a2651a609c999029fa3281b
Author: Elkhan Dadashov <[email protected]>
AuthorDate: Tue Sep 12 21:32:21 2023 -0700

    PyFlink remote execution should support URLs with paths and https scheme
---
 .../generated/common_host_port_section.html        |  6 ++
 .../shortcodes/generated/rest_configuration.html   |  6 ++
 .../org/apache/flink/client/cli/DefaultCLI.java    | 12 ++++
 .../client/program/rest/RestClusterClient.java     | 39 ++++++++++++-
 .../apache/flink/client/cli/DefaultCLITest.java    | 32 ++++++++++-
 .../client/program/rest/RestClusterClientTest.java | 66 +++++++++++++++++-----
 .../apache/flink/configuration/RestOptions.java    |  9 +++
 .../main/java/org/apache/flink/util/NetUtils.java  |  9 ++-
 .../java/org/apache/flink/util/NetUtilsTest.java   |  8 +++
 9 files changed, 170 insertions(+), 17 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/common_host_port_section.html 
b/docs/layouts/shortcodes/generated/common_host_port_section.html
index c2753d125e9..624278f7fb4 100644
--- a/docs/layouts/shortcodes/generated/common_host_port_section.html
+++ b/docs/layouts/shortcodes/generated/common_host_port_section.html
@@ -44,6 +44,12 @@
             <td>String</td>
             <td>The port that the server binds itself. Accepts a list of ports 
(“50100,50101”), ranges (“50100-50200”) or a combination of both. It is 
recommended to set a range of ports to avoid collisions when multiple Rest 
servers are running on the same machine.</td>
         </tr>
+        <tr>
+            <td><h5>rest.path</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>The path that should be used by clients to interact to the 
server which is accessible via URL.</td>
+        </tr>
         <tr>
             <td><h5>rest.port</h5></td>
             <td style="word-wrap: break-word;">8081</td>
diff --git a/docs/layouts/shortcodes/generated/rest_configuration.html 
b/docs/layouts/shortcodes/generated/rest_configuration.html
index 467b4275acd..af006be2037 100644
--- a/docs/layouts/shortcodes/generated/rest_configuration.html
+++ b/docs/layouts/shortcodes/generated/rest_configuration.html
@@ -104,6 +104,12 @@
             <td>Long</td>
             <td>The maximum time in ms for a connection to stay idle before 
failing.</td>
         </tr>
+        <tr>
+            <td><h5>rest.path</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>The path that should be used by clients to interact to the 
server which is accessible via URL.</td>
+        </tr>
         <tr>
             <td><h5>rest.port</h5></td>
             <td style="word-wrap: break-word;">8081</td>
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index 273e007a8ae..d7f2fb25ce4 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -21,6 +21,8 @@ package org.apache.flink.client.cli;
 import org.apache.flink.client.deployment.executors.RemoteExecutor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.NetUtils;
 
@@ -29,6 +31,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 
 import java.net.InetSocketAddress;
+import java.net.URL;
 
 import static 
org.apache.flink.client.cli.CliFrontend.setJobManagerAddressInConfig;
 
@@ -60,6 +63,11 @@ public class DefaultCLI extends AbstractCustomCommandLine {
             String addressWithPort = 
commandLine.getOptionValue(addressOption.getOpt());
             InetSocketAddress jobManagerAddress = 
NetUtils.parseHostPortAddress(addressWithPort);
             setJobManagerAddressInConfig(resultingConfiguration, 
jobManagerAddress);
+
+            URL url = NetUtils.getCorrectHostnamePort(addressWithPort);
+            resultingConfiguration.setString(RestOptions.PATH, url.getPath());
+            resultingConfiguration.setBoolean(
+                    SecurityOptions.SSL_REST_ENABLED, isHttpsProtocol(url));
         }
         resultingConfiguration.setString(DeploymentOptions.TARGET, 
RemoteExecutor.NAME);
 
@@ -68,6 +76,10 @@ public class DefaultCLI extends AbstractCustomCommandLine {
         return resultingConfiguration;
     }
 
+    private static boolean isHttpsProtocol(URL url) {
+        return url.getProtocol() != null && 
(url.getProtocol().equalsIgnoreCase("https"));
+    }
+
     @Override
     public String getId() {
         return ID;
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 4318d0a6c42..b05dc331853 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -25,10 +25,15 @@ import 
org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
 import org.apache.flink.client.program.rest.retry.WaitStrategy;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.client.JobStatusMessage;
@@ -48,11 +53,13 @@ import 
org.apache.flink.runtime.messages.webmonitor.JobStatusInfo;
 import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.rest.FileUpload;
+import org.apache.flink.runtime.rest.HttpHeader;
 import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
 import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
 import 
org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
 import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.CustomHeadersDecorator;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
@@ -192,6 +199,10 @@ public class RestClusterClient<T> implements 
ClusterClient<T> {
                     ExceptionUtils.findThrowable(exception, 
JobStateUnknownException.class)
                             .isPresent();
 
+    private final URL jobmanagerUrl;
+
+    private final Collection<HttpHeader> customHttpHeaders;
+
     public RestClusterClient(Configuration config, T clusterId) throws 
Exception {
         this(config, clusterId, 
DefaultClientHighAvailabilityServicesFactory.INSTANCE);
     }
@@ -229,10 +240,20 @@ public class RestClusterClient<T> implements 
ClusterClient<T> {
         this.restClusterClientConfiguration =
                 
RestClusterClientConfiguration.fromConfiguration(configuration);
 
+        this.customHttpHeaders =
+                ClientUtils.readHeadersFromEnvironmentVariable(
+                        ConfigConstants.FLINK_REST_CLIENT_HEADERS);
+        jobmanagerUrl =
+                new URL(
+                        SecurityOptions.isRestSSLEnabled(configuration) ? 
"https" : "http",
+                        configuration.getString(JobManagerOptions.ADDRESS),
+                        configuration.getInteger(JobManagerOptions.PORT),
+                        configuration.getString(RestOptions.PATH));
+
         if (restClient != null) {
             this.restClient = restClient;
         } else {
-            this.restClient = new RestClient(configuration, executorService);
+            this.restClient = RestClient.forUrl(configuration, 
executorService, jobmanagerUrl);
         }
 
         this.waitStrategy = checkNotNull(waitStrategy);
@@ -828,6 +849,16 @@ public class RestClusterClient<T> implements 
ClusterClient<T> {
                 .thenApply(ignored -> Acknowledge.get());
     }
 
+    @VisibleForTesting
+    URL getJobmanagerUrl() {
+        return jobmanagerUrl;
+    }
+
+    @VisibleForTesting
+    Collection<HttpHeader> getCustomHttpHeaders() {
+        return customHttpHeaders;
+    }
+
     /**
      * Get an overview of the Flink cluster.
      *
@@ -980,6 +1011,12 @@ public class RestClusterClient<T> implements 
ClusterClient<T> {
                                 .thenCompose(
                                         webMonitorBaseUrl -> {
                                             try {
+                                                CustomHeadersDecorator<R, P, 
U> headers =
+                                                        new 
CustomHeadersDecorator<>(
+                                                                new 
UrlPrefixDecorator<>(
+                                                                        
messageHeaders,
+                                                                        
jobmanagerUrl.getPath()));
+                                                
headers.setCustomHeaders(customHttpHeaders);
                                                 final CompletableFuture<P> 
future =
                                                         restClient.sendRequest(
                                                                 
webMonitorBaseUrl.getHost(),
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java 
b/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
index a9663e3ae4c..bc7e0c1d299 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
@@ -21,6 +21,7 @@ package org.apache.flink.client.cli;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.SecurityOptions;
 
 import org.apache.commons.cli.CommandLine;
 import org.junit.jupiter.api.Test;
@@ -36,8 +37,9 @@ class DefaultCLITest {
     @Test
     void testCommandLineMaterialization() throws Exception {
         final String hostname = "home-sweet-home";
+        final String urlPath = "/some/other/path/index.html";
         final int port = 1234;
-        final String[] args = {"-m", hostname + ':' + port};
+        final String[] args = {"-m", hostname + ':' + port + urlPath};
 
         final AbstractCustomCommandLine defaultCLI = new DefaultCLI();
         final CommandLine commandLine = 
defaultCLI.parseCommandLineOptions(args, false);
@@ -46,6 +48,34 @@ class DefaultCLITest {
 
         assertThat(configuration.get(RestOptions.ADDRESS)).isEqualTo(hostname);
         assertThat(configuration.get(RestOptions.PORT)).isEqualTo(port);
+
+        final String httpProtocol = "http";
+        
assertThat(configuration.get(SecurityOptions.SSL_REST_ENABLED)).isEqualTo(false);
+        assertThat(configuration.get(RestOptions.PATH)).isEqualTo(urlPath);
+
+        final String hostnameWithHttpScheme = httpProtocol + "://" + hostname;
+        final String[] httpArgs = {"-m", hostnameWithHttpScheme + ':' + port + 
urlPath};
+        final CommandLine httpCommandLine = 
defaultCLI.parseCommandLineOptions(httpArgs, false);
+
+        Configuration httpConfiguration = 
defaultCLI.toConfiguration(httpCommandLine);
+
+        
assertThat(httpConfiguration.get(RestOptions.ADDRESS)).isEqualTo(hostname);
+        assertThat(httpConfiguration.get(RestOptions.PORT)).isEqualTo(port);
+        
assertThat(httpConfiguration.get(SecurityOptions.SSL_REST_ENABLED)).isEqualTo(false);
+        assertThat(httpConfiguration.get(RestOptions.PATH)).isEqualTo(urlPath);
+
+        final String httpsProtocol = "https";
+
+        final String hostnameWithHttpsScheme = httpsProtocol + "://" + 
hostname;
+        final String[] httpsArgs = {"-m", hostnameWithHttpsScheme + ':' + port 
+ urlPath};
+        final CommandLine httpsCommandLine = 
defaultCLI.parseCommandLineOptions(httpsArgs, false);
+
+        Configuration httpsConfiguration = 
defaultCLI.toConfiguration(httpsCommandLine);
+
+        
assertThat(httpsConfiguration.get(RestOptions.ADDRESS)).isEqualTo(hostname);
+        assertThat(httpsConfiguration.get(RestOptions.PORT)).isEqualTo(port);
+        
assertThat(httpsConfiguration.get(SecurityOptions.SSL_REST_ENABLED)).isEqualTo(true);
+        
assertThat(httpsConfiguration.get(RestOptions.PATH)).isEqualTo(urlPath);
     }
 
     @Test
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index e44174beb78..4f7e2f0b08d 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.client.cli.CliArgsException;
 import org.apache.flink.client.cli.DefaultCLI;
 import org.apache.flink.client.deployment.ClusterClientFactory;
 import org.apache.flink.client.deployment.ClusterClientServiceLoader;
@@ -50,6 +51,7 @@ import 
org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.rest.FileUpload;
+import org.apache.flink.runtime.rest.HttpHeader;
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
@@ -688,21 +690,59 @@ class RestClusterClientTest {
     /** Tests that command line options override the configuration settings. */
     @Test
     void testRESTManualConfigurationOverride() throws Exception {
-        final String configuredHostname = "localhost";
-        final int configuredPort = 1234;
-        final Configuration configuration = new Configuration();
-
-        configuration.setString(JobManagerOptions.ADDRESS, configuredHostname);
-        configuration.setInteger(JobManagerOptions.PORT, configuredPort);
-        configuration.setString(RestOptions.ADDRESS, configuredHostname);
-        configuration.setInteger(RestOptions.PORT, configuredPort);
-
-        final DefaultCLI defaultCLI = new DefaultCLI();
-
         final String manualHostname = "123.123.123.123";
         final int manualPort = 4321;
+        final String httpProtocol = "http";
         final String[] args = {"-m", manualHostname + ':' + manualPort};
 
+        final RestClusterClient<?> clusterClient = getRestClusterClient(args);
+
+        URL webMonitorBaseUrl = clusterClient.getWebMonitorBaseUrl().get();
+        
assertThat(webMonitorBaseUrl).hasHost(manualHostname).hasPort(manualPort);
+        assertThat(clusterClient.getJobmanagerUrl())
+                .hasHost(manualHostname)
+                .hasPort(manualPort)
+                .hasNoPath()
+                .hasProtocol(httpProtocol);
+        assertThat(clusterClient.getCustomHttpHeaders()).isEmpty();
+
+        final String urlPath = "/some/path/here/index.html";
+        final String httpsProtocol = "https";
+        final String[] httpsUrlArgs = {
+            "-m", httpsProtocol + "://" + manualHostname + ':' + manualPort + 
urlPath
+        };
+
+        final Map<String, String> envMap =
+                Collections.singletonMap(
+                        ConfigConstants.FLINK_REST_CLIENT_HEADERS,
+                        
"Cookie:authCookie=12:345\nCustomHeader:value1,value2\nMalformedHeaderSkipped");
+        org.apache.flink.core.testutils.CommonTestUtils.setEnv(envMap);
+
+        final RestClusterClient<?> newClusterClient = 
getRestClusterClient(httpsUrlArgs);
+        assertThat(newClusterClient.getWebMonitorBaseUrl().get())
+                .hasHost(manualHostname)
+                .hasPort(manualPort);
+
+        final URL jobManagerUrl = newClusterClient.getJobmanagerUrl();
+        assertThat(jobManagerUrl)
+                .hasHost(manualHostname)
+                .hasPort(manualPort)
+                .hasPath(urlPath)
+                .hasProtocol(httpsProtocol);
+
+        final List<HttpHeader> customHttpHeaders =
+                new ArrayList<>(newClusterClient.getCustomHttpHeaders());
+        final HttpHeader expectedHeader1 = new HttpHeader("Cookie", 
"authCookie=12:345");
+        final HttpHeader expectedHeader2 = new HttpHeader("CustomHeader", 
"value1,value2");
+        assertThat(customHttpHeaders).hasSize(2);
+        assertThat(customHttpHeaders.get(0)).isEqualTo(expectedHeader1);
+        assertThat(customHttpHeaders.get(1)).isEqualTo(expectedHeader2);
+    }
+
+    private static RestClusterClient<?> getRestClusterClient(String[] args)
+            throws CliArgsException, FlinkException {
+        final DefaultCLI defaultCLI = new DefaultCLI();
+
         CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, 
false);
 
         final ClusterClientServiceLoader serviceLoader = new 
DefaultClusterClientServiceLoader();
@@ -719,9 +759,7 @@ class RestClusterClientTest {
                         clusterDescriptor
                                 
.retrieve(clusterFactory.getClusterId(executorConfig))
                                 .getClusterClient();
-
-        URL webMonitorBaseUrl = clusterClient.getWebMonitorBaseUrl().get();
-        
assertThat(webMonitorBaseUrl).hasHost(manualHostname).hasPort(manualPort);
+        return clusterClient;
     }
 
     /** Tests that the send operation is being retried. */
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index d3c1a6baa90..a4db7011787 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -80,6 +80,15 @@ public class RestOptions {
                     .withDescription(
                             "The address that should be used by clients to 
connect to the server. Attention: This option is respected only if the 
high-availability configuration is NONE.");
 
+    /** The path that should be used by clients to interact with the server. */
+    @Documentation.Section(Documentation.Sections.COMMON_HOST_PORT)
+    public static final ConfigOption<String> PATH =
+            key("rest.path")
+                    .stringType()
+                    .defaultValue("")
+                    .withDescription(
+                            "The path that should be used by clients to 
interact to the server which is accessible via URL.");
+
     /**
      * The port that the REST client connects to and the REST server binds to 
if {@link #BIND_PORT}
      * has not been specified.
diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
index f3c4ac9d96d..41d657c42c4 100644
--- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
@@ -103,8 +103,15 @@ public class NetUtils {
      * @return URL object for accessing host and port
      */
     private static URL validateHostPortString(String hostPort) {
+        if (StringUtils.isNullOrWhitespaceOnly(hostPort)) {
+            throw new IllegalArgumentException("hostPort should not be null or 
empty");
+        }
         try {
-            URL u = new URL("http://" + hostPort);
+            URL u =
+                    (hostPort.toLowerCase().startsWith("http://")
+                                    || 
hostPort.toLowerCase().startsWith("https://"))
+                            ? new URL(hostPort)
+                            : new URL("http://" + hostPort);
             if (u.getHost() == null) {
                 throw new IllegalArgumentException(
                         "The given host:port ('" + hostPort + "') doesn't 
contain a valid host");
diff --git a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java 
b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
index 9ead139babd..da186c2aea9 100644
--- a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
@@ -51,6 +51,14 @@ public class NetUtilsTest extends TestLogger {
         assertEquals(url, 
NetUtils.getCorrectHostnamePort("foo.com:8080/index.html"));
     }
 
+    @Test
+    public void testCorrectHostnamePortWithHttpsScheme() throws Exception {
+        final URL url = new URL("https", "foo.com", 8080, 
"/some/other/path/index.html");
+        assertEquals(
+                url,
+                
NetUtils.getCorrectHostnamePort("https://foo.com:8080/some/other/path/index.html"));
+    }
+
     @Test
     public void testParseHostPortAddress() {
         final InetSocketAddress socketAddress = new 
InetSocketAddress("foo.com", 8080);

Reply via email to