This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new 8bfddc6  [FLINK-20944][k8s] Do not resolve the rest endpoint address when the service exposed type is ClusterIP
8bfddc6 is described below

commit 8bfddc606e39e656d144ad3d60c70627a0010fc7
Author: wangyang0918 <[email protected]>
AuthorDate: Tue Jan 19 15:09:21 2021 +0800

    [FLINK-20944][k8s] Do not resolve the rest endpoint address when the service exposed type is ClusterIP
    
    This closes #14692.
---
 .../kubernetes/KubernetesClusterDescriptor.java    | 22 ++++++++++++++-----
 .../KubernetesClusterDescriptorTest.java           | 25 ++++++++++++++++++++++
 2 files changed, 42 insertions(+), 5 deletions(-)

diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
index 65c7573..57137d5 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
@@ -112,11 +112,7 @@ public class KubernetesClusterDescriptor implements 
ClusterDescriptor<String> {
                 return new RestClusterClient<>(
                         configuration,
                         clusterId,
-                        new StandaloneClientHAServices(
-                                
HighAvailabilityServicesUtils.getWebMonitorAddress(
-                                        configuration,
-                                        
HighAvailabilityServicesUtils.AddressResolution
-                                                .TRY_ADDRESS_RESOLUTION)));
+                        new 
StandaloneClientHAServices(getWebMonitorAddress(configuration)));
             } catch (Exception e) {
                 client.handleException(e);
                 throw new RuntimeException(
@@ -125,6 +121,22 @@ public class KubernetesClusterDescriptor implements 
ClusterDescriptor<String> {
         };
     }
 
+    private String getWebMonitorAddress(Configuration configuration) throws 
Exception {
+        HighAvailabilityServicesUtils.AddressResolution resolution =
+                
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION;
+        if 
(configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE)
+                == KubernetesConfigOptions.ServiceExposedType.ClusterIP) {
+            resolution = 
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION;
+            LOG.warn(
+                    "Please note that Flink client operations(e.g. cancel, 
list, stop,"
+                            + " savepoint, etc.) won't work from outside the 
Kubernetes cluster"
+                            + " since '{}' has been set to {}.",
+                    KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE.key(),
+                    KubernetesConfigOptions.ServiceExposedType.ClusterIP);
+        }
+        return 
HighAvailabilityServicesUtils.getWebMonitorAddress(configuration, resolution);
+    }
+
     @Override
     public ClusterClientProvider<String> retrieve(String clusterId) {
         final ClusterClientProvider<String> clusterClientProvider =
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java
index 52c63c1..b1ac2f9e5 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java
@@ -28,7 +28,9 @@ import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
 import 
org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator;
 import org.apache.flink.kubernetes.utils.Constants;
@@ -46,7 +48,9 @@ import java.util.stream.Collectors;
 
 import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
 import static 
org.apache.flink.kubernetes.utils.Constants.ENV_FLINK_POD_IP_ADDRESS;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 /** Tests for the {@link KubernetesClusterDescriptor}. */
@@ -186,6 +190,27 @@ public class KubernetesClusterDescriptorTest extends 
KubernetesClientTestBase {
                 () -> 
descriptor.deployApplicationCluster(clusterSpecification, appConfig));
     }
 
+    @Test
+    public void testDeployApplicationClusterWithClusterIP() throws Exception {
+        flinkConfig.set(
+                PipelineOptions.JARS, 
Collections.singletonList("local:///path/of/user.jar"));
+        flinkConfig.set(DeploymentOptions.TARGET, 
KubernetesDeploymentTarget.APPLICATION.getName());
+        flinkConfig.set(
+                KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
+                KubernetesConfigOptions.ServiceExposedType.ClusterIP);
+
+        final ClusterClient<String> clusterClient =
+                descriptor
+                        .deployApplicationCluster(clusterSpecification, 
appConfig)
+                        .getClusterClient();
+
+        final String address = CLUSTER_ID + 
Constants.FLINK_REST_SERVICE_SUFFIX + "." + NAMESPACE;
+        final int port = flinkConfig.get(RestOptions.PORT);
+        assertThat(
+                clusterClient.getWebInterfaceURL(),
+                is(String.format("http://%s:%d", address, port)));
+    }
+
     private ClusterClientProvider<String> deploySessionCluster() throws 
ClusterDeploymentException {
         mockExpectedServiceFromServerSide(loadBalancerSvc);
         return descriptor.deploySessionCluster(clusterSpecification);

Reply via email to