This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new e1c3192f [FLINK-33089] Drop support for Flink 1.13 and 1.14 and clean up related codepaths e1c3192f is described below commit e1c3192fdda1b22373a0e5821856c028bd4183b8 Author: Gyula Fora <g_f...@apple.com> AuthorDate: Sat Oct 28 21:57:58 2023 +0200 [FLINK-33089] Drop support for Flink 1.13 and 1.14 and clean up related codepaths --- .github/workflows/ci.yml | 16 +------ docs/content/docs/concepts/overview.md | 2 +- docs/content/docs/custom-resource/reference.md | 1 + examples/autoscaling/Dockerfile | 2 +- examples/basic-checkpoint-ha.yaml | 2 +- flink-kubernetes-operator-api/pom.xml | 4 +- .../kubernetes/operator/api/spec/FlinkVersion.java | 7 ++- .../operator/config/FlinkConfigBuilder.java | 4 +- .../controller/FlinkDeploymentController.java | 9 +++- .../controller/FlinkSessionJobController.java | 6 +++ .../sessionjob/SessionJobReconciler.java | 8 ++++ .../operator/service/AbstractFlinkService.java | 43 ++++++------------ .../operator/service/NativeFlinkService.java | 6 +-- .../kubernetes/operator/utils/EventRecorder.java | 3 +- .../kubernetes/operator/utils/FlinkUtils.java | 6 --- .../kubernetes/operator/utils/SnapshotUtils.java | 12 ----- .../kubernetes/operator/utils/ValidatorUtils.java | 18 ++++++++ .../flink/kubernetes/operator/TestUtils.java | 15 +++---- .../kubernetes/operator/TestingFlinkService.java | 24 +++++----- .../operator/config/FlinkConfigBuilderTest.java | 6 +-- .../controller/FlinkDeploymentControllerTest.java | 21 +++++++-- .../controller/FlinkSessionJobControllerTest.java | 51 +++++++++++++++++++--- .../deployment/ApplicationObserverTest.java | 38 ---------------- .../operator/service/NativeFlinkServiceTest.java | 4 +- .../crds/flinkdeployments.flink.apache.org-v1.yml | 1 + 25 files changed, 152 insertions(+), 157 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 15bdcad4..70cd7f3a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -70,7 +70,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - version: ["v1_18","v1_17","v1_16","v1_15","v1_14","v1_13"] + version: ["v1_18","v1_17","v1_16","v1_15"] namespace: ["default","flink"] mode: ["native", "standalone"] java-version: [ 11, 17 ] @@ -86,17 +86,13 @@ jobs: - namespace: flink extraArgs: '--create-namespace --set "watchNamespaces={default,flink}"' - version: v1_18 - image: ghcr.io\/apache\/flink-docker:1.18-SNAPSHOT-scala_2.12-java11-debian + image: flink:1.18 - version: v1_17 image: flink:1.17 - version: v1_16 image: flink:1.16 - version: v1_15 image: flink:1.15 - - version: v1_14 - image: flink:1.14 - - version: v1_13 - image: flink:1.13 exclude: - namespace: default test: test_multi_sessionjob.sh @@ -108,14 +104,6 @@ jobs: test: test_autoscaler.sh - mode: standalone test: test_dynamic_config.sh - - version: v1_13 - test: test_autoscaler.sh - - version: v1_13 - test: test_dynamic_config.sh - - version: v1_14 - test: test_autoscaler.sh - - version: v1_14 - test: test_dynamic_config.sh - version: v1_15 test: test_autoscaler.sh - version: v1_15 diff --git a/docs/content/docs/concepts/overview.md b/docs/content/docs/concepts/overview.md index d37d8d3d..baa787f4 100644 --- a/docs/content/docs/concepts/overview.md +++ b/docs/content/docs/concepts/overview.md @@ -36,7 +36,7 @@ Flink Kubernetes Operator aims to capture the responsibilities of a human operat - Stateful and stateless application upgrades - Triggering and managing savepoints - Handling errors, rolling-back broken upgrades -- Multiple Flink version support: v1.13, v1.14, v1.15, v1.16, v1.17 +- Multiple Flink version support: v1.15, v1.16, v1.17, v1.18 - [Deployment Modes]({{< ref "docs/custom-resource/overview#application-deployments" >}}): - Application cluster - Session cluster diff --git a/docs/content/docs/custom-resource/reference.md b/docs/content/docs/custom-resource/reference.md index 40037e53..4fc1ea25 100644 --- a/docs/content/docs/custom-resource/reference.md +++ b/docs/content/docs/custom-resource/reference.md @@ -84,6 +84,7 @@ This page serves as a full reference for FlinkDeployment custom resource definit | v1_16 | | | v1_17 | | | v1_18 | | +| v1_19 | | ### IngressSpec **Class**: org.apache.flink.kubernetes.operator.api.spec.IngressSpec diff --git a/examples/autoscaling/Dockerfile b/examples/autoscaling/Dockerfile index e097ab37..30bdd47a 100644 --- a/examples/autoscaling/Dockerfile +++ b/examples/autoscaling/Dockerfile @@ -16,5 +16,5 @@ # limitations under the License. ################################################################################ -FROM ghcr.io/apache/flink-docker:1.18-SNAPSHOT-scala_2.12-java11-debian +FROM flink:1.18 COPY ./target/autoscaling*.jar /opt/flink/usrlib/autoscaling.jar diff --git a/examples/basic-checkpoint-ha.yaml b/examples/basic-checkpoint-ha.yaml index a6d035d4..48cd6c70 100644 --- a/examples/basic-checkpoint-ha.yaml +++ b/examples/basic-checkpoint-ha.yaml @@ -27,7 +27,7 @@ spec: taskmanager.numberOfTaskSlots: "2" state.savepoints.dir: file:///flink-data/savepoints state.checkpoints.dir: file:///flink-data/checkpoints - high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory + high-availability.type: kubernetes high-availability.storageDir: file:///flink-data/ha serviceAccount: flink jobManager: diff --git a/flink-kubernetes-operator-api/pom.xml b/flink-kubernetes-operator-api/pom.xml index 8a8bcbd4..12a0c1d3 100644 --- a/flink-kubernetes-operator-api/pom.xml +++ b/flink-kubernetes-operator-api/pom.xml @@ -225,7 +225,7 @@ under the License. fork="true" failonerror="true"> <classpath refid="maven.compile.classpath"/> <arg value="file://${rootDir}/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml"/> - <arg value="https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.4.0/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml"/> + <arg value="https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.6.0/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml"/> </java> </target> </configuration> @@ -242,7 +242,7 @@ under the License. fork="true" failonerror="true"> <classpath refid="maven.compile.classpath"/> <arg value="file://${rootDir}/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml"/> - <arg value="https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.4.0/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml"/> + <arg value="https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.6.0/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml"/> </java> </target> </configuration> diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java index fd49304f..f3532b3c 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java @@ -28,7 +28,8 @@ public enum FlinkVersion { v1_15, v1_16, v1_17, - v1_18; + v1_18, + v1_19; public boolean isNewerVersionThan(FlinkVersion otherVersion) { return this.ordinal() > otherVersion.ordinal(); @@ -42,4 +43,8 @@ public enum FlinkVersion { public static FlinkVersion current() { return values()[values().length - 1]; } + + public static boolean isSupported(FlinkVersion version) { + return version != null && version.isNewerVersionThan(FlinkVersion.v1_14); + } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java index 82ce46d0..f99c08fc 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java @@ -162,9 +162,7 @@ public class FlinkConfigBuilder { } // We need to keep the application clusters around for proper operator behaviour - if (spec.getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_14)) { - effectiveConfig.set(SHUTDOWN_ON_APPLICATION_FINISH, false); - } + effectiveConfig.set(SHUTDOWN_ON_APPLICATION_FINISH, false); if (HighAvailabilityMode.isHighAvailabilityModeActivated(effectiveConfig)) { setDefaultConf(SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR, true); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index e4fe0a13..c8c24ef3 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -32,6 +32,7 @@ import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.kubernetes.operator.utils.EventSourceUtils; import org.apache.flink.kubernetes.operator.utils.StatusRecorder; +import org.apache.flink.kubernetes.operator.utils.ValidatorUtils; import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator; import io.javaoperatorsdk.operator.api.reconciler.Cleaner; @@ -53,7 +54,7 @@ import java.util.Optional; import java.util.Set; /** Controller that runs the main reconcile loop for Flink deployments. */ -@ControllerConfiguration() +@ControllerConfiguration public class FlinkDeploymentController implements Reconciler<FlinkDeployment>, ErrorStatusHandler<FlinkDeployment>, @@ -127,6 +128,12 @@ public class FlinkDeploymentController statusRecorder.updateStatusFromCache(flinkApp); FlinkDeployment previousDeployment = ReconciliationUtils.clone(flinkApp); var ctx = ctxFactory.getResourceContext(flinkApp, josdkContext); + + // If we get an unsupported Flink version, trigger event and exit + if (!ValidatorUtils.validateSupportedVersion(ctx, eventRecorder)) { + return UpdateControl.noUpdate(); + } + try { observerFactory.getOrCreate(flinkApp).observe(ctx); if (!validateDeployment(ctx)) { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java index 34972915..3ef87cb7 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java @@ -29,6 +29,7 @@ import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.kubernetes.operator.utils.EventSourceUtils; import org.apache.flink.kubernetes.operator.utils.StatusRecorder; +import org.apache.flink.kubernetes.operator.utils.ValidatorUtils; import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator; import io.javaoperatorsdk.operator.api.reconciler.Cleaner; @@ -98,6 +99,11 @@ public class FlinkSessionJobController FlinkSessionJob previousJob = ReconciliationUtils.clone(flinkSessionJob); var ctx = ctxFactory.getResourceContext(flinkSessionJob, josdkContext); + // If we get an unsupported Flink version, trigger event and exit + if (!ValidatorUtils.validateSupportedVersion(ctx, eventRecorder)) { + return UpdateControl.noUpdate(); + } + observer.observe(ctx); if (!validateSessionJob(ctx)) { statusRecorder.patchAndCacheStatus(flinkSessionJob, ctx.getKubernetesClient()); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java index 16edab6b..c7cac9df 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java @@ -67,6 +67,14 @@ public class SessionJobReconciler Optional<String> savepoint, boolean requireHaMetadata) throws Exception { + + eventRecorder.triggerEvent( + ctx.getResource(), + EventRecorder.Type.Normal, + EventRecorder.Reason.Submit, + EventRecorder.Component.Job, + MSG_SUBMIT, + ctx.getKubernetesClient()); var jobID = ctx.getFlinkService() .submitJobToSessionCluster( diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index 9ee49f00..399e692d 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -47,7 +47,6 @@ import org.apache.flink.kubernetes.operator.observer.CheckpointFetchResult; import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; -import org.apache.flink.kubernetes.operator.utils.SnapshotUtils; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices; @@ -275,7 +274,8 @@ public abstract class AbstractFlinkService implements FlinkService { var jobId = jobIdString != null ? JobID.fromHexString(jobIdString) : null; Optional<String> savepointOpt = Optional.empty(); - var savepointFormatType = SnapshotUtils.getSavepointFormatType(conf); + var savepointFormatType = + conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE); try (var clusterClient = getClusterClient(conf)) { var clusterId = clusterClient.getClusterId(); switch (upgradeMode) { @@ -313,11 +313,7 @@ public abstract class AbstractFlinkService implements FlinkService { KubernetesOperatorConfigOptions .DRAIN_ON_SAVEPOINT_DELETION), savepointDirectory, - conf.get(FLINK_VERSION) - .isNewerVersionThan( - FlinkVersion.v1_14) - ? savepointFormatType - : null) + savepointFormatType) .get(timeout, TimeUnit.SECONDS); savepointOpt = Optional.of(savepoint); LOG.info("Job successfully suspended with savepoint {}.", savepoint); @@ -365,11 +361,9 @@ public abstract class AbstractFlinkService implements FlinkService { deploymentStatus.getJobStatus().getSavepointInfo().updateLastSavepoint(sp); }); - var shutdownDisabled = - upgradeMode != UpgradeMode.LAST_STATE - && FlinkUtils.clusterShutdownDisabled( - ReconciliationUtils.getDeployedSpec(deployment)); - if (!shutdownDisabled) { + // Unless we leave the jm around after savepoint, we should wait until it has finished + // shutting down + if (deleteClusterAfterSavepoint || upgradeMode != UpgradeMode.SAVEPOINT) { waitForClusterShutdown(conf); deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING); } @@ -422,13 +416,9 @@ public abstract class AbstractFlinkService implements FlinkService { KubernetesOperatorConfigOptions .DRAIN_ON_SAVEPOINT_DELETION), savepointDirectory, - conf.get(FLINK_VERSION) - .isNewerVersionThan( - FlinkVersion.v1_14) - ? conf.get( - KubernetesOperatorConfigOptions - .OPERATOR_SAVEPOINT_FORMAT_TYPE) - : null) + conf.get( + KubernetesOperatorConfigOptions + .OPERATOR_SAVEPOINT_FORMAT_TYPE)) .get(timeout, TimeUnit.SECONDS); savepointOpt = Optional.of(savepoint); LOG.info( @@ -484,7 +474,8 @@ public abstract class AbstractFlinkService implements FlinkService { Preconditions.checkNotNull(conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY)); var timeout = operatorConfig.getFlinkClientTimeout().getSeconds(); - var savepointFormatType = SnapshotUtils.getSavepointFormatType(conf); + var savepointFormatType = + conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE); var response = clusterClient @@ -492,13 +483,7 @@ public abstract class AbstractFlinkService implements FlinkService { savepointTriggerHeaders, savepointTriggerMessageParameters, new SavepointTriggerRequestBody( - savepointDirectory, - false, - conf.get(FLINK_VERSION) - .isNewerVersionThan(FlinkVersion.v1_14) - ? savepointFormatType - : null, - null)) + savepointDirectory, false, savepointFormatType, null)) .get(timeout, TimeUnit.SECONDS); LOG.info("Savepoint successfully triggered: " + response.getTriggerId().toHexString()); @@ -781,9 +766,7 @@ public abstract class AbstractFlinkService implements FlinkService { jobID, job.getAllowNonRestoredState(), savepoint, - conf.get(FLINK_VERSION).isNewerVersionThan(FlinkVersion.v1_14) - ? RestoreMode.DEFAULT - : null, + RestoreMode.DEFAULT, conf.get(FLINK_VERSION).isNewerVersionThan(FlinkVersion.v1_16) ? conf.toMap() : null); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java index 0f55e948..27d5a265 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java @@ -114,11 +114,7 @@ public class NativeFlinkService extends AbstractFlinkService { public void cancelJob( FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration configuration) throws Exception { - // prior to Flink 1.15, ensure removal of orphaned config maps - // https://issues.apache.org/jira/browse/FLINK-30004 - boolean deleteClusterAfterSavepoint = - !deployment.getSpec().getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_14); - cancelJob(deployment, upgradeMode, configuration, deleteClusterAfterSavepoint); + cancelJob(deployment, upgradeMode, configuration, false); } @Override diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java index e99c8ef9..a6e08d75 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java @@ -249,6 +249,7 @@ public class EventRecorder { ScalingReport, IneffectiveScaling, AutoscalerError, - Scaling + Scaling, + UnsupportedFlinkVersion } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java index 02a587b9..d8643e2b 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java @@ -26,8 +26,6 @@ import org.apache.flink.kubernetes.KubernetesClusterClientFactory; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory; import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; -import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; -import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.utils.Constants; import org.apache.flink.kubernetes.utils.KubernetesUtils; @@ -340,10 +338,6 @@ public class FlinkUtils { || haMode.equalsIgnoreCase("kubernetes"); } - public static boolean clusterShutdownDisabled(FlinkDeploymentSpec spec) { - return spec.getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_14); - } - public static int getNumTaskManagers(Configuration conf) { int parallelism = conf.get(CoreOptions.DEFAULT_PARALLELISM); return getNumTaskManagers(conf, parallelism); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java index fdd537d6..1f250b52 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java @@ -20,7 +20,6 @@ package org.apache.flink.kubernetes.operator.utils; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; -import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.api.status.JobStatus; @@ -403,15 +402,4 @@ public class SnapshotUtils { } } } - - public static SavepointFormatType getSavepointFormatType(Configuration configuration) { - var savepointFormatType = org.apache.flink.core.execution.SavepointFormatType.CANONICAL; - if (configuration.get(FLINK_VERSION) != null - && configuration.get(FLINK_VERSION).isNewerVersionThan(FlinkVersion.v1_14)) { - savepointFormatType = - configuration.get( - KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE); - } - return savepointFormatType; - } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtils.java index 96fb29f3..4aca7bfa 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtils.java @@ -19,7 +19,9 @@ package org.apache.flink.kubernetes.operator.utils; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.core.plugin.PluginUtils; +import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; +import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.validation.DefaultValidator; import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator; @@ -56,4 +58,20 @@ public final class ValidatorUtils { }); return resourceValidators; } + + public static boolean validateSupportedVersion( + FlinkResourceContext<?> ctx, EventRecorder eventRecorder) { + var version = ctx.getFlinkVersion(); + if (!FlinkVersion.isSupported(version)) { + eventRecorder.triggerEvent( + ctx.getResource(), + EventRecorder.Type.Warning, + EventRecorder.Reason.UnsupportedFlinkVersion, + EventRecorder.Component.Operator, + "Flink version " + version + " is not supported by this operator version", + ctx.getJosdkContext().getClient()); + return false; + } + return true; + } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java index 42672b77..2b2e23a7 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java @@ -191,16 +191,16 @@ public class TestUtils extends BaseTestUtils { } public static <T extends HasMetadata> Context<T> createContextWithReadyFlinkDeployment( - Map<String, String> flinkDepConfig) { - return createContextWithReadyFlinkDeployment(flinkDepConfig, null); + Map<String, String> flinkDepConfig, KubernetesClient client) { + return createContextWithReadyFlinkDeployment(flinkDepConfig, client, FlinkVersion.v1_18); } public static <T extends HasMetadata> Context<T> createContextWithReadyFlinkDeployment( - Map<String, String> flinkDepConfig, KubernetesClient client) { + Map<String, String> flinkDepConfig, KubernetesClient client, FlinkVersion version) { return new TestingContext<>() { @Override public Optional<T> getSecondaryResource(Class expectedType, String eventSourceName) { - var session = buildSessionCluster(); + var session = buildSessionCluster(version); session.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY); session.getSpec().getFlinkConfiguration().putAll(flinkDepConfig); session.getStatus() @@ -324,7 +324,7 @@ public class TestUtils extends BaseTestUtils { public static Stream<Arguments> flinkVersionsAndUpgradeModes() { List<Arguments> args = new ArrayList<>(); - for (FlinkVersion version : Set.of(FlinkVersion.v1_14, FlinkVersion.v1_15)) { + for (FlinkVersion version : Set.of(FlinkVersion.v1_15, FlinkVersion.v1_18)) { for (UpgradeMode upgradeMode : UpgradeMode.values()) { args.add(arguments(version, upgradeMode)); } @@ -333,10 +333,7 @@ public class TestUtils extends BaseTestUtils { } public static Stream<Arguments> flinkVersions() { - return Stream.of( - arguments(FlinkVersion.v1_14), - arguments(FlinkVersion.v1_15), - arguments(FlinkVersion.v1_17)); + return Stream.of(arguments(FlinkVersion.v1_15), arguments(FlinkVersion.v1_18)); } public static FlinkDeployment createCanaryDeployment() { diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java index 4dc8e75d..94453b06 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java @@ -53,7 +53,6 @@ import org.apache.flink.kubernetes.operator.service.CheckpointHistoryWrapper; import org.apache.flink.kubernetes.operator.service.NativeFlinkServiceTest; import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; -import org.apache.flink.kubernetes.operator.utils.SnapshotUtils; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; @@ -298,7 +297,8 @@ public class TestingFlinkService extends AbstractFlinkService { Configuration conf) { var triggerId = "savepoint_trigger_" + savepointTriggerCounter++; - var savepointFormatType = SnapshotUtils.getSavepointFormatType(conf); + var savepointFormatType = + conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE); savepointInfo.setTrigger( triggerId, triggerType, SavepointFormatType.valueOf(savepointFormatType.name())); savepointTriggers.put(triggerId, false); @@ -471,18 +471,14 @@ public class TestingFlinkService extends AbstractFlinkService { var sp = savepoint ? "savepoint_" + savepointCounter++ : null; - if (flinkVersion.isNewerVersionThan(FlinkVersion.v1_14)) { - JobStatusMessage oldStatus = jobOpt.get().f1; - jobOpt.get().f1 = - new JobStatusMessage( - oldStatus.getJobId(), - oldStatus.getJobName(), - JobStatus.FINISHED, - oldStatus.getStartTime()); - jobOpt.get().f0 = sp; - } else { - jobs.removeIf(js -> js.f1.getJobId().equals(jobID)); - } + JobStatusMessage oldStatus = jobOpt.get().f1; + jobOpt.get().f1 = + new JobStatusMessage( + oldStatus.getJobId(), + oldStatus.getJobName(), + JobStatus.FINISHED, + oldStatus.getStartTime()); + jobOpt.get().f0 = sp; return sp; } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java index 14bbaf9e..00b418eb 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java @@ -177,11 +177,7 @@ public class FlinkConfigBuilderTest { new FlinkConfigBuilder(flinkDeployment, new Configuration()) .applyFlinkConfiguration() .build(); - if (flinkVersion.isNewerVersionThan(FlinkVersion.v1_14)) { - Assertions.assertFalse(configuration.getBoolean(SHUTDOWN_ON_APPLICATION_FINISH)); - } else { - Assertions.assertTrue(configuration.getBoolean(SHUTDOWN_ON_APPLICATION_FINISH)); - } + Assertions.assertFalse(configuration.getBoolean(SHUTDOWN_ON_APPLICATION_FINISH)); } @Test diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index e9a13160..61dbf692 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -56,6 +56,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; @@ -448,9 +449,7 @@ public class FlinkDeploymentControllerTest { appCluster.getSpec().getJob().setState(JobState.SUSPENDED); testController.reconcile(appCluster, context); assertEquals( - flinkVersion.isNewerVersionThan(FlinkVersion.v1_14) - ? JobManagerDeploymentStatus.READY - : JobManagerDeploymentStatus.MISSING, + JobManagerDeploymentStatus.READY, appCluster.getStatus().getJobManagerDeploymentStatus()); // Resume from last savepoint @@ -1226,6 +1225,22 @@ public class FlinkDeploymentControllerTest { appCluster.getStatus().getReconciliationStatus().getLastStableSpec()); } + @ParameterizedTest + @EnumSource(FlinkVersion.class) + public void testUnsupportedVersions(FlinkVersion version) throws Exception { + var appCluster = TestUtils.buildApplicationCluster(version); + var updateControl = testController.reconcile(appCluster, context); + var lastEvent = testController.events().poll(); + if (!version.isNewerVersionThan(FlinkVersion.v1_14)) { + assertTrue(updateControl.getScheduleDelay().isEmpty()); + assertEquals( + EventRecorder.Reason.UnsupportedFlinkVersion.name(), lastEvent.getReason()); + } else { + assertTrue(updateControl.getScheduleDelay().isPresent()); + assertEquals(EventRecorder.Reason.Submit.name(), lastEvent.getReason()); + } + } + private HasMetadata getIngress(FlinkDeployment deployment) { if (IngressUtils.ingressInNetworkingV1(kubernetesClient)) { return kubernetesClient diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java index c7eed4c9..0705f70f 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java @@ -23,6 +23,7 @@ import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.TestingFlinkService; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.FlinkSessionJob; +import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.api.spec.JobState; import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobReconciliationStatus; @@ -39,8 +40,11 @@ import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.util.ArrayList; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -85,7 +89,10 @@ class FlinkSessionJobControllerTest { // Ignore } - Assertions.assertEquals(1, testController.events().size()); + Assertions.assertEquals(2, testController.events().size()); + // Discard submit event + testController.events().remove(); + var event = testController.events().remove(); Assertions.assertEquals(EventRecorder.Type.Warning.toString(), event.getType()); Assertions.assertEquals("SessionJobException", event.getReason()); @@ -271,7 +278,10 @@ class FlinkSessionJobControllerTest { testController.reconcile(sessionJob, context); - assertEquals(1, testController.events().size()); + assertEquals(2, testController.events().size()); + assertEquals( + EventRecorder.Reason.Submit, + EventRecorder.Reason.valueOf(testController.events().poll().getReason())); assertEquals( EventRecorder.Reason.JobStatusChanged, EventRecorder.Reason.valueOf(testController.events().poll().getReason())); @@ -312,7 +322,10 @@ class FlinkSessionJobControllerTest { assertEquals(1, jobs.size()); assertNull(jobs.get(0).f0); - assertEquals(1, testController.events().size()); + assertEquals(2, testController.events().size()); + assertEquals( + EventRecorder.Reason.Submit, + EventRecorder.Reason.valueOf(testController.events().poll().getReason())); assertEquals( EventRecorder.Reason.JobStatusChanged, EventRecorder.Reason.valueOf(testController.events().poll().getReason())); @@ -334,10 +347,13 @@ class FlinkSessionJobControllerTest { testController.reconcile(sessionJob, context); flinkService.clearJobsInTerminalState(); testController.reconcile(sessionJob, context); - assertEquals(2, testController.events().size()); + assertEquals(3, testController.events().size()); assertEquals( EventRecorder.Reason.SpecChanged, EventRecorder.Reason.valueOf(testController.events().poll().getReason())); + assertEquals( + EventRecorder.Reason.Submit, + EventRecorder.Reason.valueOf(testController.events().poll().getReason())); assertEquals( EventRecorder.Reason.JobStatusChanged, EventRecorder.Reason.valueOf(testController.events().poll().getReason())); @@ -368,16 +384,19 @@ class FlinkSessionJobControllerTest { sessionJob.getSpec().getJob().setParallelism(-1); testController.reconcile(sessionJob, context); flinkService.clearJobsInTerminalState(); - assertEquals(1, testController.events().size()); + assertEquals(2, testController.events().size()); testController.reconcile(sessionJob, context); var statusEvents = testController.events().stream() .filter(e -> !e.getReason().equals(ValidationError.name())) .collect(Collectors.toList()); - assertEquals(1, statusEvents.size()); + assertEquals(2, statusEvents.size()); assertEquals( - EventRecorder.Reason.JobStatusChanged, + EventRecorder.Reason.Submit, EventRecorder.Reason.valueOf(statusEvents.get(0).getReason())); + assertEquals( + EventRecorder.Reason.JobStatusChanged, + EventRecorder.Reason.valueOf(statusEvents.get(1).getReason())); assertEquals(JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState()); assertEquals( @@ -508,6 +527,24 @@ class FlinkSessionJobControllerTest { assertEquals(0, testController.getCanaryResourceManager().getNumberOfActiveCanaries()); } + @ParameterizedTest + @EnumSource(FlinkVersion.class) + public void testUnsupportedVersions(FlinkVersion version) throws Exception { + context = + TestUtils.createContextWithReadyFlinkDeployment( + Map.of(), kubernetesClient, version); + var updateControl = testController.reconcile(TestUtils.buildSessionJob(), context); + var lastEvent = testController.events().poll(); + if (!version.isNewerVersionThan(FlinkVersion.v1_14)) { + assertTrue(updateControl.getScheduleDelay().isEmpty()); + assertEquals( + EventRecorder.Reason.UnsupportedFlinkVersion.name(), lastEvent.getReason()); + } else { + assertTrue(updateControl.getScheduleDelay().isPresent()); + assertEquals(EventRecorder.Reason.Submit.name(), lastEvent.getReason()); + } + } + private void verifyReconcileInitialSuspendedDeployment(FlinkSessionJob sessionJob) throws Exception { UpdateControl<FlinkDeployment> updateControl = diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java index 68c0c227..4e57245d 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java @@ -24,7 +24,6 @@ import org.apache.flink.kubernetes.operator.OperatorTestBase; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.TestingFlinkService; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; -import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.api.spec.JobState; import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; import org.apache.flink.kubernetes.operator.api.status.JobStatus; @@ -700,43 +699,6 @@ public class ApplicationObserverTest extends OperatorTestBase { .getSavepointInfo() .getLastSavepoint() .getFormatType()); - - // canonical for flink savepoint - Long thirdNonce = 789L; - deployment.getSpec().getJob().setSavepointTriggerNonce(thirdNonce); - deployment.getSpec().setFlinkVersion(FlinkVersion.v1_14); - deployment - .getSpec() - .setFlinkConfiguration( - Map.of( - OPERATOR_SAVEPOINT_FORMAT_TYPE.key(), - org.apache.flink.core.execution.SavepointFormatType.NATIVE.name())); - conf = configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec()); - flinkService.triggerSavepoint( - deployment.getStatus().getJobStatus().getJobId(), - SnapshotTriggerType.MANUAL, - deployment.getStatus().getJobStatus().getSavepointInfo(), - conf); - - observer.observe(deployment, readyContext); - observer.observe(deployment, readyContext); - assertFalse(SnapshotUtils.savepointInProgress(deployment.getStatus().getJobStatus())); - assertEquals( - thirdNonce, - deployment - .getStatus() - .getJobStatus() - .getSavepointInfo() - .getLastSavepoint() - .getTriggerNonce()); - assertEquals( - SavepointFormatType.CANONICAL, - deployment - .getStatus() - .getJobStatus() - .getSavepointInfo() - .getLastSavepoint() - .getFormatType()); } private void bringToReadyStatus(FlinkDeployment deployment) { diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java index fb823500..27bc846a 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java @@ -152,9 +152,7 @@ public class NativeFlinkServiceTest { UpgradeMode upgradeMode, Configuration conf, boolean deleteClusterAfterSavepoint) { - assertEquals( - flinkVersion.isNewerVersionThan(FlinkVersion.v1_14) ? false : true, - deleteClusterAfterSavepoint); + assertEquals(false, deleteClusterAfterSavepoint); tested.set(true); } }; diff --git a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml index 685e87ef..d16ba415 100644 --- a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml +++ b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml @@ -45,6 +45,7 @@ spec: - v1_16 - v1_17 - v1_18 + - v1_19 type: string ingress: properties: