This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 02d8dd5d [FLINK-33089] Clean up 1.13 and 1.14 references from docs and code (#768)
02d8dd5d is described below
commit 02d8dd5d915b7d890d09c23c9003580ab63e6dc1
Author: Maximilian Michels <[email protected]>
AuthorDate: Thu Feb 1 16:35:15 2024 +0100
[FLINK-33089] Clean up 1.13 and 1.14 references from docs and code (#768)
---
docs/content/docs/custom-resource/overview.md | 2 +-
docs/content/docs/custom-resource/reference.md | 4 ++--
.../flink/kubernetes/operator/api/spec/FlinkVersion.java | 10 +++++++---
.../flink/kubernetes/operator/config/FlinkConfigBuilder.java | 4 ++--
.../kubernetes/operator/service/AbstractFlinkService.java | 2 +-
.../flink/kubernetes/operator/service/NativeFlinkService.java | 2 +-
.../apache/flink/kubernetes/operator/utils/SnapshotUtils.java | 2 +-
.../operator/controller/FlinkDeploymentControllerTest.java | 2 +-
.../operator/controller/FlinkSessionJobControllerTest.java | 2 +-
.../reconciler/deployment/ApplicationReconcilerTest.java | 2 +-
.../kubernetes/operator/service/AbstractFlinkServiceTest.java | 2 +-
11 files changed, 19 insertions(+), 15 deletions(-)
diff --git a/docs/content/docs/custom-resource/overview.md
b/docs/content/docs/custom-resource/overview.md
index 01b098ad..a4362e98 100644
--- a/docs/content/docs/custom-resource/overview.md
+++ b/docs/content/docs/custom-resource/overview.md
@@ -85,7 +85,7 @@ The spec contains all the information the operator need to
deploy and manage you
Most deployments will define at least the following fields:
- `image` : Docker used to run Flink job and task manager processes
- - `flinkVersion` : Flink version used in the image (`v1_13`, `v1_14`, `v1_15`, `v1_16` ...)
+ - `flinkVersion` : Flink version used in the image (`v1_15`, `v1_16` ...)
- `serviceAccount` : Kubernetes service account used by the Flink pods
- `taskManager, jobManager` : Job and Task manager pod resource specs (cpu,
memory, ephemeralStorage)
- `flinkConfiguration` : Map of Flink configuration overrides such as HA and
checkpointing configs
diff --git a/docs/content/docs/custom-resource/reference.md
b/docs/content/docs/custom-resource/reference.md
index 030e043a..c7810eb2 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -78,8 +78,8 @@ This page serves as a full reference for FlinkDeployment
custom resource definit
| Value | Docs |
| ----- | ---- |
-| v1_13 | |
-| v1_14 | |
+| v1_13 | No longer supported since 1.7 operator release. |
+| v1_14 | No longer supported since 1.7 operator release. |
| v1_15 | |
| v1_16 | |
| v1_17 | |
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java
index f3532b3c..8d4b6b7d 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java
@@ -23,7 +23,11 @@ import org.apache.flink.annotation.Experimental;
/** Enumeration for supported Flink versions. */
@Experimental
public enum FlinkVersion {
+ /** No longer supported since 1.7 operator release. */
+ @Deprecated
v1_13,
+ /** No longer supported since 1.7 operator release. */
+ @Deprecated
v1_14,
v1_15,
v1_16,
@@ -31,8 +35,8 @@ public enum FlinkVersion {
v1_18,
v1_19;
- public boolean isNewerVersionThan(FlinkVersion otherVersion) {
- return this.ordinal() > otherVersion.ordinal();
+ public boolean isEqualOrNewer(FlinkVersion otherVersion) {
+ return this.ordinal() >= otherVersion.ordinal();
}
/**
@@ -45,6 +49,6 @@ public enum FlinkVersion {
}
public static boolean isSupported(FlinkVersion version) {
- return version != null && version.isNewerVersionThan(FlinkVersion.v1_14);
+ return version != null && version.isEqualOrNewer(FlinkVersion.v1_15);
}
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
index 39d0fed6..6caa46a3 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
@@ -118,7 +118,7 @@ public class FlinkConfigBuilder {
protected FlinkConfigBuilder applyImage() {
if (!StringUtils.isNullOrWhitespaceOnly(spec.getImage())) {
String configKey;
- if (spec.getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_16)) {
+ if (spec.getFlinkVersion().isEqualOrNewer(FlinkVersion.v1_17)) {
configKey = KubernetesConfigOptions.CONTAINER_IMAGE.key();
} else {
configKey = "kubernetes.container.image";
@@ -469,7 +469,7 @@ public class FlinkConfigBuilder {
return;
}
- boolean newConfKeys = spec.getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_16);
+ boolean newConfKeys = spec.getFlinkVersion().isEqualOrNewer(FlinkVersion.v1_17);
String configKey;
if (isJM) {
if (newConfKeys) {
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 611ab152..2fafb027 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -807,7 +807,7 @@ public abstract class AbstractFlinkService implements
FlinkService {
job.getAllowNonRestoredState(),
savepoint,
RestoreMode.DEFAULT,
- conf.get(FLINK_VERSION).isNewerVersionThan(FlinkVersion.v1_16)
+ conf.get(FLINK_VERSION).isEqualOrNewer(FlinkVersion.v1_17)
? conf.toMap()
: null);
LOG.info("Submitting job: {} to session cluster.", jobID);
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
index 41083d91..52bb398e 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
@@ -250,7 +250,7 @@ public class NativeFlinkService extends
AbstractFlinkService {
return false;
}
- if (!observeConfig.get(FLINK_VERSION).isNewerVersionThan(FlinkVersion.v1_17)) {
+ if (!observeConfig.get(FLINK_VERSION).isEqualOrNewer(FlinkVersion.v1_18)) {
LOG.debug("In-place rescaling is only available starting from
Flink 1.18");
return false;
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
index 1f250b52..2322e484 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
@@ -353,7 +353,7 @@ public class SnapshotUtils {
public static boolean isSnapshotTriggeringSupported(Configuration conf) {
// Flink REST API supports triggering checkpoints externally starting
with 1.17
return conf.get(FLINK_VERSION) != null
- && conf.get(FLINK_VERSION).isNewerVersionThan(FlinkVersion.v1_16);
+ && conf.get(FLINK_VERSION).isEqualOrNewer(FlinkVersion.v1_17);
}
public static boolean gracePeriodEnded(Duration gracePeriod, SnapshotInfo
snapshotInfo) {
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 61dbf692..904944da 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -1231,7 +1231,7 @@ public class FlinkDeploymentControllerTest {
var appCluster = TestUtils.buildApplicationCluster(version);
var updateControl = testController.reconcile(appCluster, context);
var lastEvent = testController.events().poll();
- if (!version.isNewerVersionThan(FlinkVersion.v1_14)) {
+ if (!version.isEqualOrNewer(FlinkVersion.v1_15)) {
assertTrue(updateControl.getScheduleDelay().isEmpty());
assertEquals(
EventRecorder.Reason.UnsupportedFlinkVersion.name(),
lastEvent.getReason());
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
index 5c620256..d532ff25 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
@@ -535,7 +535,7 @@ class FlinkSessionJobControllerTest {
Map.of(), kubernetesClient, version);
var updateControl =
testController.reconcile(TestUtils.buildSessionJob(), context);
var lastEvent = testController.events().poll();
- if (!version.isNewerVersionThan(FlinkVersion.v1_14)) {
+ if (!version.isEqualOrNewer(FlinkVersion.v1_15)) {
assertTrue(updateControl.getScheduleDelay().isEmpty());
assertEquals(
EventRecorder.Reason.UnsupportedFlinkVersion.name(),
lastEvent.getReason());
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index ce83bce5..c414245d 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -946,7 +946,7 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
}
statusRecorder.updateStatusFromCache(deployment);
- if (!flinkVersion.isNewerVersionThan(FlinkVersion.v1_15)) {
+ if (!flinkVersion.isEqualOrNewer(FlinkVersion.v1_16)) {
assertFalse(StringUtils.isBlank(getJobStatus(deployment).getJobId()));
}
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
index 3739298f..637e3bed 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
@@ -202,7 +202,7 @@ public class AbstractFlinkServiceTest {
job.getMetadata(), job.getSpec(), JobID.generate(),
deployConf, null);
// Make sure that deploy conf was passed to jar run
- if (flinkVersion.isNewerVersionThan(FlinkVersion.v1_16)) {
+ if (flinkVersion.isEqualOrNewer(FlinkVersion.v1_17)) {
assertEquals(deployConf.toMap(),
jarRuns.get(0).getFlinkConfiguration().toMap());
} else {
assertTrue(jarRuns.get(0).getFlinkConfiguration().toMap().isEmpty());