This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
     new 93e68f2b [FLINK-37571] Fix JobGraph removal for 2.0 last-state upgrades
93e68f2b is described below

commit 93e68f2bc2a007dfcfe46a230a7ce49eaf3c7736
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Thu Mar 27 11:45:22 2025 +0100

    [FLINK-37571] Fix JobGraph removal for 2.0 last-state upgrades
---
 .../org/apache/flink/kubernetes/operator/utils/FlinkUtils.java |  3 ++-
 .../apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java | 10 ++++++----
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index 4c41aedf..dbd68ba4 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -335,7 +335,8 @@ public class FlinkUtils {
     }
 
     private static boolean isJobGraphKey(Map.Entry<String, String> entry) {
-        return entry.getKey().startsWith(Constants.JOB_GRAPH_STORE_KEY_PREFIX);
+        return entry.getKey().startsWith(Constants.JOB_GRAPH_STORE_KEY_PREFIX)
+                || entry.getKey().startsWith("executionPlan-");
     }
 
     public static boolean isZookeeperHAActivated(Configuration configuration) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
index 68ee3e3d..003557f9 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.kubernetes.operator.utils;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -51,6 +50,8 @@ import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
 import io.javaoperatorsdk.operator.processing.event.ResourceID;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.net.HttpURLConnection;
 import java.util.Collections;
@@ -125,12 +126,13 @@ public class FlinkUtilsTest {
         assertEquals(expectedProbe, pod.getSpec().getContainers().get(1).getStartupProbe());
     }
 
-    @Test
-    public void testDeleteJobGraphInKubernetesHA() {
+    @ParameterizedTest
+    @ValueSource(strings = {"jobGraph-jobId", "executionPlan-jobId"})
+    public void testDeleteJobGraphInKubernetesHA(String key) {
         final String name = "ha-configmap";
         final String clusterId = "cluster-id";
         final Map<String, String> data = new HashMap<>();
-        data.put(Constants.JOB_GRAPH_STORE_KEY_PREFIX + JobID.generate(), "job-graph-data");
+        data.put(key, "job-graph-data");
         data.put("leader", "localhost");
         createHAConfigMapWithData(name, kubernetesClient.getNamespace(), clusterId, data);
         assertNotNull(kubernetesClient.configMaps().withName(name).get());