This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new e1c3192f [FLINK-33089] Drop support for Flink 1.13 and 1.14 and clean 
up related codepaths
e1c3192f is described below

commit e1c3192fdda1b22373a0e5821856c028bd4183b8
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Sat Oct 28 21:57:58 2023 +0200

    [FLINK-33089] Drop support for Flink 1.13 and 1.14 and clean up related 
codepaths
---
 .github/workflows/ci.yml                           | 16 +------
 docs/content/docs/concepts/overview.md             |  2 +-
 docs/content/docs/custom-resource/reference.md     |  1 +
 examples/autoscaling/Dockerfile                    |  2 +-
 examples/basic-checkpoint-ha.yaml                  |  2 +-
 flink-kubernetes-operator-api/pom.xml              |  4 +-
 .../kubernetes/operator/api/spec/FlinkVersion.java |  7 ++-
 .../operator/config/FlinkConfigBuilder.java        |  4 +-
 .../controller/FlinkDeploymentController.java      |  9 +++-
 .../controller/FlinkSessionJobController.java      |  6 +++
 .../sessionjob/SessionJobReconciler.java           |  8 ++++
 .../operator/service/AbstractFlinkService.java     | 43 ++++++------------
 .../operator/service/NativeFlinkService.java       |  6 +--
 .../kubernetes/operator/utils/EventRecorder.java   |  3 +-
 .../kubernetes/operator/utils/FlinkUtils.java      |  6 ---
 .../kubernetes/operator/utils/SnapshotUtils.java   | 12 -----
 .../kubernetes/operator/utils/ValidatorUtils.java  | 18 ++++++++
 .../flink/kubernetes/operator/TestUtils.java       | 15 +++----
 .../kubernetes/operator/TestingFlinkService.java   | 24 +++++-----
 .../operator/config/FlinkConfigBuilderTest.java    |  6 +--
 .../controller/FlinkDeploymentControllerTest.java  | 21 +++++++--
 .../controller/FlinkSessionJobControllerTest.java  | 51 +++++++++++++++++++---
 .../deployment/ApplicationObserverTest.java        | 38 ----------------
 .../operator/service/NativeFlinkServiceTest.java   |  4 +-
 .../crds/flinkdeployments.flink.apache.org-v1.yml  |  1 +
 25 files changed, 152 insertions(+), 157 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 15bdcad4..70cd7f3a 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -70,7 +70,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        version: ["v1_18","v1_17","v1_16","v1_15","v1_14","v1_13"]
+        version: ["v1_18","v1_17","v1_16","v1_15"]
         namespace: ["default","flink"]
         mode: ["native", "standalone"]
         java-version: [ 11, 17 ]
@@ -86,17 +86,13 @@ jobs:
           - namespace: flink
             extraArgs: '--create-namespace --set 
"watchNamespaces={default,flink}"'
           - version: v1_18
-            image: 
ghcr.io\/apache\/flink-docker:1.18-SNAPSHOT-scala_2.12-java11-debian
+            image: flink:1.18
           - version: v1_17
             image: flink:1.17
           - version: v1_16
             image: flink:1.16
           - version: v1_15
             image: flink:1.15
-          - version: v1_14
-            image: flink:1.14
-          - version: v1_13
-            image: flink:1.13
         exclude:
           - namespace: default
             test: test_multi_sessionjob.sh
@@ -108,14 +104,6 @@ jobs:
             test: test_autoscaler.sh
           - mode: standalone
             test: test_dynamic_config.sh
-          - version: v1_13
-            test: test_autoscaler.sh
-          - version: v1_13
-            test: test_dynamic_config.sh
-          - version: v1_14
-            test: test_autoscaler.sh
-          - version: v1_14
-            test: test_dynamic_config.sh
           - version: v1_15
             test: test_autoscaler.sh
           - version: v1_15
diff --git a/docs/content/docs/concepts/overview.md 
b/docs/content/docs/concepts/overview.md
index d37d8d3d..baa787f4 100644
--- a/docs/content/docs/concepts/overview.md
+++ b/docs/content/docs/concepts/overview.md
@@ -36,7 +36,7 @@ Flink Kubernetes Operator aims to capture the 
responsibilities of a human operat
   - Stateful and stateless application upgrades
   - Triggering and managing savepoints
   - Handling errors, rolling-back broken upgrades
