This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new f4c0c99c [FLINK-39808] Warn on use of deprecated Flink versions (#1124)
f4c0c99c is described below
commit f4c0c99c6ebc183e5e2bab56e87ac31a450b168a
Author: Dennis-Mircea Ciupitu <[email protected]>
AuthorDate: Thu Jun 4 12:21:36 2026 +0300
[FLINK-39808] Warn on use of deprecated Flink versions (#1124)
---
docs/content.zh/docs/custom-resource/reference.md | 8 ++---
docs/content/docs/custom-resource/reference.md | 4 +--
.../kubernetes/operator/api/spec/FlinkVersion.java | 25 ++++++++++++++
.../operator/api/spec/FlinkVersionTest.java | 38 ++++++++++++++++++++++
.../operator/api/utils/BaseTestUtils.java | 8 ++---
.../kubernetes/operator/utils/ValidatorUtils.java | 7 +++-
.../flink/kubernetes/operator/TestUtils.java | 6 ++--
.../operator/validation/DefaultValidatorTest.java | 4 ++-
8 files changed, 85 insertions(+), 15 deletions(-)
diff --git a/docs/content.zh/docs/custom-resource/reference.md b/docs/content.zh/docs/custom-resource/reference.md
index 126de5a4..a8219c97 100644
--- a/docs/content.zh/docs/custom-resource/reference.md
+++ b/docs/content.zh/docs/custom-resource/reference.md
@@ -80,10 +80,10 @@ This page serves as a full reference for FlinkDeployment custom resource definit
| ----- | ---- |
| v1_13 | No longer supported since 1.7 operator release. |
| v1_14 | No longer supported since 1.7 operator release. |
-| v1_15 | |
-| v1_16 | |
-| v1_17 | |
-| v1_18 | |
+| v1_15 | Deprecated since 1.10 operator release. |
+| v1_16 | Deprecated since 1.11 operator release. |
+| v1_17 | Deprecated since 1.13 operator release. |
+| v1_18 | Deprecated since 1.13 operator release. |
| v1_19 | |
| v1_20 | |
diff --git a/docs/content/docs/custom-resource/reference.md b/docs/content/docs/custom-resource/reference.md
index 76b303ce..df54244f 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -158,8 +158,8 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
| v1_14 | No longer supported since 1.7 operator release. |
| v1_15 | Deprecated since 1.10 operator release. |
| v1_16 | Deprecated since 1.11 operator release. |
-| v1_17 | |
-| v1_18 | |
+| v1_17 | Deprecated since 1.13 operator release. |
+| v1_18 | Deprecated since 1.13 operator release. |
| v1_19 | |
| v1_20 | |
| v2_0 | |
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java
index 8c1705fa..2d69d7d3 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java
@@ -20,6 +20,9 @@ package org.apache.flink.kubernetes.operator.api.spec;
import org.apache.flink.annotation.Experimental;
+import java.util.EnumSet;
+import java.util.Set;
+
/** Enumeration for supported Flink versions. */
@Experimental
public enum FlinkVersion {
@@ -35,8 +38,10 @@ public enum FlinkVersion {
/** Deprecated since 1.11 operator release. */
@Deprecated
v1_16(1, 16),
+ /** Deprecated since 1.13 operator release. */
@Deprecated
v1_17(1, 17),
+ /** Deprecated since 1.13 operator release. */
@Deprecated
v1_18(1, 18),
v1_19(1, 19),
@@ -47,6 +52,8 @@ public enum FlinkVersion {
v2_3(2, 3),
v2_4(2, 4);
+ private static final Set<FlinkVersion> DEPRECATED = computeDeprecated();
+
/** The major integer from the Flink semver. For example for Flink 1.18.1 this would be 1. */
private final int majorVersion;
@@ -68,6 +75,24 @@ public enum FlinkVersion {
return false;
}
+ private static Set<FlinkVersion> computeDeprecated() {
+ Set<FlinkVersion> deprecated = EnumSet.noneOf(FlinkVersion.class);
+ for (FlinkVersion v : values()) {
+ try {
+ if (FlinkVersion.class.getField(v.name()).isAnnotationPresent(Deprecated.class)) {
+ deprecated.add(v);
+ }
+ } catch (NoSuchFieldException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ return deprecated;
+ }
+
+ public boolean isDeprecated() {
+ return DEPRECATED.contains(this);
+ }
+
public static boolean isSupported(FlinkVersion version) {
return version != null && version.isEqualOrNewer(FlinkVersion.v1_15);
}
diff --git
a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersionTest.java
b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersionTest.java
index 2cbe49b0..3859a359 100644
---
a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersionTest.java
+++
b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersionTest.java
@@ -35,7 +35,45 @@ class FlinkVersionTest {
@Test
void isSupported() {
+ assertFalse(FlinkVersion.isSupported(null));
+ assertFalse(FlinkVersion.isSupported(FlinkVersion.v1_13));
+ assertFalse(FlinkVersion.isSupported(FlinkVersion.v1_14));
+ assertTrue(FlinkVersion.isSupported(FlinkVersion.v1_15));
+ assertTrue(FlinkVersion.isSupported(FlinkVersion.v1_18));
+ assertTrue(FlinkVersion.isSupported(FlinkVersion.v1_19));
assertTrue(FlinkVersion.isSupported(FlinkVersion.v1_20));
+ assertTrue(FlinkVersion.isSupported(FlinkVersion.v2_0));
+ assertTrue(FlinkVersion.isSupported(FlinkVersion.v2_4));
+ }
+
+ @Test
+ void isDeprecated() {
+ assertTrue(FlinkVersion.v1_13.isDeprecated());
+ assertTrue(FlinkVersion.v1_14.isDeprecated());
+ assertTrue(FlinkVersion.v1_15.isDeprecated());
+ assertTrue(FlinkVersion.v1_16.isDeprecated());
+ assertTrue(FlinkVersion.v1_17.isDeprecated());
+ assertTrue(FlinkVersion.v1_18.isDeprecated());
+ assertFalse(FlinkVersion.v1_19.isDeprecated());
+ assertFalse(FlinkVersion.v1_20.isDeprecated());
+ assertFalse(FlinkVersion.v2_0.isDeprecated());
+ assertFalse(FlinkVersion.v2_4.isDeprecated());
+ }
+
+ @Test
+ void isDeprecatedMatchesDeprecatedAnnotation() throws Exception {
+ for (FlinkVersion v : FlinkVersion.values()) {
+ boolean annotated =
+ FlinkVersion.class.getField(v.name()).isAnnotationPresent(Deprecated.class);
+ assertEquals(
+ annotated,
+ v.isDeprecated(),
+ v
+ + ": @Deprecated says "
+ + annotated
+ + " but isDeprecated() says "
+ + v.isDeprecated());
+ }
}
@Test
diff --git
a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
index cd30143a..7d4d6f4c 100644
---
a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
+++
b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
@@ -70,7 +70,7 @@ public class BaseTestUtils {
public static final String SAMPLE_SESSION_JOB_JAR =
"https://example.com/sample.jar";
public static FlinkDeployment buildSessionCluster() {
- return buildSessionCluster(FlinkVersion.v1_17);
+ return buildSessionCluster(FlinkVersion.v1_20);
}
public static FlinkDeployment buildSessionCluster(FlinkVersion version) {
@@ -94,15 +94,15 @@ public class BaseTestUtils {
}
public static FlinkDeployment buildApplicationCluster(JobState state) {
- return buildApplicationCluster(FlinkVersion.v1_17, state);
+ return buildApplicationCluster(FlinkVersion.v1_20, state);
}
public static FlinkDeployment buildApplicationCluster() {
- return buildApplicationCluster(FlinkVersion.v1_17, JobState.RUNNING);
+ return buildApplicationCluster(FlinkVersion.v1_20, JobState.RUNNING);
}
public static FlinkDeployment buildApplicationCluster(String name, String
namespace) {
- return buildApplicationCluster(name, namespace, FlinkVersion.v1_17,
JobState.RUNNING);
+ return buildApplicationCluster(name, namespace, FlinkVersion.v1_20,
JobState.RUNNING);
}
public static FlinkDeployment buildApplicationCluster(FlinkVersion
version) {
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtils.java
index 4aca7bfa..0f21c81a 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtils.java
@@ -34,7 +34,7 @@ import java.util.Set;
/** Validator utilities. */
public final class ValidatorUtils {
- private static final Logger LOG = LoggerFactory.getLogger(FlinkUtils.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ValidatorUtils.class);
public static Set<FlinkResourceValidator>
discoverValidators(FlinkConfigManager configManager) {
var conf = configManager.getDefaultConfig();
@@ -72,6 +72,11 @@ public final class ValidatorUtils {
ctx.getJosdkContext().getClient());
return false;
}
+ if (version.isDeprecated()) {
+ LOG.warn(
+ "Flink version {} is deprecated and may be removed in a future operator release. Plan to upgrade to a non-deprecated version.",
+ version);
+ }
return true;
}
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 83f8f2cc..db5820b0 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -215,7 +215,7 @@ public class TestUtils extends BaseTestUtils {
public static <T extends HasMetadata> Context<T>
createContextWithReadyFlinkDeployment(
Map<String, String> flinkDepConfig, KubernetesClient client) {
- return createContextWithReadyFlinkDeployment(flinkDepConfig, client,
FlinkVersion.v1_18);
+ return createContextWithReadyFlinkDeployment(flinkDepConfig, client,
FlinkVersion.v1_20);
}
public static <T extends HasMetadata> Context<T>
createContextWithReadyFlinkDeployment(
@@ -431,7 +431,7 @@ public class TestUtils extends BaseTestUtils {
public static Stream<Arguments> flinkVersionsAndUpgradeModes() {
List<Arguments> args = new ArrayList<>();
- for (FlinkVersion version : Set.of(FlinkVersion.v1_16,
FlinkVersion.v1_20)) {
+ for (FlinkVersion version : Set.of(FlinkVersion.v1_19,
FlinkVersion.v1_20)) {
for (UpgradeMode upgradeMode : UpgradeMode.values()) {
args.add(arguments(version, upgradeMode));
}
@@ -440,7 +440,7 @@ public class TestUtils extends BaseTestUtils {
}
public static Stream<Arguments> flinkVersions() {
- return Stream.of(arguments(FlinkVersion.v1_16),
arguments(FlinkVersion.v1_20));
+ return Stream.of(arguments(FlinkVersion.v1_19),
arguments(FlinkVersion.v1_20));
}
public static FlinkDeployment createCanaryDeployment() {
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index bba875a5..25082625 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -465,6 +465,8 @@ public class DefaultValidatorTest {
+ " is not supported by this operator version");
testSuccess(dep -> dep.getSpec().setFlinkVersion(FlinkVersion.v1_15));
+ testSuccess(dep -> dep.getSpec().setFlinkVersion(FlinkVersion.v1_18));
+ testSuccess(dep -> dep.getSpec().setFlinkVersion(FlinkVersion.v1_19));
testError(
dep -> dep.getSpec().setServiceAccount(null),
@@ -752,7 +754,7 @@ public class DefaultValidatorTest {
// Stopped with LAST_STATE mode with different Flink Version
suspendSpec.getJob().setUpgradeMode(fromUpgrade);
suspendSpec.getJob().setState(fromState);
- suspendSpec.setFlinkVersion(FlinkVersion.v1_18);
+ suspendSpec.setFlinkVersion(FlinkVersion.v1_19);
dep.getStatus()
.getReconciliationStatus()