This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 5682472e402 PyFlink remote execution should support URLs with paths
and https scheme
5682472e402 is described below
commit 5682472e4029c25d4a2651a609c999029fa3281b
Author: Elkhan Dadashov <[email protected]>
AuthorDate: Tue Sep 12 21:32:21 2023 -0700
PyFlink remote execution should support URLs with paths and https scheme
---
.../generated/common_host_port_section.html | 6 ++
.../shortcodes/generated/rest_configuration.html | 6 ++
.../org/apache/flink/client/cli/DefaultCLI.java | 12 ++++
.../client/program/rest/RestClusterClient.java | 39 ++++++++++++-
.../apache/flink/client/cli/DefaultCLITest.java | 32 ++++++++++-
.../client/program/rest/RestClusterClientTest.java | 66 +++++++++++++++++-----
.../apache/flink/configuration/RestOptions.java | 9 +++
.../main/java/org/apache/flink/util/NetUtils.java | 9 ++-
.../java/org/apache/flink/util/NetUtilsTest.java | 8 +++
9 files changed, 170 insertions(+), 17 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/common_host_port_section.html
b/docs/layouts/shortcodes/generated/common_host_port_section.html
index c2753d125e9..624278f7fb4 100644
--- a/docs/layouts/shortcodes/generated/common_host_port_section.html
+++ b/docs/layouts/shortcodes/generated/common_host_port_section.html
@@ -44,6 +44,12 @@
<td>String</td>
<td>The port that the server binds itself. Accepts a list of ports
(“50100,50101”), ranges (“50100-50200”) or a combination of both. It is
recommended to set a range of ports to avoid collisions when multiple Rest
servers are running on the same machine.</td>
</tr>
+ <tr>
+ <td><h5>rest.path</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The path that should be used by clients to interact with the
server which is accessible via URL.</td>
+ </tr>
<tr>
<td><h5>rest.port</h5></td>
<td style="word-wrap: break-word;">8081</td>
diff --git a/docs/layouts/shortcodes/generated/rest_configuration.html
b/docs/layouts/shortcodes/generated/rest_configuration.html
index 467b4275acd..af006be2037 100644
--- a/docs/layouts/shortcodes/generated/rest_configuration.html
+++ b/docs/layouts/shortcodes/generated/rest_configuration.html
@@ -104,6 +104,12 @@
<td>Long</td>
<td>The maximum time in ms for a connection to stay idle before
failing.</td>
</tr>
+ <tr>
+ <td><h5>rest.path</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The path that should be used by clients to interact with the
server which is accessible via URL.</td>
+ </tr>
<tr>
<td><h5>rest.port</h5></td>
<td style="word-wrap: break-word;">8081</td>
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index 273e007a8ae..d7f2fb25ce4 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -21,6 +21,8 @@ package org.apache.flink.client.cli;
import org.apache.flink.client.deployment.executors.RemoteExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.NetUtils;
@@ -29,6 +31,7 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import java.net.InetSocketAddress;
+import java.net.URL;
import static
org.apache.flink.client.cli.CliFrontend.setJobManagerAddressInConfig;
@@ -60,6 +63,11 @@ public class DefaultCLI extends AbstractCustomCommandLine {
String addressWithPort =
commandLine.getOptionValue(addressOption.getOpt());
InetSocketAddress jobManagerAddress =
NetUtils.parseHostPortAddress(addressWithPort);
setJobManagerAddressInConfig(resultingConfiguration,
jobManagerAddress);
+
+ URL url = NetUtils.getCorrectHostnamePort(addressWithPort);
+ resultingConfiguration.setString(RestOptions.PATH, url.getPath());
+ resultingConfiguration.setBoolean(
+ SecurityOptions.SSL_REST_ENABLED, isHttpsProtocol(url));
}
resultingConfiguration.setString(DeploymentOptions.TARGET,
RemoteExecutor.NAME);
@@ -68,6 +76,10 @@ public class DefaultCLI extends AbstractCustomCommandLine {
return resultingConfiguration;
}
+ private static boolean isHttpsProtocol(URL url) {
+ return url.getProtocol() != null &&
(url.getProtocol().equalsIgnoreCase("https"));
+ }
+
@Override
public String getId() {
return ID;
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 4318d0a6c42..b05dc331853 100644
---
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -25,10 +25,15 @@ import
org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
import org.apache.flink.client.program.rest.retry.WaitStrategy;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.client.JobStatusMessage;
@@ -48,11 +53,13 @@ import
org.apache.flink.runtime.messages.webmonitor.JobStatusInfo;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.rest.FileUpload;
+import org.apache.flink.runtime.rest.HttpHeader;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
import
org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.CustomHeadersDecorator;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
@@ -192,6 +199,10 @@ public class RestClusterClient<T> implements
ClusterClient<T> {
ExceptionUtils.findThrowable(exception,
JobStateUnknownException.class)
.isPresent();
+ private final URL jobmanagerUrl;
+
+ private final Collection<HttpHeader> customHttpHeaders;
+
public RestClusterClient(Configuration config, T clusterId) throws
Exception {
this(config, clusterId,
DefaultClientHighAvailabilityServicesFactory.INSTANCE);
}
@@ -229,10 +240,20 @@ public class RestClusterClient<T> implements
ClusterClient<T> {
this.restClusterClientConfiguration =
RestClusterClientConfiguration.fromConfiguration(configuration);
+ this.customHttpHeaders =
+ ClientUtils.readHeadersFromEnvironmentVariable(
+ ConfigConstants.FLINK_REST_CLIENT_HEADERS);
+ jobmanagerUrl =
+ new URL(
+ SecurityOptions.isRestSSLEnabled(configuration) ?
"https" : "http",
+ configuration.getString(JobManagerOptions.ADDRESS),
+ configuration.getInteger(JobManagerOptions.PORT),
+ configuration.getString(RestOptions.PATH));
+
if (restClient != null) {
this.restClient = restClient;
} else {
- this.restClient = new RestClient(configuration, executorService);
+ this.restClient = RestClient.forUrl(configuration,
executorService, jobmanagerUrl);
}
this.waitStrategy = checkNotNull(waitStrategy);
@@ -828,6 +849,16 @@ public class RestClusterClient<T> implements
ClusterClient<T> {
.thenApply(ignored -> Acknowledge.get());
}
+ @VisibleForTesting
+ URL getJobmanagerUrl() {
+ return jobmanagerUrl;
+ }
+
+ @VisibleForTesting
+ Collection<HttpHeader> getCustomHttpHeaders() {
+ return customHttpHeaders;
+ }
+
/**
* Get an overview of the Flink cluster.
*
@@ -980,6 +1011,12 @@ public class RestClusterClient<T> implements
ClusterClient<T> {
.thenCompose(
webMonitorBaseUrl -> {
try {
+ CustomHeadersDecorator<R, P,
U> headers =
+ new
CustomHeadersDecorator<>(
+ new
UrlPrefixDecorator<>(
+
messageHeaders,
+
jobmanagerUrl.getPath()));
+
headers.setCustomHeaders(customHttpHeaders);
final CompletableFuture<P>
future =
restClient.sendRequest(
webMonitorBaseUrl.getHost(),
diff --git
a/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
b/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
index a9663e3ae4c..bc7e0c1d299 100644
---
a/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
+++
b/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
@@ -21,6 +21,7 @@ package org.apache.flink.client.cli;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.SecurityOptions;
import org.apache.commons.cli.CommandLine;
import org.junit.jupiter.api.Test;
@@ -36,8 +37,9 @@ class DefaultCLITest {
@Test
void testCommandLineMaterialization() throws Exception {
final String hostname = "home-sweet-home";
+ final String urlPath = "/some/other/path/index.html";
final int port = 1234;
- final String[] args = {"-m", hostname + ':' + port};
+ final String[] args = {"-m", hostname + ':' + port + urlPath};
final AbstractCustomCommandLine defaultCLI = new DefaultCLI();
final CommandLine commandLine =
defaultCLI.parseCommandLineOptions(args, false);
@@ -46,6 +48,34 @@ class DefaultCLITest {
assertThat(configuration.get(RestOptions.ADDRESS)).isEqualTo(hostname);
assertThat(configuration.get(RestOptions.PORT)).isEqualTo(port);
+
+ final String httpProtocol = "http";
+
assertThat(configuration.get(SecurityOptions.SSL_REST_ENABLED)).isEqualTo(false);
+ assertThat(configuration.get(RestOptions.PATH)).isEqualTo(urlPath);
+
+ final String hostnameWithHttpScheme = httpProtocol + "://" + hostname;
+ final String[] httpArgs = {"-m", hostnameWithHttpScheme + ':' + port +
urlPath};
+ final CommandLine httpCommandLine =
defaultCLI.parseCommandLineOptions(httpArgs, false);
+
+ Configuration httpConfiguration =
defaultCLI.toConfiguration(httpCommandLine);
+
+
assertThat(httpConfiguration.get(RestOptions.ADDRESS)).isEqualTo(hostname);
+ assertThat(httpConfiguration.get(RestOptions.PORT)).isEqualTo(port);
+
assertThat(httpConfiguration.get(SecurityOptions.SSL_REST_ENABLED)).isEqualTo(false);
+ assertThat(httpConfiguration.get(RestOptions.PATH)).isEqualTo(urlPath);
+
+ final String httpsProtocol = "https";
+
+ final String hostnameWithHttpsScheme = httpsProtocol + "://" +
hostname;
+ final String[] httpsArgs = {"-m", hostnameWithHttpsScheme + ':' + port
+ urlPath};
+ final CommandLine httpsCommandLine =
defaultCLI.parseCommandLineOptions(httpsArgs, false);
+
+ Configuration httpsConfiguration =
defaultCLI.toConfiguration(httpsCommandLine);
+
+
assertThat(httpsConfiguration.get(RestOptions.ADDRESS)).isEqualTo(hostname);
+ assertThat(httpsConfiguration.get(RestOptions.PORT)).isEqualTo(port);
+
assertThat(httpsConfiguration.get(SecurityOptions.SSL_REST_ENABLED)).isEqualTo(true);
+
assertThat(httpsConfiguration.get(RestOptions.PATH)).isEqualTo(urlPath);
}
@Test
diff --git
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index e44174beb78..4f7e2f0b08d 100644
---
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
@@ -50,6 +51,7 @@ import
org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.rest.FileUpload;
+import org.apache.flink.runtime.rest.HttpHeader;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
@@ -688,21 +690,59 @@ class RestClusterClientTest {
/** Tests that command line options override the configuration settings. */
@Test
void testRESTManualConfigurationOverride() throws Exception {
- final String configuredHostname = "localhost";
- final int configuredPort = 1234;
- final Configuration configuration = new Configuration();
-
- configuration.setString(JobManagerOptions.ADDRESS, configuredHostname);
- configuration.setInteger(JobManagerOptions.PORT, configuredPort);
- configuration.setString(RestOptions.ADDRESS, configuredHostname);
- configuration.setInteger(RestOptions.PORT, configuredPort);
-
- final DefaultCLI defaultCLI = new DefaultCLI();
-
final String manualHostname = "123.123.123.123";
final int manualPort = 4321;
+ final String httpProtocol = "http";
final String[] args = {"-m", manualHostname + ':' + manualPort};
+ final RestClusterClient<?> clusterClient = getRestClusterClient(args);
+
+ URL webMonitorBaseUrl = clusterClient.getWebMonitorBaseUrl().get();
+
assertThat(webMonitorBaseUrl).hasHost(manualHostname).hasPort(manualPort);
+ assertThat(clusterClient.getJobmanagerUrl())
+ .hasHost(manualHostname)
+ .hasPort(manualPort)
+ .hasNoPath()
+ .hasProtocol(httpProtocol);
+ assertThat(clusterClient.getCustomHttpHeaders()).isEmpty();
+
+ final String urlPath = "/some/path/here/index.html";
+ final String httpsProtocol = "https";
+ final String[] httpsUrlArgs = {
+ "-m", httpsProtocol + "://" + manualHostname + ':' + manualPort +
urlPath
+ };
+
+ final Map<String, String> envMap =
+ Collections.singletonMap(
+ ConfigConstants.FLINK_REST_CLIENT_HEADERS,
+
"Cookie:authCookie=12:345\nCustomHeader:value1,value2\nMalformedHeaderSkipped");
+ org.apache.flink.core.testutils.CommonTestUtils.setEnv(envMap);
+
+ final RestClusterClient<?> newClusterClient =
getRestClusterClient(httpsUrlArgs);
+ assertThat(newClusterClient.getWebMonitorBaseUrl().get())
+ .hasHost(manualHostname)
+ .hasPort(manualPort);
+
+ final URL jobManagerUrl = newClusterClient.getJobmanagerUrl();
+ assertThat(jobManagerUrl)
+ .hasHost(manualHostname)
+ .hasPort(manualPort)
+ .hasPath(urlPath)
+ .hasProtocol(httpsProtocol);
+
+ final List<HttpHeader> customHttpHeaders =
+ new ArrayList<>(newClusterClient.getCustomHttpHeaders());
+ final HttpHeader expectedHeader1 = new HttpHeader("Cookie",
"authCookie=12:345");
+ final HttpHeader expectedHeader2 = new HttpHeader("CustomHeader",
"value1,value2");
+ assertThat(customHttpHeaders).hasSize(2);
+ assertThat(customHttpHeaders.get(0)).isEqualTo(expectedHeader1);
+ assertThat(customHttpHeaders.get(1)).isEqualTo(expectedHeader2);
+ }
+
+ private static RestClusterClient<?> getRestClusterClient(String[] args)
+ throws CliArgsException, FlinkException {
+ final DefaultCLI defaultCLI = new DefaultCLI();
+
CommandLine commandLine = defaultCLI.parseCommandLineOptions(args,
false);
final ClusterClientServiceLoader serviceLoader = new
DefaultClusterClientServiceLoader();
@@ -719,9 +759,7 @@ class RestClusterClientTest {
clusterDescriptor
.retrieve(clusterFactory.getClusterId(executorConfig))
.getClusterClient();
-
- URL webMonitorBaseUrl = clusterClient.getWebMonitorBaseUrl().get();
-
assertThat(webMonitorBaseUrl).hasHost(manualHostname).hasPort(manualPort);
+ return clusterClient;
}
/** Tests that the send operation is being retried. */
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index d3c1a6baa90..a4db7011787 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -80,6 +80,15 @@ public class RestOptions {
.withDescription(
"The address that should be used by clients to
connect to the server. Attention: This option is respected only if the
high-availability configuration is NONE.");
+ /** The path that should be used by clients to interact with the server. */
+ @Documentation.Section(Documentation.Sections.COMMON_HOST_PORT)
+ public static final ConfigOption<String> PATH =
+ key("rest.path")
+ .stringType()
+ .defaultValue("")
+ .withDescription(
+ "The path that should be used by clients to
interact to the server which is accessible via URL.");
+
/**
* The port that the REST client connects to and the REST server binds to
if {@link #BIND_PORT}
* has not been specified.
diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
index f3c4ac9d96d..41d657c42c4 100644
--- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
@@ -103,8 +103,15 @@ public class NetUtils {
* @return URL object for accessing host and port
*/
private static URL validateHostPortString(String hostPort) {
+ if (StringUtils.isNullOrWhitespaceOnly(hostPort)) {
+ throw new IllegalArgumentException("hostPort should not be null or
empty");
+ }
try {
- URL u = new URL("http://" + hostPort);
+ URL u =
+ (hostPort.toLowerCase().startsWith("http://")
+ ||
hostPort.toLowerCase().startsWith("https://"))
+ ? new URL(hostPort)
+ : new URL("http://" + hostPort);
if (u.getHost() == null) {
throw new IllegalArgumentException(
"The given host:port ('" + hostPort + "') doesn't
contain a valid host");
diff --git a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
index 9ead139babd..da186c2aea9 100644
--- a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
@@ -51,6 +51,14 @@ public class NetUtilsTest extends TestLogger {
assertEquals(url,
NetUtils.getCorrectHostnamePort("foo.com:8080/index.html"));
}
+ @Test
+ public void testCorrectHostnamePortWithHttpsScheme() throws Exception {
+ final URL url = new URL("https", "foo.com", 8080,
"/some/other/path/index.html");
+ assertEquals(
+ url,
+
NetUtils.getCorrectHostnamePort("https://foo.com:8080/some/other/path/index.html"));
+ }
+
@Test
public void testParseHostPortAddress() {
final InetSocketAddress socketAddress = new
InetSocketAddress("foo.com", 8080);