This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 74c69a4 [FLINK-26640] Make FlinkVersion enum and required
74c69a4 is described below
commit 74c69a4f7ac4620d126e8fa6b3489a1c6223f4a2
Author: Gyula Fora <[email protected]>
AuthorDate: Thu Mar 17 11:03:35 2022 +0100
[FLINK-26640] Make FlinkVersion enum and required
---
docs/content/docs/custom-resource/overview.md | 1 +
docs/content/docs/custom-resource/reference.md | 13 +++++-
e2e-tests/data/cr.yaml | 2 +-
examples/basic-checkpoint-ha.yaml | 2 +-
examples/basic-ingress.yaml | 2 +-
examples/basic-session.yaml | 2 +-
examples/basic.yaml | 2 +-
examples/custom-logging.yaml | 2 +-
examples/pod-template.yaml | 2 +-
.../operator/crd/spec/FlinkDeploymentSpec.java | 2 +-
.../kubernetes/operator/crd/spec/FlinkVersion.java | 50 ++++++++++++++++++++++
.../validation/DefaultDeploymentValidator.java | 9 ++++
.../kubernetes/operator/FlinkOperatorITCase.java | 3 +-
.../flink/kubernetes/operator/TestUtils.java | 3 +-
.../validation/DeploymentValidatorTest.java | 5 +++
.../crds/flinkdeployments.flink.apache.org-v1.yml | 8 ++++
16 files changed, 97 insertions(+), 11 deletions(-)
diff --git a/docs/content/docs/custom-resource/overview.md
b/docs/content/docs/custom-resource/overview.md
index 2cf25b3..6046a4c 100644
--- a/docs/content/docs/custom-resource/overview.md
+++ b/docs/content/docs/custom-resource/overview.md
@@ -82,6 +82,7 @@ The spec contains all the information the operator need to
deploy and manage you
Most deployments will define at least the following fields:
- `image` : Docker used to run Flink job and task manager processes
+ - `flinkVersion` : Flink version used in the image (`v1_14`, `v1_15`...)
- `serviceAccount` : Kubernetes service account used by the Flink pods
- `taskManager, jobManager` : Job and Task manager pod resource specs (cpu,
memory, etc.)
- `flinkConfiguration` : Map of Flink configuration overrides such as HA and
checkpointing configs
diff --git a/docs/content/docs/custom-resource/reference.md
b/docs/content/docs/custom-resource/reference.md
index 82ccd36..a475fe1 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -48,7 +48,7 @@ This page serves as a full reference for FlinkDeployment
custom resource definit
| image | java.lang.String | Flink docker image used to start the Job and
TaskManager pods. |
| imagePullPolicy | java.lang.String | Image pull policy of the Flink docker
image. |
| serviceAccount | java.lang.String | Kubernetes service used by the Flink
deployment. |
-| flinkVersion | java.lang.String | Flink image version. |
+| flinkVersion | org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion |
Flink image version. |
| ingressDomain | java.lang.String | Ingress domain for the Flink deployment. |
| flinkConfiguration | java.util.Map<java.lang.String,java.lang.String> |
Flink configuration overrides for the Flink deployment. |
| podTemplate | io.fabric8.kubernetes.api.model.Pod | Base pod template for
job and task manager pods. Can be overridden by the jobManager and taskManager
pod templates. |
@@ -57,6 +57,17 @@ This page serves as a full reference for FlinkDeployment
custom resource definit
| job | org.apache.flink.kubernetes.operator.crd.spec.JobSpec | Job
specification for application deployments. Null for session clusters. |
| logConfiguration | java.util.Map<java.lang.String,java.lang.String> | Log
configuration overrides for the Flink deployment. Format logConfigFileName ->
configContent. |
+### FlinkVersion
+**Class**: org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion
+
+**Description**: Enumeration for supported Flink versions.
+
+| Value | Docs |
+| ----- | ---- |
+| v1_14 | |
+| v1_15 | |
+| v1_16 | |
+
### JobManagerSpec
**Class**: org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec
diff --git a/e2e-tests/data/cr.yaml b/e2e-tests/data/cr.yaml
index 3919e48..f697296 100644
--- a/e2e-tests/data/cr.yaml
+++ b/e2e-tests/data/cr.yaml
@@ -23,7 +23,7 @@ metadata:
name: flink-example-statemachine
spec:
image: flink:1.14.3
- flinkVersion: 1.14.3
+ flinkVersion: v1_14
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
high-availability:
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
diff --git a/examples/basic-checkpoint-ha.yaml
b/examples/basic-checkpoint-ha.yaml
index e288048..f1404d2 100644
--- a/examples/basic-checkpoint-ha.yaml
+++ b/examples/basic-checkpoint-ha.yaml
@@ -23,7 +23,7 @@ metadata:
name: basic-checkpoint-ha-example
spec:
image: flink:1.14.3
- flinkVersion: 1.14.3
+ flinkVersion: v1_14
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
state.savepoints.dir: file:///flink-data/savepoints
diff --git a/examples/basic-ingress.yaml b/examples/basic-ingress.yaml
index ad9b1ef..9620607 100644
--- a/examples/basic-ingress.yaml
+++ b/examples/basic-ingress.yaml
@@ -23,7 +23,7 @@ metadata:
name: basic-ingress
spec:
image: flink:1.14.3
- flinkVersion: 1.14.3
+ flinkVersion: v1_14
ingressDomain: flink.k8s.io
flinkConfiguration:
# rest.address: basic-example.flink.k8s.io
diff --git a/examples/basic-session.yaml b/examples/basic-session.yaml
index c78d030..8ccd225 100644
--- a/examples/basic-session.yaml
+++ b/examples/basic-session.yaml
@@ -23,7 +23,7 @@ metadata:
name: basic-session-example
spec:
image: flink:1.14.3
- flinkVersion: 1.14.3
+ flinkVersion: v1_14
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink
diff --git a/examples/basic.yaml b/examples/basic.yaml
index 8844a35..799de85 100644
--- a/examples/basic.yaml
+++ b/examples/basic.yaml
@@ -23,7 +23,7 @@ metadata:
name: basic-example
spec:
image: flink:1.14.3
- flinkVersion: 1.14.3
+ flinkVersion: v1_14
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink
diff --git a/examples/custom-logging.yaml b/examples/custom-logging.yaml
index 4376a81..0531262 100644
--- a/examples/custom-logging.yaml
+++ b/examples/custom-logging.yaml
@@ -23,7 +23,7 @@ metadata:
name: custom-logging-example
spec:
image: flink:1.14.3
- flinkVersion: 1.14.3
+ flinkVersion: v1_14
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink
diff --git a/examples/pod-template.yaml b/examples/pod-template.yaml
index c81dc2b..ef22aed 100644
--- a/examples/pod-template.yaml
+++ b/examples/pod-template.yaml
@@ -23,7 +23,7 @@ metadata:
name: pod-template-example
spec:
image: flink:1.14.3
- flinkVersion: 1.14.3
+ flinkVersion: v1_14
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
podTemplate:
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
index a206cac..312bb89 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
@@ -44,7 +44,7 @@ public class FlinkDeploymentSpec {
private String serviceAccount;
/** Flink image version. */
- private String flinkVersion;
+ private FlinkVersion flinkVersion;
/** Ingress domain for the Flink deployment. */
private String ingressDomain;
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkVersion.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkVersion.java
new file mode 100644
index 0000000..1d3502a
--- /dev/null
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkVersion.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.crd.spec;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** Enumeration for supported Flink versions. */
+@Experimental
+public enum FlinkVersion {
+ v1_14,
+ v1_15,
+ v1_16;
+
+ public boolean isNewerVersionThan(FlinkVersion otherVersion) {
+ return this.ordinal() > otherVersion.ordinal();
+ }
+
+ /** Returns all versions within the defined range, inclusive both start
and end. */
+ public static Set<FlinkVersion> rangeOf(FlinkVersion start, FlinkVersion
end) {
+ return Stream.of(FlinkVersion.values())
+ .filter(v -> v.ordinal() >= start.ordinal() && v.ordinal() <=
end.ordinal())
+ .collect(Collectors.toCollection(LinkedHashSet::new));
+ }
+
+ /** Returns the current version. */
+ public static FlinkVersion current() {
+ return values()[values().length - 1];
+ }
+}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
index 4db37c6..b917997 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.MemorySize;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
@@ -50,6 +51,7 @@ public class DefaultDeploymentValidator implements
FlinkDeploymentValidator {
public Optional<String> validate(FlinkDeployment deployment) {
FlinkDeploymentSpec spec = deployment.getSpec();
return firstPresent(
+ validateFlinkVersion(spec.getFlinkVersion()),
validateFlinkConfig(spec.getFlinkConfiguration()),
validateLogConfig(spec.getLogConfiguration()),
validateJobSpec(spec.getJob(), spec.getFlinkConfiguration()),
@@ -67,6 +69,13 @@ public class DefaultDeploymentValidator implements
FlinkDeploymentValidator {
return Optional.empty();
}
+ private Optional<String> validateFlinkVersion(FlinkVersion version) {
+ if (version == null) {
+ return Optional.of("Flink Version must be defined.");
+ }
+ return Optional.empty();
+ }
+
private Optional<String> validateFlinkConfig(Map<String, String> confMap) {
if (confMap == null) {
return Optional.empty();
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java
index 2be5ea6..fa0e13d 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec;
import org.apache.flink.kubernetes.operator.crd.spec.Resource;
import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
@@ -106,7 +107,7 @@ public class FlinkOperatorITCase {
.build());
FlinkDeploymentSpec spec = new FlinkDeploymentSpec();
spec.setImage(IMAGE);
- spec.setFlinkVersion(FLINK_VERSION);
+ spec.setFlinkVersion(FlinkVersion.v1_14);
spec.setServiceAccount(SERVICE_ACCOUNT);
Resource resource = new Resource();
resource.setMemory("2048m");
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index a1fbb97..a313360 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.TaskManagerOptions;
import
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
@@ -95,7 +96,7 @@ public class TestUtils {
.image(IMAGE)
.imagePullPolicy(IMAGE_POLICY)
.serviceAccount(SERVICE_ACCOUNT)
- .flinkVersion(FLINK_VERSION)
+ .flinkVersion(FlinkVersion.v1_14)
.flinkConfiguration(conf)
.jobManager(new JobManagerSpec(new Resource(1, "2048m"), 1,
null))
.taskManager(new TaskManagerSpec(new Resource(1, "2048m"),
null))
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
index 31d7d25..058edd5 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.operator.validation;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
@@ -192,6 +193,10 @@ public class DeploymentValidatorTest {
dep.getStatus().getReconciliationStatus().getLastReconciledSpec().setJob(null);
},
"Cannot switch from session to job cluster");
+
+ testError(dep -> dep.getSpec().setFlinkVersion(null), "Flink Version
must be defined.");
+
+ testSuccess(dep -> dep.getSpec().setFlinkVersion(FlinkVersion.v1_15));
}
private void testSuccess(Consumer<FlinkDeployment> deploymentModifier) {
diff --git a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index 88a0ce8..2077f93 100644
--- a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -26,6 +26,10 @@ spec:
serviceAccount:
type: string
flinkVersion:
+ enum:
+ - v1_14
+ - v1_15
+ - v1_16
type: string
ingressDomain:
type: string
@@ -9116,6 +9120,10 @@ spec:
serviceAccount:
type: string
flinkVersion:
+ enum:
+ - v1_14
+ - v1_15
+ - v1_16
type: string
ingressDomain:
type: string