This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 93e68f2b [FLINK-37571] Fix JobGraph removal for 2.0 last-state upgrades
93e68f2b is described below

commit 93e68f2bc2a007dfcfe46a230a7ce49eaf3c7736
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Thu Mar 27 11:45:22 2025 +0100

    [FLINK-37571] Fix JobGraph removal for 2.0 last-state upgrades
---
 .../org/apache/flink/kubernetes/operator/utils/FlinkUtils.java |  3 ++-
 .../apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java | 10 ++++++----
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index 4c41aedf..dbd68ba4 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -335,7 +335,8 @@ public class FlinkUtils {
     }
 
     private static boolean isJobGraphKey(Map.Entry<String, String> entry) {
-        return entry.getKey().startsWith(Constants.JOB_GRAPH_STORE_KEY_PREFIX);
+        return entry.getKey().startsWith(Constants.JOB_GRAPH_STORE_KEY_PREFIX)
+                || entry.getKey().startsWith("executionPlan-");
     }
 
     public static boolean isZookeeperHAActivated(Configuration configuration) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
index 68ee3e3d..003557f9 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.kubernetes.operator.utils;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -51,6 +50,8 @@ import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
 import io.javaoperatorsdk.operator.processing.event.ResourceID;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.net.HttpURLConnection;
 import java.util.Collections;
@@ -125,12 +126,13 @@ public class FlinkUtilsTest {
         assertEquals(expectedProbe, pod.getSpec().getContainers().get(1).getStartupProbe());
     }
 
-    @Test
-    public void testDeleteJobGraphInKubernetesHA() {
+    @ParameterizedTest
+    @ValueSource(strings = {"jobGraph-jobId", "executionPlan-jobId"})
+    public void testDeleteJobGraphInKubernetesHA(String key) {
         final String name = "ha-configmap";
         final String clusterId = "cluster-id";
         final Map<String, String> data = new HashMap<>();
-        data.put(Constants.JOB_GRAPH_STORE_KEY_PREFIX + JobID.generate(), "job-graph-data");
+        data.put(key, "job-graph-data");
         data.put("leader", "localhost");
         createHAConfigMapWithData(name, kubernetesClient.getNamespace(), clusterId, data);
         assertNotNull(kubernetesClient.configMaps().withName(name).get());

Reply via email to