This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit de48cdd6897a2ea559287848ffca0388a903dcd9
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Wed Jul 2 14:30:08 2025 +0200

    [FLINK-38033] Fix accidental upgrade snapshot dispose bug
---
 .../deployment/AbstractJobReconciler.java          | 37 ++++++++-------
 .../deployment/ApplicationReconcilerTest.java      | 54 +++++++++++++++-------
 2 files changed, 59 insertions(+), 32 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 023396b5..9e79982c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -409,25 +409,30 @@ public abstract class AbstractJobReconciler<
                         conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE)
                                 .name());
 
-        FlinkStateSnapshotUtils.createUpgradeSnapshotResource(
-                conf,
-                ctx.getOperatorConfig(),
-                ctx.getKubernetesClient(),
-                ctx.getResource(),
-                savepointFormatType,
-                savepointLocation);
+        var snapshotCrOpt =
+                FlinkStateSnapshotUtils.createUpgradeSnapshotResource(
+                        conf,
+                        ctx.getOperatorConfig(),
+                        ctx.getKubernetesClient(),
+                        ctx.getResource(),
+                        savepointFormatType,
+                        savepointLocation);
         var jobStatus = ctx.getResource().getStatus().getJobStatus();
         jobStatus.setUpgradeSavepointPath(savepointLocation);
 
-        // Register created savepoint in the now deprecated savepoint info and history
-        var savepoint =
-                new Savepoint(
-                        cancelTs.toEpochMilli(),
-                        savepointLocation,
-                        SnapshotTriggerType.UPGRADE,
-                        savepointFormatType,
-                        null);
-        jobStatus.getSavepointInfo().updateLastSavepoint(savepoint);
+        if (snapshotCrOpt.isEmpty()) {
+            // Register created savepoint in the now deprecated savepoint info and history
+            // only if snapshot CR was not created, otherwise it would be double recorded
+            // and disposed immediately
+            var savepoint =
+                    new Savepoint(
+                            cancelTs.toEpochMilli(),
+                            savepointLocation,
+                            SnapshotTriggerType.UPGRADE,
+                            savepointFormatType,
+                            null);
+            jobStatus.getSavepointInfo().updateLastSavepoint(savepoint);
+        }
     }
 
     /**
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index b31361b2..3d8ff290 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -90,6 +90,7 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.function.ThrowingConsumer;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -112,6 +113,7 @@ import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.stream.Stream;
 
 import static org.apache.flink.api.common.JobStatus.FINISHED;
 import static org.apache.flink.api.common.JobStatus.RECONCILING;
@@ -137,6 +139,7 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
 
 /**
  * @link JobStatusObserver unit tests
@@ -235,9 +238,12 @@ public class ApplicationReconcilerTest extends OperatorTestBase {
     }
 
     @ParameterizedTest
-    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
-    public void testUpgrade(FlinkVersion flinkVersion) throws Exception {
+    @MethodSource("upgradeArgs")
+    public void testUpgrade(FlinkVersion flinkVersion, boolean snapshotResource) throws Exception {
         FlinkDeployment deployment = TestUtils.buildApplicationCluster(flinkVersion);
+        conf.set(SNAPSHOT_RESOURCE_ENABLED, snapshotResource);
+        configManager.updateDefaultConfig(conf);
+        operatorConfig = configManager.getOperatorConfiguration();
 
         reconciler.reconcile(deployment, context);
         var runningJobs = flinkService.listJobs();
@@ -305,26 +311,35 @@ public class ApplicationReconcilerTest extends OperatorTestBase {
         assertEquals(0, flinkService.getRunningCount());
 
         var spInfo = statefulUpgrade.getStatus().getJobStatus().getSavepointInfo();
-        assertEquals("savepoint_0", spInfo.getLastSavepoint().getLocation());
-        assertEquals(SnapshotTriggerType.UPGRADE, spInfo.getLastSavepoint().getTriggerType());
-        assertEquals(
-                spInfo.getLastSavepoint(),
-                new LinkedList<>(spInfo.getSavepointHistory()).getLast());
+        if (snapshotResource) {
+            assertNull(spInfo.getLastSavepoint());
+        } else {
+            assertEquals("savepoint_0", spInfo.getLastSavepoint().getLocation());
+            assertEquals(SnapshotTriggerType.UPGRADE, spInfo.getLastSavepoint().getTriggerType());
+            assertEquals(
+                    spInfo.getLastSavepoint(),
+                    new LinkedList<>(spInfo.getSavepointHistory()).getLast());
+        }
 
         reconciler.reconcile(statefulUpgrade, context);
 
         runningJobs = flinkService.listJobs();
         assertEquals(1, flinkService.getRunningCount());
         var snapshots = TestUtils.getFlinkStateSnapshotsForResource(kubernetesClient, deployment);
-        assertThat(snapshots).isNotEmpty();
-        assertThat(snapshots.get(0).getSpec().getSavepoint().getPath()).isEqualTo("savepoint_0");
-        assertEquals(
-                SnapshotTriggerType.UPGRADE.name(),
-                snapshots
-                        .get(0)
-                        .getMetadata()
-                        .getLabels()
-                        .get(CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE));
+        if (snapshotResource) {
+            assertThat(snapshots).isNotEmpty();
+            assertThat(snapshots.get(0).getSpec().getSavepoint().getPath())
+                    .isEqualTo("savepoint_0");
+            assertEquals(
+                    SnapshotTriggerType.UPGRADE.name(),
+                    snapshots
+                            .get(0)
+                            .getMetadata()
+                            .getLabels()
+                            .get(CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE));
+        } else {
+            assertThat(snapshots).isEmpty();
+        }
 
         // Make sure jobId rotated on savepoint
         verifyNewJobId(runningJobs.get(0).f1, runningJobs.get(0).f2, jobId);
@@ -370,6 +385,13 @@ public class ApplicationReconcilerTest extends OperatorTestBase {
         verifyNewJobId(runningJobs.get(0).f1, runningJobs.get(0).f2, jobId);
     }
 
+    private static Stream<Arguments> upgradeArgs() {
+        return Stream.of(
+                arguments(FlinkVersion.v1_16, true),
+                arguments(FlinkVersion.v1_20, true),
+                arguments(FlinkVersion.v1_20, false));
+    }
+
     private void verifyJobId(
             FlinkDeployment deployment, JobStatusMessage status, Configuration conf, JobID jobId) {
         // jobId set by operator

Reply via email to