This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 74c69a4  [FLINK-26640] Make FlinkVersion enum and required
74c69a4 is described below

commit 74c69a4f7ac4620d126e8fa6b3489a1c6223f4a2
Author: Gyula Fora <[email protected]>
AuthorDate: Thu Mar 17 11:03:35 2022 +0100

    [FLINK-26640] Make FlinkVersion enum and required
---
 docs/content/docs/custom-resource/overview.md      |  1 +
 docs/content/docs/custom-resource/reference.md     | 13 +++++-
 e2e-tests/data/cr.yaml                             |  2 +-
 examples/basic-checkpoint-ha.yaml                  |  2 +-
 examples/basic-ingress.yaml                        |  2 +-
 examples/basic-session.yaml                        |  2 +-
 examples/basic.yaml                                |  2 +-
 examples/custom-logging.yaml                       |  2 +-
 examples/pod-template.yaml                         |  2 +-
 .../operator/crd/spec/FlinkDeploymentSpec.java     |  2 +-
 .../kubernetes/operator/crd/spec/FlinkVersion.java | 50 ++++++++++++++++++++++
 .../validation/DefaultDeploymentValidator.java     |  9 ++++
 .../kubernetes/operator/FlinkOperatorITCase.java   |  3 +-
 .../flink/kubernetes/operator/TestUtils.java       |  3 +-
 .../validation/DeploymentValidatorTest.java        |  5 +++
 .../crds/flinkdeployments.flink.apache.org-v1.yml  |  8 ++++
 16 files changed, 97 insertions(+), 11 deletions(-)

diff --git a/docs/content/docs/custom-resource/overview.md 
b/docs/content/docs/custom-resource/overview.md
index 2cf25b3..6046a4c 100644
--- a/docs/content/docs/custom-resource/overview.md
+++ b/docs/content/docs/custom-resource/overview.md
@@ -82,6 +82,7 @@ The spec contains all the information the operator need to 
deploy and manage you
 
 Most deployments will define at least the following fields:
  - `image` : Docker used to run Flink job and task manager processes
+ - `flinkVersion` : Flink version used in the image (`v1_14`, `v1_15`...)
  - `serviceAccount` : Kubernetes service account used by the Flink pods
  - `taskManager, jobManager` : Job and Task manager pod resource specs (cpu, 
memory, etc.)
  - `flinkConfiguration` : Map of Flink configuration overrides such as HA and 
checkpointing configs
diff --git a/docs/content/docs/custom-resource/reference.md 
b/docs/content/docs/custom-resource/reference.md
index 82ccd36..a475fe1 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -48,7 +48,7 @@ This page serves as a full reference for FlinkDeployment 
custom resource definit
 | image | java.lang.String | Flink docker image used to start the Job and 
TaskManager pods. |
 | imagePullPolicy | java.lang.String | Image pull policy of the Flink docker 
image. |
 | serviceAccount | java.lang.String | Kubernetes service used by the Flink 
deployment. |
-| flinkVersion | java.lang.String | Flink image version. |
+| flinkVersion | org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion | Flink image version. |
 | ingressDomain | java.lang.String | Ingress domain for the Flink deployment. |
 | flinkConfiguration | java.util.Map<java.lang.String,java.lang.String> | 
Flink configuration overrides for the Flink deployment. |
 | podTemplate | io.fabric8.kubernetes.api.model.Pod | Base pod template for 
job and task manager pods. Can be overridden by the jobManager and  taskManager 
pod templates. |
@@ -57,6 +57,17 @@ This page serves as a full reference for FlinkDeployment 
custom resource definit
 | job | org.apache.flink.kubernetes.operator.crd.spec.JobSpec | Job 
specification for application deployments. Null for session clusters. |
 | logConfiguration | java.util.Map<java.lang.String,java.lang.String> | Log 
configuration overrides for the Flink deployment. Format logConfigFileName ->  
configContent. |
 
+### FlinkVersion
+**Class**: org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion
+
+**Description**: Enumeration for supported Flink versions.
+
+| Value | Docs |
+| ----- | ---- |
+| v1_14 |  |
+| v1_15 |  |
+| v1_16 |  |
+
 ### JobManagerSpec
 **Class**: org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec
 