-- Multiple Flink version support: v1.13, v1.14, v1.15, v1.16, v1.17
+- Multiple Flink version support: v1.15, v1.16, v1.17, v1.18
 - [Deployment Modes]({{< ref 
"docs/custom-resource/overview#application-deployments" >}}):
   - Application cluster
   - Session cluster
diff --git a/docs/content/docs/custom-resource/reference.md 
b/docs/content/docs/custom-resource/reference.md
index 40037e53..4fc1ea25 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -84,6 +84,7 @@ This page serves as a full reference for FlinkDeployment 
custom resource definit
 | v1_16 |  |
 | v1_17 |  |
 | v1_18 |  |
+| v1_19 |  |
 
 ### IngressSpec
 **Class**: org.apache.flink.kubernetes.operator.api.spec.IngressSpec
diff --git a/examples/autoscaling/Dockerfile b/examples/autoscaling/Dockerfile
index e097ab37..30bdd47a 100644
--- a/examples/autoscaling/Dockerfile
+++ b/examples/autoscaling/Dockerfile
@@ -16,5 +16,5 @@
 # limitations under the License.
 
################################################################################
 
-FROM ghcr.io/apache/flink-docker:1.18-SNAPSHOT-scala_2.12-java11-debian
+FROM flink:1.18
 COPY ./target/autoscaling*.jar /opt/flink/usrlib/autoscaling.jar
diff --git a/examples/basic-checkpoint-ha.yaml 
b/examples/basic-checkpoint-ha.yaml
index a6d035d4..48cd6c70 100644
--- a/examples/basic-checkpoint-ha.yaml
+++ b/examples/basic-checkpoint-ha.yaml
@@ -27,7 +27,7 @@ spec:
     taskmanager.numberOfTaskSlots: "2"
     state.savepoints.dir: file:///flink-data/savepoints
     state.checkpoints.dir: file:///flink-data/checkpoints
-    high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+    high-availability.type: kubernetes
     high-availability.storageDir: file:///flink-data/ha
   serviceAccount: flink
   jobManager:
diff --git a/flink-kubernetes-operator-api/pom.xml 
b/flink-kubernetes-operator-api/pom.xml
index 8a8bcbd4..12a0c1d3 100644
--- a/flink-kubernetes-operator-api/pom.xml
+++ b/flink-kubernetes-operator-api/pom.xml
@@ -225,7 +225,7 @@ under the License.
                                       fork="true" failonerror="true">
                                     <classpath 
refid="maven.compile.classpath"/>
                                     <arg 
value="file://${rootDir}/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml"/>
-                                    <arg 
value="https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.4.0/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml"/>
+                                    <arg 
value="https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.6.0/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml"/>
                                 </java>
                             </target>
                         </configuration>
@@ -242,7 +242,7 @@ under the License.
                                       fork="true" failonerror="true">
                                     <classpath 
refid="maven.compile.classpath"/>
                                     <arg 
value="file://${rootDir}/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml"/>
-                                    <arg 
value="https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.4.0/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml"/>
+                                    <arg 
value="https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.6.0/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml"/>
                                 </java>
                             </target>
                         </configuration>
diff --git 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java
 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java
