This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
new 8bfddc6 [FLINK-20944][k8s] Do not resolve the rest endpoint address
when the service exposed type is ClusterIP
8bfddc6 is described below
commit 8bfddc606e39e656d144ad3d60c70627a0010fc7
Author: wangyang0918 <[email protected]>
AuthorDate: Tue Jan 19 15:09:21 2021 +0800
[FLINK-20944][k8s] Do not resolve the rest endpoint address when the
service exposed type is ClusterIP
This closes #14692.
---
.../kubernetes/KubernetesClusterDescriptor.java | 22 ++++++++++++++-----
.../KubernetesClusterDescriptorTest.java | 25 ++++++++++++++++++++++
2 files changed, 42 insertions(+), 5 deletions(-)
diff --git
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
index 65c7573..57137d5 100644
---
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
+++
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
@@ -112,11 +112,7 @@ public class KubernetesClusterDescriptor implements
ClusterDescriptor<String> {
return new RestClusterClient<>(
configuration,
clusterId,
- new StandaloneClientHAServices(
-
HighAvailabilityServicesUtils.getWebMonitorAddress(
- configuration,
-
HighAvailabilityServicesUtils.AddressResolution
- .TRY_ADDRESS_RESOLUTION)));
+ new
StandaloneClientHAServices(getWebMonitorAddress(configuration)));
} catch (Exception e) {
client.handleException(e);
throw new RuntimeException(
@@ -125,6 +121,22 @@ public class KubernetesClusterDescriptor implements
ClusterDescriptor<String> {
};
}
+ private String getWebMonitorAddress(Configuration configuration) throws
Exception {
+ HighAvailabilityServicesUtils.AddressResolution resolution =
+
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION;
+ if
(configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE)
+ == KubernetesConfigOptions.ServiceExposedType.ClusterIP) {
+ resolution =
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION;
+ LOG.warn(
+ "Please note that Flink client operations(e.g. cancel,
list, stop,"
+ + " savepoint, etc.) won't work from outside the
Kubernetes cluster"
+ + " since '{}' has been set to {}.",
+ KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE.key(),
+ KubernetesConfigOptions.ServiceExposedType.ClusterIP);
+ }
+ return
HighAvailabilityServicesUtils.getWebMonitorAddress(configuration, resolution);
+ }
+
@Override
public ClusterClientProvider<String> retrieve(String clusterId) {
final ClusterClientProvider<String> clusterClientProvider =
diff --git
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java
index 52c63c1..b1ac2f9e5 100644
---
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java
+++
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java
@@ -28,7 +28,9 @@ import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import
org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator;
import org.apache.flink.kubernetes.utils.Constants;
@@ -46,7 +48,9 @@ import java.util.stream.Collectors;
import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
import static
org.apache.flink.kubernetes.utils.Constants.ENV_FLINK_POD_IP_ADDRESS;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
/** Tests for the {@link KubernetesClusterDescriptor}. */
@@ -186,6 +190,27 @@ public class KubernetesClusterDescriptorTest extends
KubernetesClientTestBase {
() ->
descriptor.deployApplicationCluster(clusterSpecification, appConfig));
}
+ @Test
+ public void testDeployApplicationClusterWithClusterIP() throws Exception {
+ flinkConfig.set(
+ PipelineOptions.JARS,
Collections.singletonList("local:///path/of/user.jar"));
+ flinkConfig.set(DeploymentOptions.TARGET,
KubernetesDeploymentTarget.APPLICATION.getName());
+ flinkConfig.set(
+ KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
+ KubernetesConfigOptions.ServiceExposedType.ClusterIP);
+
+ final ClusterClient<String> clusterClient =
+ descriptor
+ .deployApplicationCluster(clusterSpecification,
appConfig)
+ .getClusterClient();
+
+ final String address = CLUSTER_ID +
Constants.FLINK_REST_SERVICE_SUFFIX + "." + NAMESPACE;
+ final int port = flinkConfig.get(RestOptions.PORT);
+ assertThat(
+ clusterClient.getWebInterfaceURL(),
+ is(String.format("http://%s:%d", address, port)));
+ }
+
private ClusterClientProvider<String> deploySessionCluster() throws
ClusterDeploymentException {
mockExpectedServiceFromServerSide(loadBalancerSvc);
return descriptor.deploySessionCluster(clusterSpecification);