This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit de48cdd6897a2ea559287848ffca0388a903dcd9 Author: Gyula Fora <g_f...@apple.com> AuthorDate: Wed Jul 2 14:30:08 2025 +0200 [FLINK-38033] Fix accidental upgrade snapshot dispose bug --- .../deployment/AbstractJobReconciler.java | 37 ++++++++------- .../deployment/ApplicationReconcilerTest.java | 54 +++++++++++++++------- 2 files changed, 59 insertions(+), 32 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java index 023396b5..9e79982c 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java @@ -409,25 +409,30 @@ public abstract class AbstractJobReconciler< conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE) .name()); - FlinkStateSnapshotUtils.createUpgradeSnapshotResource( - conf, - ctx.getOperatorConfig(), - ctx.getKubernetesClient(), - ctx.getResource(), - savepointFormatType, - savepointLocation); + var snapshotCrOpt = + FlinkStateSnapshotUtils.createUpgradeSnapshotResource( + conf, + ctx.getOperatorConfig(), + ctx.getKubernetesClient(), + ctx.getResource(), + savepointFormatType, + savepointLocation); var jobStatus = ctx.getResource().getStatus().getJobStatus(); jobStatus.setUpgradeSavepointPath(savepointLocation); - // Register created savepoint in the now deprecated savepoint info and history - var savepoint = - new Savepoint( - cancelTs.toEpochMilli(), - savepointLocation, - SnapshotTriggerType.UPGRADE, - savepointFormatType, - null); - jobStatus.getSavepointInfo().updateLastSavepoint(savepoint); + if (snapshotCrOpt.isEmpty()) { + // Register created savepoint in the now deprecated savepoint info and history + // only if snapshot CR was not created, otherwise it would be double recorded + // and disposed immediately + var savepoint = + new Savepoint( + cancelTs.toEpochMilli(), + savepointLocation, + SnapshotTriggerType.UPGRADE, + savepointFormatType, + null); + jobStatus.getSavepointInfo().updateLastSavepoint(savepoint); + } } /** diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java index b31361b2..3d8ff290 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java @@ -90,6 +90,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.ThrowingConsumer; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; @@ -112,6 +113,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; +import java.util.stream.Stream; import static org.apache.flink.api.common.JobStatus.FINISHED; import static org.apache.flink.api.common.JobStatus.RECONCILING; @@ -137,6 +139,7 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import static org.junit.jupiter.params.provider.Arguments.arguments; /** * @link JobStatusObserver unit tests @@ -235,9 +238,12 @@ public class ApplicationReconcilerTest extends OperatorTestBase { } @ParameterizedTest - @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions") - public void testUpgrade(FlinkVersion flinkVersion) throws Exception { + @MethodSource("upgradeArgs") + public void testUpgrade(FlinkVersion flinkVersion, boolean snapshotResource) throws Exception { FlinkDeployment deployment = TestUtils.buildApplicationCluster(flinkVersion); + conf.set(SNAPSHOT_RESOURCE_ENABLED, snapshotResource); + configManager.updateDefaultConfig(conf); + operatorConfig = configManager.getOperatorConfiguration(); reconciler.reconcile(deployment, context); var runningJobs = flinkService.listJobs(); @@ -305,26 +311,35 @@ public class ApplicationReconcilerTest extends OperatorTestBase { assertEquals(0, flinkService.getRunningCount()); var spInfo = statefulUpgrade.getStatus().getJobStatus().getSavepointInfo(); - assertEquals("savepoint_0", spInfo.getLastSavepoint().getLocation()); - assertEquals(SnapshotTriggerType.UPGRADE, spInfo.getLastSavepoint().getTriggerType()); - assertEquals( - spInfo.getLastSavepoint(), - new LinkedList<>(spInfo.getSavepointHistory()).getLast()); + if (snapshotResource) { + assertNull(spInfo.getLastSavepoint()); + } else { + assertEquals("savepoint_0", spInfo.getLastSavepoint().getLocation()); + assertEquals(SnapshotTriggerType.UPGRADE, spInfo.getLastSavepoint().getTriggerType()); + assertEquals( + spInfo.getLastSavepoint(), + new LinkedList<>(spInfo.getSavepointHistory()).getLast()); + } reconciler.reconcile(statefulUpgrade, context); runningJobs = flinkService.listJobs(); assertEquals(1, flinkService.getRunningCount()); var snapshots = TestUtils.getFlinkStateSnapshotsForResource(kubernetesClient, deployment); - assertThat(snapshots).isNotEmpty(); - assertThat(snapshots.get(0).getSpec().getSavepoint().getPath()).isEqualTo("savepoint_0"); - assertEquals( - SnapshotTriggerType.UPGRADE.name(), - snapshots - .get(0) - .getMetadata() - .getLabels() - .get(CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE)); + if (snapshotResource) { + assertThat(snapshots).isNotEmpty(); + assertThat(snapshots.get(0).getSpec().getSavepoint().getPath()) + .isEqualTo("savepoint_0"); + assertEquals( + SnapshotTriggerType.UPGRADE.name(), + snapshots + .get(0) + .getMetadata() + .getLabels() + .get(CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE)); + } else { + assertThat(snapshots).isEmpty(); + } // Make sure jobId rotated on savepoint verifyNewJobId(runningJobs.get(0).f1, runningJobs.get(0).f2, jobId); @@ -370,6 +385,13 @@ public class ApplicationReconcilerTest extends OperatorTestBase { verifyNewJobId(runningJobs.get(0).f1, runningJobs.get(0).f2, jobId); } + private static Stream<Arguments> upgradeArgs() { + return Stream.of( + arguments(FlinkVersion.v1_16, true), + arguments(FlinkVersion.v1_20, true), + arguments(FlinkVersion.v1_20, false)); + } + private void verifyJobId( FlinkDeployment deployment, JobStatusMessage status, Configuration conf, JobID jobId) { // jobId set by operator