index fd49304f..f3532b3c 100644
--- 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java
+++ 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java
@@ -28,7 +28,8 @@ public enum FlinkVersion {
     v1_15,
     v1_16,
     v1_17,
-    v1_18;
+    v1_18,
+    v1_19;
 
     public boolean isNewerVersionThan(FlinkVersion otherVersion) {
         return this.ordinal() > otherVersion.ordinal();
@@ -42,4 +43,8 @@ public enum FlinkVersion {
     public static FlinkVersion current() {
         return values()[values().length - 1];
     }
+
+    public static boolean isSupported(FlinkVersion version) {
+        return version != null && 
version.isNewerVersionThan(FlinkVersion.v1_14);
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
index 82ce46d0..f99c08fc 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
@@ -162,9 +162,7 @@ public class FlinkConfigBuilder {
             }
 
             // We need to keep the application clusters around for proper 
operator behaviour
-            if (spec.getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_14)) 
{
-                effectiveConfig.set(SHUTDOWN_ON_APPLICATION_FINISH, false);
-            }
+            effectiveConfig.set(SHUTDOWN_ON_APPLICATION_FINISH, false);
             if 
(HighAvailabilityMode.isHighAvailabilityModeActivated(effectiveConfig)) {
                 setDefaultConf(SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR, true);
             }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index e4fe0a13..c8c24ef3 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
+import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
 import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
 
 import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
@@ -53,7 +54,7 @@ import java.util.Optional;
 import java.util.Set;
 
 /** Controller that runs the main reconcile loop for Flink deployments. */
-@ControllerConfiguration()
+@ControllerConfiguration
 public class FlinkDeploymentController
         implements Reconciler<FlinkDeployment>,
                 ErrorStatusHandler<FlinkDeployment>,
@@ -127,6 +128,12 @@ public class FlinkDeploymentController
         statusRecorder.updateStatusFromCache(flinkApp);
         FlinkDeployment previousDeployment = 
ReconciliationUtils.clone(flinkApp);
         var ctx = ctxFactory.getResourceContext(flinkApp, josdkContext);
+
+        // If we get an unsupported Flink version, trigger event and exit
+        if (!ValidatorUtils.validateSupportedVersion(ctx, eventRecorder)) {
+            return UpdateControl.noUpdate();
+        }
+
         try {
             observerFactory.getOrCreate(flinkApp).observe(ctx);
             if (!validateDeployment(ctx)) {
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
index 34972915..3ef87cb7 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
+import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
 import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
 
 import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
@@ -98,6 +99,11 @@ public class FlinkSessionJobController
         FlinkSessionJob previousJob = 
ReconciliationUtils.clone(flinkSessionJob);
         var ctx = ctxFactory.getResourceContext(flinkSessionJob, josdkContext);
 
+        // If we get an unsupported Flink version, trigger event and exit
+        if (!ValidatorUtils.validateSupportedVersion(ctx, eventRecorder)) {
+            return UpdateControl.noUpdate();
+        }
+
         observer.observe(ctx);
         if (!validateSessionJob(ctx)) {
             statusRecorder.patchAndCacheStatus(flinkSessionJob, 
ctx.getKubernetesClient());
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
index 16edab6b..c7cac9df 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
@@ -67,6 +67,14 @@ public class SessionJobReconciler
             Optional<String> savepoint,
             boolean requireHaMetadata)
             throws Exception {
+
+        eventRecorder.triggerEvent(
+                ctx.getResource(),
+                EventRecorder.Type.Normal,
+                EventRecorder.Reason.Submit,
+                EventRecorder.Component.Job,
+                MSG_SUBMIT,
+                ctx.getKubernetesClient());
         var jobID =
                 ctx.getFlinkService()
                         .submitJobToSessionCluster(
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 9ee49f00..399e692d 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -47,7 +47,6 @@ import 
org.apache.flink.kubernetes.operator.observer.CheckpointFetchResult;
 import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
-import org.apache.flink.kubernetes.operator.utils.SnapshotUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.execution.ExecutionState;
 import 
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
@@ -275,7 +274,8 @@ public abstract class AbstractFlinkService implements 
FlinkService {
         var jobId = jobIdString != null ? JobID.fromHexString(jobIdString) : 
null;
 
         Optional<String> savepointOpt = Optional.empty();
-        var savepointFormatType = SnapshotUtils.getSavepointFormatType(conf);
+        var savepointFormatType =
+                
conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE);
         try (var clusterClient = getClusterClient(conf)) {
             var clusterId = clusterClient.getClusterId();
             switch (upgradeMode) {
@@ -313,11 +313,7 @@ public abstract class AbstractFlinkService implements 
FlinkService {
                                                             
KubernetesOperatorConfigOptions
                                                                     
.DRAIN_ON_SAVEPOINT_DELETION),
                                                     savepointDirectory,
-                                                    conf.get(FLINK_VERSION)
-                                                                    
.isNewerVersionThan(
-                                                                            
FlinkVersion.v1_14)
-                                                            ? 
savepointFormatType
-                                                            : null)
+                                                    savepointFormatType)
                                             .get(timeout, TimeUnit.SECONDS);
                             savepointOpt = Optional.of(savepoint);
                             LOG.info("Job successfully suspended with 
savepoint {}.", savepoint);
@@ -365,11 +361,9 @@ public abstract class AbstractFlinkService implements 
FlinkService {
                     
deploymentStatus.getJobStatus().getSavepointInfo().updateLastSavepoint(sp);
                 });
 
-        var shutdownDisabled =
-                upgradeMode != UpgradeMode.LAST_STATE
-                        && FlinkUtils.clusterShutdownDisabled(
-                                
ReconciliationUtils.getDeployedSpec(deployment));
-        if (!shutdownDisabled) {
+        // Unless we leave the jm around after savepoint, we should wait until 
it has finished
+        // shutting down
+        if (deleteClusterAfterSavepoint || upgradeMode != 
UpgradeMode.SAVEPOINT) {
             waitForClusterShutdown(conf);
             
deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
         }
@@ -422,13 +416,9 @@ public abstract class AbstractFlinkService implements 
FlinkService {
                                                                 
KubernetesOperatorConfigOptions
                                                                         
.DRAIN_ON_SAVEPOINT_DELETION),
                                                         savepointDirectory,
-                                                        conf.get(FLINK_VERSION)
-                                                                        
.isNewerVersionThan(
-                                                                               
 FlinkVersion.v1_14)
-                                                                ? conf.get(
-                                                                        
KubernetesOperatorConfigOptions
-                                                                               
 .OPERATOR_SAVEPOINT_FORMAT_TYPE)
-                                                                : null)
+                                                        conf.get(
+                                                                
KubernetesOperatorConfigOptions
+                                                                        
.OPERATOR_SAVEPOINT_FORMAT_TYPE))
                                                 .get(timeout, 
TimeUnit.SECONDS);
                                 savepointOpt = Optional.of(savepoint);
                                 LOG.info(
@@ -484,7 +474,8 @@ public abstract class AbstractFlinkService implements 
FlinkService {
                     
Preconditions.checkNotNull(conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY));
             var timeout = operatorConfig.getFlinkClientTimeout().getSeconds();
 
-            var savepointFormatType = 
SnapshotUtils.getSavepointFormatType(conf);
+            var savepointFormatType =
+                    
conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE);
 
             var response =
                     clusterClient
@@ -492,13 +483,7 @@ public abstract class AbstractFlinkService implements 
FlinkService {
                                     savepointTriggerHeaders,
                                     savepointTriggerMessageParameters,
                                     new SavepointTriggerRequestBody(
-                                            savepointDirectory,
-                                            false,
-                                            conf.get(FLINK_VERSION)
-                                                            
.isNewerVersionThan(FlinkVersion.v1_14)
-                                                    ? savepointFormatType
-                                                    : null,
-                                            null))
+                                            savepointDirectory, false, 
savepointFormatType, null))
                             .get(timeout, TimeUnit.SECONDS);
             LOG.info("Savepoint successfully triggered: " + 
response.getTriggerId().toHexString());
 
@@ -781,9 +766,7 @@ public abstract class AbstractFlinkService implements 
FlinkService {
                             jobID,
                             job.getAllowNonRestoredState(),
                             savepoint,
-                            
conf.get(FLINK_VERSION).isNewerVersionThan(FlinkVersion.v1_14)
-                                    ? RestoreMode.DEFAULT
-                                    : null,
+                            RestoreMode.DEFAULT,
                             
conf.get(FLINK_VERSION).isNewerVersionThan(FlinkVersion.v1_16)
                                     ? conf.toMap()
                                     : null);
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
index 0f55e948..27d5a265 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
@@ -114,11 +114,7 @@ public class NativeFlinkService extends 
AbstractFlinkService {
     public void cancelJob(
             FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration 
configuration)
             throws Exception {
-        // prior to Flink 1.15, ensure removal of orphaned config maps
-        // https://issues.apache.org/jira/browse/FLINK-30004
-        boolean deleteClusterAfterSavepoint =
-                
!deployment.getSpec().getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_14);
-        cancelJob(deployment, upgradeMode, configuration, 
deleteClusterAfterSavepoint);
+        cancelJob(deployment, upgradeMode, configuration, false);
     }
 
     @Override
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
index e99c8ef9..a6e08d75 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
@@ -249,6 +249,7 @@ public class EventRecorder {
         ScalingReport,
         IneffectiveScaling,
         AutoscalerError,
-        Scaling
+        Scaling,
+        UnsupportedFlinkVersion
     }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index 02a587b9..d8643e2b 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -26,8 +26,6 @@ import 
org.apache.flink.kubernetes.KubernetesClusterClientFactory;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
 import 
org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
-import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
-import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
@@ -340,10 +338,6 @@ public class FlinkUtils {
                 || haMode.equalsIgnoreCase("kubernetes");
     }
 
-    public static boolean clusterShutdownDisabled(FlinkDeploymentSpec spec) {
-        return spec.getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_14);
-    }
-
     public static int getNumTaskManagers(Configuration conf) {
         int parallelism = conf.get(CoreOptions.DEFAULT_PARALLELISM);
         return getNumTaskManagers(conf, parallelism);
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
index fdd537d6..1f250b52 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
@@ -20,7 +20,6 @@ package org.apache.flink.kubernetes.operator.utils;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
-import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.status.JobStatus;
@@ -403,15 +402,4 @@ public class SnapshotUtils {
             }
         }
     }
-
-    public static SavepointFormatType getSavepointFormatType(Configuration 
configuration) {
-        var savepointFormatType = 
org.apache.flink.core.execution.SavepointFormatType.CANONICAL;
-        if (configuration.get(FLINK_VERSION) != null
-                && 
configuration.get(FLINK_VERSION).isNewerVersionThan(FlinkVersion.v1_14)) {
-            savepointFormatType =
-                    configuration.get(
-                            
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE);
-        }
-        return savepointFormatType;
-    }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtils.java
index 96fb29f3..4aca7bfa 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtils.java
@@ -19,7 +19,9 @@ package org.apache.flink.kubernetes.operator.utils;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import org.apache.flink.kubernetes.operator.validation.DefaultValidator;
 import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
 
@@ -56,4 +58,20 @@ public final class ValidatorUtils {
                         });
         return resourceValidators;
     }
