This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 38f3e92 [FLINK-26676] Make ClusterIP the default rest service type
38f3e92 is described below
commit 38f3e92673b0a0a8a59ddcc1b5392a4d598bd91a
Author: Gyula Fora <[email protected]>
AuthorDate: Wed Mar 16 15:49:50 2022 +0100
[FLINK-26676] Make ClusterIP the default rest service type
---
.../operator/utils/FlinkConfigBuilder.java | 10 +++++++++-
.../operator/utils/FlinkConfigBuilderTest.java | 21 ++++++++++++++++++++-
2 files changed, 29 insertions(+), 2 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
index 83a964f..6a14f18 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
@@ -43,6 +43,7 @@ import java.nio.file.Files;
import java.util.Collections;
import static org.apache.flink.configuration.DeploymentOptionsInternal.CONF_DIR;
+import static org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE;
import static org.apache.flink.kubernetes.operator.utils.FlinkUtils.mergePodTemplates;
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOGBACK_NAME;
@@ -80,6 +81,13 @@ public class FlinkConfigBuilder {
if (spec.getFlinkConfiguration() != null &&
!spec.getFlinkConfiguration().isEmpty()) {
spec.getFlinkConfiguration().forEach(effectiveConfig::setString);
}
+
+ // Adapt default rest service type from 1.15+
+ if (!effectiveConfig.contains(REST_SERVICE_EXPOSED_TYPE)) {
+ effectiveConfig.set(
+ REST_SERVICE_EXPOSED_TYPE,
+ KubernetesConfigOptions.ServiceExposedType.ClusterIP);
+ }
return this;
}
@@ -107,7 +115,7 @@ public class FlinkConfigBuilder {
// Web UI
if (spec.getIngressDomain() != null) {
effectiveConfig.set(
- KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
+ REST_SERVICE_EXPOSED_TYPE,
KubernetesConfigOptions.ServiceExposedType.ClusterIP);
}
return this;
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
index 8d6aee5..b739472 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
@@ -103,11 +103,30 @@ public class FlinkConfigBuilderTest {
@Test
public void testApplyFlinkConfiguration() {
- final Configuration configuration =
+ Configuration configuration =
new FlinkConfigBuilder(flinkDeployment, new Configuration())
.applyFlinkConfiguration()
.build();
Assert.assertEquals(2, (int) configuration.get(TaskManagerOptions.NUM_TASK_SLOTS));
+ Assert.assertEquals(
+ KubernetesConfigOptions.ServiceExposedType.ClusterIP,
+ configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE));
+
+ FlinkDeployment deployment = ReconciliationUtils.clone(flinkDeployment);
+ deployment
+ .getSpec()
+ .setFlinkConfiguration(
+ Map.of(
+ KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE.key(),
+ KubernetesConfigOptions.ServiceExposedType.LoadBalancer.name()));
+
+ configuration =
+ new FlinkConfigBuilder(deployment, new Configuration())
+ .applyFlinkConfiguration()
+ .build();
+ Assert.assertEquals(
+ KubernetesConfigOptions.ServiceExposedType.LoadBalancer,
+ configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE));
}
@Test