This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 38f3e92  [FLINK-26676] Make ClusterIP the default rest service type
38f3e92 is described below

commit 38f3e92673b0a0a8a59ddcc1b5392a4d598bd91a
Author: Gyula Fora <[email protected]>
AuthorDate: Wed Mar 16 15:49:50 2022 +0100

    [FLINK-26676] Make ClusterIP the default rest service type
---
 .../operator/utils/FlinkConfigBuilder.java          | 10 +++++++++-
 .../operator/utils/FlinkConfigBuilderTest.java      | 21 ++++++++++++++++++++-
 2 files changed, 29 insertions(+), 2 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
index 83a964f..6a14f18 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
@@ -43,6 +43,7 @@ import java.nio.file.Files;
 import java.util.Collections;
 
 import static 
org.apache.flink.configuration.DeploymentOptionsInternal.CONF_DIR;
+import static 
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE;
 import static 
org.apache.flink.kubernetes.operator.utils.FlinkUtils.mergePodTemplates;
 import static 
org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
 import static 
org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOGBACK_NAME;
@@ -80,6 +81,13 @@ public class FlinkConfigBuilder {
         if (spec.getFlinkConfiguration() != null && 
!spec.getFlinkConfiguration().isEmpty()) {
             spec.getFlinkConfiguration().forEach(effectiveConfig::setString);
         }
+
+        // Adapt default rest service type from 1.15+
+        if (!effectiveConfig.contains(REST_SERVICE_EXPOSED_TYPE)) {
+            effectiveConfig.set(
+                    REST_SERVICE_EXPOSED_TYPE,
+                    KubernetesConfigOptions.ServiceExposedType.ClusterIP);
+        }
         return this;
     }
 
@@ -107,7 +115,7 @@ public class FlinkConfigBuilder {
         // Web UI
         if (spec.getIngressDomain() != null) {
             effectiveConfig.set(
-                    KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
+                    REST_SERVICE_EXPOSED_TYPE,
                     KubernetesConfigOptions.ServiceExposedType.ClusterIP);
         }
         return this;
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
index 8d6aee5..b739472 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
@@ -103,11 +103,30 @@ public class FlinkConfigBuilderTest {
 
     @Test
     public void testApplyFlinkConfiguration() {
-        final Configuration configuration =
+        Configuration configuration =
                 new FlinkConfigBuilder(flinkDeployment, new Configuration())
                         .applyFlinkConfiguration()
                         .build();
         Assert.assertEquals(2, (int) 
configuration.get(TaskManagerOptions.NUM_TASK_SLOTS));
+        Assert.assertEquals(
+                KubernetesConfigOptions.ServiceExposedType.ClusterIP,
+                
configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE));
+
+        FlinkDeployment deployment = 
ReconciliationUtils.clone(flinkDeployment);
+        deployment
+                .getSpec()
+                .setFlinkConfiguration(
+                        Map.of(
+                                
KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE.key(),
+                                
KubernetesConfigOptions.ServiceExposedType.LoadBalancer.name()));
+
+        configuration =
+                new FlinkConfigBuilder(deployment, new Configuration())
+                        .applyFlinkConfiguration()
+                        .build();
+        Assert.assertEquals(
+                KubernetesConfigOptions.ServiceExposedType.LoadBalancer,
+                
configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE));
     }
 
     @Test

Reply via email to