+
+    public static boolean validateSupportedVersion(
+            FlinkResourceContext<?> ctx, EventRecorder eventRecorder) {
+        var version = ctx.getFlinkVersion();
+        if (!FlinkVersion.isSupported(version)) {
+            eventRecorder.triggerEvent(
+                    ctx.getResource(),
+                    EventRecorder.Type.Warning,
+                    EventRecorder.Reason.UnsupportedFlinkVersion,
+                    EventRecorder.Component.Operator,
+                    "Flink version " + version + " is not supported by this 
operator version",
+                    ctx.getJosdkContext().getClient());
+            return false;
+        }
+        return true;
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 42672b77..2b2e23a7 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -191,16 +191,16 @@ public class TestUtils extends BaseTestUtils {
     }
 
     public static <T extends HasMetadata> Context<T> 
createContextWithReadyFlinkDeployment(
-            Map<String, String> flinkDepConfig) {
-        return createContextWithReadyFlinkDeployment(flinkDepConfig, null);
+            Map<String, String> flinkDepConfig, KubernetesClient client) {
+        return createContextWithReadyFlinkDeployment(flinkDepConfig, client, 
FlinkVersion.v1_18);
     }
 
     public static <T extends HasMetadata> Context<T> 
createContextWithReadyFlinkDeployment(
-            Map<String, String> flinkDepConfig, KubernetesClient client) {
+            Map<String, String> flinkDepConfig, KubernetesClient client, 
FlinkVersion version) {
         return new TestingContext<>() {
             @Override
             public Optional<T> getSecondaryResource(Class expectedType, String 
eventSourceName) {
-                var session = buildSessionCluster();
+                var session = buildSessionCluster(version);
                 
session.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
                 
session.getSpec().getFlinkConfiguration().putAll(flinkDepConfig);
                 session.getStatus()
@@ -324,7 +324,7 @@ public class TestUtils extends BaseTestUtils {
 
     public static Stream<Arguments> flinkVersionsAndUpgradeModes() {
         List<Arguments> args = new ArrayList<>();
-        for (FlinkVersion version : Set.of(FlinkVersion.v1_14, 
FlinkVersion.v1_15)) {
+        for (FlinkVersion version : Set.of(FlinkVersion.v1_15, 
FlinkVersion.v1_18)) {
             for (UpgradeMode upgradeMode : UpgradeMode.values()) {
                 args.add(arguments(version, upgradeMode));
             }
@@ -333,10 +333,7 @@ public class TestUtils extends BaseTestUtils {
     }
 
     public static Stream<Arguments> flinkVersions() {
-        return Stream.of(
-                arguments(FlinkVersion.v1_14),
-                arguments(FlinkVersion.v1_15),
-                arguments(FlinkVersion.v1_17));
+        return Stream.of(arguments(FlinkVersion.v1_15), 
arguments(FlinkVersion.v1_18));
     }
 
     public static FlinkDeployment createCanaryDeployment() {
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 4dc8e75d..94453b06 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -53,7 +53,6 @@ import 
org.apache.flink.kubernetes.operator.service.CheckpointHistoryWrapper;
 import org.apache.flink.kubernetes.operator.service.NativeFlinkServiceTest;
 import 
org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
-import org.apache.flink.kubernetes.operator.utils.SnapshotUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -298,7 +297,8 @@ public class TestingFlinkService extends 
AbstractFlinkService {
             Configuration conf) {
         var triggerId = "savepoint_trigger_" + savepointTriggerCounter++;
 
-        var savepointFormatType = SnapshotUtils.getSavepointFormatType(conf);
+        var savepointFormatType =
+                
conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE);
         savepointInfo.setTrigger(
                 triggerId, triggerType, 
SavepointFormatType.valueOf(savepointFormatType.name()));
         savepointTriggers.put(triggerId, false);
@@ -471,18 +471,14 @@ public class TestingFlinkService extends 
AbstractFlinkService {
 
         var sp = savepoint ? "savepoint_" + savepointCounter++ : null;
 
-        if (flinkVersion.isNewerVersionThan(FlinkVersion.v1_14)) {
-            JobStatusMessage oldStatus = jobOpt.get().f1;
-            jobOpt.get().f1 =
-                    new JobStatusMessage(
-                            oldStatus.getJobId(),
-                            oldStatus.getJobName(),
-                            JobStatus.FINISHED,
-                            oldStatus.getStartTime());
-            jobOpt.get().f0 = sp;
-        } else {
-            jobs.removeIf(js -> js.f1.getJobId().equals(jobID));
-        }
+        JobStatusMessage oldStatus = jobOpt.get().f1;
+        jobOpt.get().f1 =
+                new JobStatusMessage(
+                        oldStatus.getJobId(),
+                        oldStatus.getJobName(),
+                        JobStatus.FINISHED,
+                        oldStatus.getStartTime());
+        jobOpt.get().f0 = sp;
 
         return sp;
     }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
index 14bbaf9e..00b418eb 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
@@ -177,11 +177,7 @@ public class FlinkConfigBuilderTest {
                 new FlinkConfigBuilder(flinkDeployment, new Configuration())
                         .applyFlinkConfiguration()
                         .build();
-        if (flinkVersion.isNewerVersionThan(FlinkVersion.v1_14)) {
-            
Assertions.assertFalse(configuration.getBoolean(SHUTDOWN_ON_APPLICATION_FINISH));
-        } else {
-            
Assertions.assertTrue(configuration.getBoolean(SHUTDOWN_ON_APPLICATION_FINISH));
-        }
+        
Assertions.assertFalse(configuration.getBoolean(SHUTDOWN_ON_APPLICATION_FINISH));
     }
 
     @Test
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index e9a13160..61dbf692 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -56,6 +56,7 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
@@ -448,9 +449,7 @@ public class FlinkDeploymentControllerTest {
         appCluster.getSpec().getJob().setState(JobState.SUSPENDED);
         testController.reconcile(appCluster, context);
         assertEquals(
-                flinkVersion.isNewerVersionThan(FlinkVersion.v1_14)
-                        ? JobManagerDeploymentStatus.READY
-                        : JobManagerDeploymentStatus.MISSING,
+                JobManagerDeploymentStatus.READY,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
 
         // Resume from last savepoint
@@ -1226,6 +1225,22 @@ public class FlinkDeploymentControllerTest {
                 
appCluster.getStatus().getReconciliationStatus().getLastStableSpec());
     }
 
+    @ParameterizedTest
+    @EnumSource(FlinkVersion.class)
+    public void testUnsupportedVersions(FlinkVersion version) throws Exception 
{
+        var appCluster = TestUtils.buildApplicationCluster(version);
+        var updateControl = testController.reconcile(appCluster, context);
+        var lastEvent = testController.events().poll();
+        if (!version.isNewerVersionThan(FlinkVersion.v1_14)) {
+            assertTrue(updateControl.getScheduleDelay().isEmpty());
+            assertEquals(
+                    EventRecorder.Reason.UnsupportedFlinkVersion.name(), 
lastEvent.getReason());
+        } else {
+            assertTrue(updateControl.getScheduleDelay().isPresent());
+            assertEquals(EventRecorder.Reason.Submit.name(), 
lastEvent.getReason());
+        }
+    }
+
     private HasMetadata getIngress(FlinkDeployment deployment) {
         if (IngressUtils.ingressInNetworkingV1(kubernetesClient)) {
             return kubernetesClient
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
index c7eed4c9..0705f70f 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.spec.JobState;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
 import 
org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobReconciliationStatus;
@@ -39,8 +40,11 @@ import 
io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.util.ArrayList;
+import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
@@ -85,7 +89,10 @@ class FlinkSessionJobControllerTest {
             // Ignore
         }
 
-        Assertions.assertEquals(1, testController.events().size());
+        Assertions.assertEquals(2, testController.events().size());
+        // Discard submit event
+        testController.events().remove();
+
         var event = testController.events().remove();
         Assertions.assertEquals(EventRecorder.Type.Warning.toString(), 
event.getType());
         Assertions.assertEquals("SessionJobException", event.getReason());
@@ -271,7 +278,10 @@ class FlinkSessionJobControllerTest {
 
         testController.reconcile(sessionJob, context);
 
-        assertEquals(1, testController.events().size());
+        assertEquals(2, testController.events().size());
+        assertEquals(
+                EventRecorder.Reason.Submit,
+                
EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
         assertEquals(
                 EventRecorder.Reason.JobStatusChanged,
                 
EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
@@ -312,7 +322,10 @@ class FlinkSessionJobControllerTest {
         assertEquals(1, jobs.size());
         assertNull(jobs.get(0).f0);
 
-        assertEquals(1, testController.events().size());
+        assertEquals(2, testController.events().size());
+        assertEquals(
+                EventRecorder.Reason.Submit,
+                
EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
         assertEquals(
                 EventRecorder.Reason.JobStatusChanged,
                 
EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
@@ -334,10 +347,13 @@ class FlinkSessionJobControllerTest {
         testController.reconcile(sessionJob, context);
         flinkService.clearJobsInTerminalState();
         testController.reconcile(sessionJob, context);
-        assertEquals(2, testController.events().size());
+        assertEquals(3, testController.events().size());
         assertEquals(
                 EventRecorder.Reason.SpecChanged,
                 
EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+        assertEquals(
+                EventRecorder.Reason.Submit,
+                
EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
         assertEquals(
                 EventRecorder.Reason.JobStatusChanged,
                 
EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
@@ -368,16 +384,19 @@ class FlinkSessionJobControllerTest {
         sessionJob.getSpec().getJob().setParallelism(-1);
         testController.reconcile(sessionJob, context);
         flinkService.clearJobsInTerminalState();
-        assertEquals(1, testController.events().size());
+        assertEquals(2, testController.events().size());
         testController.reconcile(sessionJob, context);
         var statusEvents =
                 testController.events().stream()
                         .filter(e -> 
!e.getReason().equals(ValidationError.name()))
                         .collect(Collectors.toList());
-        assertEquals(1, statusEvents.size());
+        assertEquals(2, statusEvents.size());
         assertEquals(
-                EventRecorder.Reason.JobStatusChanged,
+                EventRecorder.Reason.Submit,
                 EventRecorder.Reason.valueOf(statusEvents.get(0).getReason()));
+        assertEquals(
+                EventRecorder.Reason.JobStatusChanged,
+                EventRecorder.Reason.valueOf(statusEvents.get(1).getReason()));
 
         assertEquals(JobStatus.RUNNING.name(), 
sessionJob.getStatus().getJobStatus().getState());
         assertEquals(
@@ -508,6 +527,24 @@ class FlinkSessionJobControllerTest {
         assertEquals(0, 
testController.getCanaryResourceManager().getNumberOfActiveCanaries());
     }
 
+    @ParameterizedTest
+    @EnumSource(FlinkVersion.class)
+    public void testUnsupportedVersions(FlinkVersion version) throws Exception 
{
+        context =
+                TestUtils.createContextWithReadyFlinkDeployment(
+                        Map.of(), kubernetesClient, version);
+        var updateControl = 
testController.reconcile(TestUtils.buildSessionJob(), context);
+        var lastEvent = testController.events().poll();
+        if (!version.isNewerVersionThan(FlinkVersion.v1_14)) {
+            assertTrue(updateControl.getScheduleDelay().isEmpty());
+            assertEquals(
+                    EventRecorder.Reason.UnsupportedFlinkVersion.name(), 
lastEvent.getReason());
+        } else {
+            assertTrue(updateControl.getScheduleDelay().isPresent());
+            assertEquals(EventRecorder.Reason.Submit.name(), 
lastEvent.getReason());
+        }
+    }
+
     private void verifyReconcileInitialSuspendedDeployment(FlinkSessionJob 
sessionJob)
             throws Exception {
         UpdateControl<FlinkDeployment> updateControl =
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index 68c0c227..4e57245d 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.kubernetes.operator.OperatorTestBase;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.spec.JobState;
 import 
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.api.status.JobStatus;
@@ -700,43 +699,6 @@ public class ApplicationObserverTest extends 
OperatorTestBase {
                         .getSavepointInfo()
                         .getLastSavepoint()
                         .getFormatType());
-
-        // canonical for flink savepoint
-        Long thirdNonce = 789L;
-        deployment.getSpec().getJob().setSavepointTriggerNonce(thirdNonce);
-        deployment.getSpec().setFlinkVersion(FlinkVersion.v1_14);
-        deployment
-                .getSpec()
-                .setFlinkConfiguration(
-                        Map.of(
-                                OPERATOR_SAVEPOINT_FORMAT_TYPE.key(),
-                                
org.apache.flink.core.execution.SavepointFormatType.NATIVE.name()));
-        conf = configManager.getDeployConfig(deployment.getMetadata(), 
deployment.getSpec());
-        flinkService.triggerSavepoint(
-                deployment.getStatus().getJobStatus().getJobId(),
-                SnapshotTriggerType.MANUAL,
-                deployment.getStatus().getJobStatus().getSavepointInfo(),
-                conf);
-
-        observer.observe(deployment, readyContext);
-        observer.observe(deployment, readyContext);
-        
assertFalse(SnapshotUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
-        assertEquals(
-                thirdNonce,
-                deployment
-                        .getStatus()
-                        .getJobStatus()
-                        .getSavepointInfo()
-                        .getLastSavepoint()
-                        .getTriggerNonce());
-        assertEquals(
-                SavepointFormatType.CANONICAL,
-                deployment
-                        .getStatus()
-                        .getJobStatus()
-                        .getSavepointInfo()
-                        .getLastSavepoint()
-                        .getFormatType());
     }
 
     private void bringToReadyStatus(FlinkDeployment deployment) {
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
index fb823500..27bc846a 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
@@ -152,9 +152,7 @@ public class NativeFlinkServiceTest {
                             UpgradeMode upgradeMode,
                             Configuration conf,
                             boolean deleteClusterAfterSavepoint) {
-                        assertEquals(
-                                
flinkVersion.isNewerVersionThan(FlinkVersion.v1_14) ? false : true,
-                                deleteClusterAfterSavepoint);
+                        assertEquals(false, deleteClusterAfterSavepoint);
                         tested.set(true);
                     }
                 };
diff --git 
a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml 
b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index 685e87ef..d16ba415 100644
--- 
a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ 
b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -45,6 +45,7 @@ spec:
                 - v1_16
                 - v1_17
                 - v1_18
+                - v1_19
                 type: string
               ingress:
                 properties:

Reply via email to