This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
     new 3f0dc2ee [FLINK-33011] Never accidentally delete HA metadata for last state deployments
3f0dc2ee is described below

commit 3f0dc2ee5534084bc162e6deaded36e93bb5e384
Author: Gyula Fora <[email protected]>
AuthorDate: Thu Sep 14 15:29:58 2023 +0200

    [FLINK-33011] Never accidentally delete HA metadata for last state deployments
---
 .../deployment/ApplicationReconciler.java          |  2 +-
 .../deployment/ApplicationReconcilerTest.java      | 42 ++++++++++++++++++++++
 2 files changed, 43 insertions(+), 1 deletion(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 284a178b..90defb2c 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -166,7 +166,7 @@ public class ApplicationReconciler
             
Preconditions.checkArgument(ReconciliationUtils.isJobInTerminalState(status));
             LOG.info("Deleting deployment with terminated application before new deployment");
             flinkService.deleteClusterDeployment(
-                    relatedResource.getMetadata(), status, deployConfig, true);
+                    relatedResource.getMetadata(), status, deployConfig, !requireHaMetadata);
             flinkService.waitForClusterShutdown(deployConfig);
             statusRecorder.patchAndCacheStatus(relatedResource);
         }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 9a888182..86735d93 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.OperatorTestBase;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkResourceContextFactory;
+import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
@@ -62,6 +63,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
 import org.apache.flink.util.concurrent.Executors;
 
+import io.fabric8.kubernetes.api.model.DeletionPropagation;
 import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
@@ -70,6 +72,7 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.junit.platform.commons.util.StringUtils;
 
 import java.time.Clock;
@@ -82,6 +85,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED;
@@ -975,6 +979,44 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
         assertEquals(JobManagerDeploymentStatus.MISSING, 
status.getJobManagerDeploymentStatus());
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testClusterCleanupBeforeDeploy(boolean requireMetadata) throws 
Exception {
+        var flinkApp = TestUtils.buildApplicationCluster();
+        var status = flinkApp.getStatus();
+        var spec = flinkApp.getSpec();
+        var deployConfig = 
configManager.getDeployConfig(flinkApp.getMetadata(), spec);
+
+        
status.getReconciliationStatus().serializeAndSetLastReconciledSpec(spec, 
flinkApp);
+        status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+        
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
+
+        var deleted = new AtomicBoolean(false);
+
+        flinkService =
+                new TestingFlinkService() {
+                    @Override
+                    protected void deleteClusterInternal(
+                            ObjectMeta meta,
+                            Configuration conf,
+                            boolean deleteHaMeta,
+                            DeletionPropagation deletionPropagation) {
+                        deleted.set(deleteHaMeta);
+                    }
+                };
+
+        reconciler
+                .getReconciler()
+                .deploy(
+                        getResourceContext(flinkApp),
+                        spec,
+                        deployConfig,
+                        Optional.empty(),
+                        requireMetadata);
+        assertEquals(deleted.get(), !requireMetadata);
+        assertEquals(JobManagerDeploymentStatus.DEPLOYING, 
status.getJobManagerDeploymentStatus());
+    }
+
     @Test
     public void testDeploymentRecoveryEvent() throws Exception {
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();

Reply via email to