diff --git a/e2e-tests/data/cr.yaml b/e2e-tests/data/cr.yaml
index 3919e48..f697296 100644
--- a/e2e-tests/data/cr.yaml
+++ b/e2e-tests/data/cr.yaml
@@ -23,7 +23,7 @@ metadata:
   name: flink-example-statemachine
 spec:
   image: flink:1.14.3
-  flinkVersion: 1.14.3
+  flinkVersion: v1_14
   flinkConfiguration:
     taskmanager.numberOfTaskSlots: "2"
     high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
diff --git a/examples/basic-checkpoint-ha.yaml 
b/examples/basic-checkpoint-ha.yaml
index e288048..f1404d2 100644
--- a/examples/basic-checkpoint-ha.yaml
+++ b/examples/basic-checkpoint-ha.yaml
@@ -23,7 +23,7 @@ metadata:
   name: basic-checkpoint-ha-example
 spec:
   image: flink:1.14.3
-  flinkVersion: 1.14.3
+  flinkVersion: v1_14
   flinkConfiguration:
     taskmanager.numberOfTaskSlots: "2"
     state.savepoints.dir: file:///flink-data/savepoints
diff --git a/examples/basic-ingress.yaml b/examples/basic-ingress.yaml
index ad9b1ef..9620607 100644
--- a/examples/basic-ingress.yaml
+++ b/examples/basic-ingress.yaml
@@ -23,7 +23,7 @@ metadata:
   name: basic-ingress
 spec:
   image: flink:1.14.3
-  flinkVersion: 1.14.3
+  flinkVersion: v1_14
   ingressDomain: flink.k8s.io
   flinkConfiguration:
 #    rest.address: basic-example.flink.k8s.io
diff --git a/examples/basic-session.yaml b/examples/basic-session.yaml
index c78d030..8ccd225 100644
--- a/examples/basic-session.yaml
+++ b/examples/basic-session.yaml
@@ -23,7 +23,7 @@ metadata:
   name: basic-session-example
 spec:
   image: flink:1.14.3
-  flinkVersion: 1.14.3
+  flinkVersion: v1_14
   flinkConfiguration:
     taskmanager.numberOfTaskSlots: "2"
   serviceAccount: flink
diff --git a/examples/basic.yaml b/examples/basic.yaml
index 8844a35..799de85 100644
--- a/examples/basic.yaml
+++ b/examples/basic.yaml
@@ -23,7 +23,7 @@ metadata:
   name: basic-example
 spec:
   image: flink:1.14.3
-  flinkVersion: 1.14.3
+  flinkVersion: v1_14
   flinkConfiguration:
     taskmanager.numberOfTaskSlots: "2"
   serviceAccount: flink
diff --git a/examples/custom-logging.yaml b/examples/custom-logging.yaml
index 4376a81..0531262 100644
--- a/examples/custom-logging.yaml
+++ b/examples/custom-logging.yaml
@@ -23,7 +23,7 @@ metadata:
   name: custom-logging-example
 spec:
   image: flink:1.14.3
-  flinkVersion: 1.14.3
+  flinkVersion: v1_14
   flinkConfiguration:
     taskmanager.numberOfTaskSlots: "2"
   serviceAccount: flink
diff --git a/examples/pod-template.yaml b/examples/pod-template.yaml
index c81dc2b..ef22aed 100644
--- a/examples/pod-template.yaml
+++ b/examples/pod-template.yaml
@@ -23,7 +23,7 @@ metadata:
   name: pod-template-example
 spec:
   image: flink:1.14.3
