This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 02d8dd5d [FLINK-33089] Clean up 1.13 and 1.14 references from docs and code (#768)
02d8dd5d is described below

commit 02d8dd5d915b7d890d09c23c9003580ab63e6dc1
Author: Maximilian Michels <[email protected]>
AuthorDate: Thu Feb 1 16:35:15 2024 +0100

    [FLINK-33089] Clean up 1.13 and 1.14 references from docs and code (#768)
---
 docs/content/docs/custom-resource/overview.md                  |  2 +-
 docs/content/docs/custom-resource/reference.md                 |  4 ++--
 .../flink/kubernetes/operator/api/spec/FlinkVersion.java       | 10 +++++++---
 .../flink/kubernetes/operator/config/FlinkConfigBuilder.java   |  4 ++--
 .../kubernetes/operator/service/AbstractFlinkService.java      |  2 +-
 .../flink/kubernetes/operator/service/NativeFlinkService.java  |  2 +-
 .../apache/flink/kubernetes/operator/utils/SnapshotUtils.java  |  2 +-
 .../operator/controller/FlinkDeploymentControllerTest.java     |  2 +-
 .../operator/controller/FlinkSessionJobControllerTest.java     |  2 +-
 .../reconciler/deployment/ApplicationReconcilerTest.java       |  2 +-
 .../kubernetes/operator/service/AbstractFlinkServiceTest.java  |  2 +-
 11 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/docs/content/docs/custom-resource/overview.md b/docs/content/docs/custom-resource/overview.md
index 01b098ad..a4362e98 100644
--- a/docs/content/docs/custom-resource/overview.md
+++ b/docs/content/docs/custom-resource/overview.md
@@ -85,7 +85,7 @@ The spec contains all the information the operator need to deploy and manage you
 
 Most deployments will define at least the following fields:
  - `image` : Docker used to run Flink job and task manager processes
- - `flinkVersion` : Flink version used in the image (`v1_13`, `v1_14`, `v1_15`, `v1_16` ...)
+ - `flinkVersion` : Flink version used in the image (`v1_15`, `v1_16` ...)
  - `serviceAccount` : Kubernetes service account used by the Flink pods
  - `taskManager, jobManager` : Job and Task manager pod resource specs (cpu, memory, ephemeralStorage)
  - `flinkConfiguration` : Map of Flink configuration overrides such as HA and checkpointing configs
diff --git a/docs/content/docs/custom-resource/reference.md b/docs/content/docs/custom-resource/reference.md
index 030e043a..c7810eb2 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -78,8 +78,8 @@ This page serves as a full reference for FlinkDeployment custom resource definit
 
 | Value | Docs |
 | ----- | ---- |
-| v1_13 |  |
-| v1_14 |  |
+| v1_13 | No longer supported since 1.7 operator release. |
+| v1_14 | No longer supported since 1.7 operator release. |
 | v1_15 |  |
 | v1_16 |  |
 | v1_17 |  |
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java
index f3532b3c..8d4b6b7d 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java
@@ -23,7 +23,11 @@ import org.apache.flink.annotation.Experimental;
 /** Enumeration for supported Flink versions. */
 @Experimental
 public enum FlinkVersion {
+    /** No longer supported since 1.7 operator release. */
+    @Deprecated
     v1_13,
+    /** No longer supported since 1.7 operator release. */
+    @Deprecated
     v1_14,
     v1_15,
     v1_16,
@@ -31,8 +35,8 @@ public enum FlinkVersion {
     v1_18,
     v1_19;
 
-    public boolean isNewerVersionThan(FlinkVersion otherVersion) {
-        return this.ordinal() > otherVersion.ordinal();
+    public boolean isEqualOrNewer(FlinkVersion otherVersion) {
+        return this.ordinal() >= otherVersion.ordinal();
     }
 
     /**
@@ -45,6 +49,6 @@ public enum FlinkVersion {
     }
 
     public static boolean isSupported(FlinkVersion version) {
-        return version != null && version.isNewerVersionThan(FlinkVersion.v1_14);
+        return version != null && version.isEqualOrNewer(FlinkVersion.v1_15);
     }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
index 39d0fed6..6caa46a3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
@@ -118,7 +118,7 @@ public class FlinkConfigBuilder {
     protected FlinkConfigBuilder applyImage() {
         if (!StringUtils.isNullOrWhitespaceOnly(spec.getImage())) {
             String configKey;
-            if (spec.getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_16)) {
+            if (spec.getFlinkVersion().isEqualOrNewer(FlinkVersion.v1_17)) {
                 configKey = KubernetesConfigOptions.CONTAINER_IMAGE.key();
             } else {
                 configKey = "kubernetes.container.image";
@@ -469,7 +469,7 @@ public class FlinkConfigBuilder {
             return;
         }
 
-        boolean newConfKeys = spec.getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_16);
+        boolean newConfKeys = spec.getFlinkVersion().isEqualOrNewer(FlinkVersion.v1_17);
         String configKey;
         if (isJM) {
             if (newConfKeys) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 611ab152..2fafb027 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -807,7 +807,7 @@ public abstract class AbstractFlinkService implements FlinkService {
                             job.getAllowNonRestoredState(),
                             savepoint,
                             RestoreMode.DEFAULT,
-                            conf.get(FLINK_VERSION).isNewerVersionThan(FlinkVersion.v1_16)
+                            conf.get(FLINK_VERSION).isEqualOrNewer(FlinkVersion.v1_17)
                                     ? conf.toMap()
                                     : null);
             LOG.info("Submitting job: {} to session cluster.", jobID);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
index 41083d91..52bb398e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
@@ -250,7 +250,7 @@ public class NativeFlinkService extends AbstractFlinkService {
             return false;
         }
 
-        if (!observeConfig.get(FLINK_VERSION).isNewerVersionThan(FlinkVersion.v1_17)) {
+        if (!observeConfig.get(FLINK_VERSION).isEqualOrNewer(FlinkVersion.v1_18)) {
             LOG.debug("In-place rescaling is only available starting from Flink 1.18");
             return false;
         }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
index 1f250b52..2322e484 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
@@ -353,7 +353,7 @@ public class SnapshotUtils {
     public static boolean isSnapshotTriggeringSupported(Configuration conf) {
         // Flink REST API supports triggering checkpoints externally starting with 1.17
         return conf.get(FLINK_VERSION) != null
-                && conf.get(FLINK_VERSION).isNewerVersionThan(FlinkVersion.v1_16);
+                && conf.get(FLINK_VERSION).isEqualOrNewer(FlinkVersion.v1_17);
     }
 
     public static boolean gracePeriodEnded(Duration gracePeriod, SnapshotInfo snapshotInfo) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 61dbf692..904944da 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -1231,7 +1231,7 @@ public class FlinkDeploymentControllerTest {
         var appCluster = TestUtils.buildApplicationCluster(version);
         var updateControl = testController.reconcile(appCluster, context);
         var lastEvent = testController.events().poll();
-        if (!version.isNewerVersionThan(FlinkVersion.v1_14)) {
+        if (!version.isEqualOrNewer(FlinkVersion.v1_15)) {
             assertTrue(updateControl.getScheduleDelay().isEmpty());
             assertEquals(
                     EventRecorder.Reason.UnsupportedFlinkVersion.name(), lastEvent.getReason());
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
index 5c620256..d532ff25 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
@@ -535,7 +535,7 @@ class FlinkSessionJobControllerTest {
                         Map.of(), kubernetesClient, version);
         var updateControl = testController.reconcile(TestUtils.buildSessionJob(), context);
         var lastEvent = testController.events().poll();
-        if (!version.isNewerVersionThan(FlinkVersion.v1_14)) {
+        if (!version.isEqualOrNewer(FlinkVersion.v1_15)) {
             assertTrue(updateControl.getScheduleDelay().isEmpty());
             assertEquals(
                     EventRecorder.Reason.UnsupportedFlinkVersion.name(), lastEvent.getReason());
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index ce83bce5..c414245d 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -946,7 +946,7 @@ public class ApplicationReconcilerTest extends OperatorTestBase {
         }
         statusRecorder.updateStatusFromCache(deployment);
 
-        if (!flinkVersion.isNewerVersionThan(FlinkVersion.v1_15)) {
+        if (!flinkVersion.isEqualOrNewer(FlinkVersion.v1_16)) {
             assertFalse(StringUtils.isBlank(getJobStatus(deployment).getJobId()));
         }
     }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
index 3739298f..637e3bed 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
@@ -202,7 +202,7 @@ public class AbstractFlinkServiceTest {
                 job.getMetadata(), job.getSpec(), JobID.generate(), deployConf, null);
 
         // Make sure that deploy conf was passed to jar run
-        if (flinkVersion.isNewerVersionThan(FlinkVersion.v1_16)) {
+        if (flinkVersion.isEqualOrNewer(FlinkVersion.v1_17)) {
             assertEquals(deployConf.toMap(), jarRuns.get(0).getFlinkConfiguration().toMap());
         } else {
             assertTrue(jarRuns.get(0).getFlinkConfiguration().toMap().isEmpty());

Reply via email to