-  flinkVersion: 1.14.3
+  flinkVersion: v1_14
   flinkConfiguration:
     taskmanager.numberOfTaskSlots: "2"
   podTemplate:
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
index a206cac..312bb89 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
@@ -44,7 +44,7 @@ public class FlinkDeploymentSpec {
     private String serviceAccount;
 
     /** Flink image version. */
-    private String flinkVersion;
+    private FlinkVersion flinkVersion;
 
     /** Ingress domain for the Flink deployment. */
     private String ingressDomain;
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkVersion.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkVersion.java
new file mode 100644
index 0000000..1d3502a
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkVersion.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.crd.spec;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** Enumeration for supported Flink versions. */
+@Experimental
+public enum FlinkVersion {
+    v1_14,
+    v1_15,
+    v1_16;
+
+    public boolean isNewerVersionThan(FlinkVersion otherVersion) {
+        return this.ordinal() > otherVersion.ordinal();
+    }
+
+    /** Returns all versions within the defined range, inclusive both start and end. */
+    public static Set<FlinkVersion> rangeOf(FlinkVersion start, FlinkVersion end) {
+        return Stream.of(FlinkVersion.values())
+                .filter(v -> v.ordinal() >= start.ordinal() && v.ordinal() <= end.ordinal())
+                .collect(Collectors.toCollection(LinkedHashSet::new));
+    }
+
+    /** Returns the current version. */
+    public static FlinkVersion current() {
+        return values()[values().length - 1];
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
index 4db37c6..b917997 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
@@ -50,6 +51,7 @@ public class DefaultDeploymentValidator implements 
FlinkDeploymentValidator {
     public Optional<String> validate(FlinkDeployment deployment) {
         FlinkDeploymentSpec spec = deployment.getSpec();
         return firstPresent(
+                validateFlinkVersion(spec.getFlinkVersion()),
                 validateFlinkConfig(spec.getFlinkConfiguration()),
                 validateLogConfig(spec.getLogConfiguration()),
                 validateJobSpec(spec.getJob(), spec.getFlinkConfiguration()),
@@ -67,6 +69,13 @@ public class DefaultDeploymentValidator implements 
FlinkDeploymentValidator {
         return Optional.empty();
     }
 
+    private Optional<String> validateFlinkVersion(FlinkVersion version) {
+        if (version == null) {
+            return Optional.of("Flink Version must be defined.");
+        }
+        return Optional.empty();
+    }
+
     private Optional<String> validateFlinkConfig(Map<String, String> confMap) {
         if (confMap == null) {
             return Optional.empty();
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java
index 2be5ea6..fa0e13d 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator;
 
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.Resource;
 import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
@@ -106,7 +107,7 @@ public class FlinkOperatorITCase {
                         .build());
         FlinkDeploymentSpec spec = new FlinkDeploymentSpec();
         spec.setImage(IMAGE);
-        spec.setFlinkVersion(FLINK_VERSION);
+        spec.setFlinkVersion(FlinkVersion.v1_14);
         spec.setServiceAccount(SERVICE_ACCOUNT);
         Resource resource = new Resource();
         resource.setMemory("2048m");
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index a1fbb97..a313360 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
@@ -95,7 +96,7 @@ public class TestUtils {
                 .image(IMAGE)
                 .imagePullPolicy(IMAGE_POLICY)
                 .serviceAccount(SERVICE_ACCOUNT)
-                .flinkVersion(FLINK_VERSION)
+                .flinkVersion(FlinkVersion.v1_14)
                 .flinkConfiguration(conf)
                 .jobManager(new JobManagerSpec(new Resource(1, "2048m"), 1, 
null))
                 .taskManager(new TaskManagerSpec(new Resource(1, "2048m"), 
null))
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
index 31d7d25..058edd5 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.operator.validation;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
@@ -192,6 +193,10 @@ public class DeploymentValidatorTest {
                     
dep.getStatus().getReconciliationStatus().getLastReconciledSpec().setJob(null);
                 },
                 "Cannot switch from session to job cluster");
+
+        testError(dep -> dep.getSpec().setFlinkVersion(null), "Flink Version must be defined.");
+
+        testSuccess(dep -> dep.getSpec().setFlinkVersion(FlinkVersion.v1_15));
     }
 
     private void testSuccess(Consumer<FlinkDeployment> deploymentModifier) {
diff --git a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml 
b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index 88a0ce8..2077f93 100644
--- a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -26,6 +26,10 @@ spec:
               serviceAccount:
                 type: string
               flinkVersion:
+                enum:
+                - v1_14
+                - v1_15
+                - v1_16
                 type: string
               ingressDomain:
                 type: string
@@ -9116,6 +9120,10 @@ spec:
                       serviceAccount:
                         type: string
                       flinkVersion:
+                        enum:
+                        - v1_14
+                        - v1_15
+                        - v1_16
                         type: string
                       ingressDomain:
                         type: string

Reply via email to