This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/release-1.6 by this push:
new 3f0dc2ee [FLINK-33011] Never accidentally delete HA metadata for last state deployments
3f0dc2ee is described below
commit 3f0dc2ee5534084bc162e6deaded36e93bb5e384
Author: Gyula Fora <[email protected]>
AuthorDate: Thu Sep 14 15:29:58 2023 +0200
[FLINK-33011] Never accidentally delete HA metadata for last state deployments
---
.../deployment/ApplicationReconciler.java | 2 +-
.../deployment/ApplicationReconcilerTest.java | 42 ++++++++++++++++++++++
2 files changed, 43 insertions(+), 1 deletion(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 284a178b..90defb2c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -166,7 +166,7 @@ public class ApplicationReconciler
Preconditions.checkArgument(ReconciliationUtils.isJobInTerminalState(status));
LOG.info("Deleting deployment with terminated application before
new deployment");
flinkService.deleteClusterDeployment(
- relatedResource.getMetadata(), status, deployConfig, true);
+ relatedResource.getMetadata(), status, deployConfig,
!requireHaMetadata);
flinkService.waitForClusterShutdown(deployConfig);
statusRecorder.patchAndCacheStatus(relatedResource);
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 9a888182..86735d93 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -31,6 +31,7 @@ import
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.OperatorTestBase;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkResourceContextFactory;
+import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
@@ -62,6 +63,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
import org.apache.flink.util.concurrent.Executors;
+import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
@@ -70,6 +72,7 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
import org.junit.platform.commons.util.StringUtils;
import java.time.Clock;
@@ -82,6 +85,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED;
@@ -975,6 +979,44 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
assertEquals(JobManagerDeploymentStatus.MISSING,
status.getJobManagerDeploymentStatus());
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testClusterCleanupBeforeDeploy(boolean requireMetadata) throws
Exception {
+ var flinkApp = TestUtils.buildApplicationCluster();
+ var status = flinkApp.getStatus();
+ var spec = flinkApp.getSpec();
+ var deployConfig =
configManager.getDeployConfig(flinkApp.getMetadata(), spec);
+
+
status.getReconciliationStatus().serializeAndSetLastReconciledSpec(spec,
flinkApp);
+ status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
+
+ var deleted = new AtomicBoolean(false);
+
+ flinkService =
+ new TestingFlinkService() {
+ @Override
+ protected void deleteClusterInternal(
+ ObjectMeta meta,
+ Configuration conf,
+ boolean deleteHaMeta,
+ DeletionPropagation deletionPropagation) {
+ deleted.set(deleteHaMeta);
+ }
+ };
+
+ reconciler
+ .getReconciler()
+ .deploy(
+ getResourceContext(flinkApp),
+ spec,
+ deployConfig,
+ Optional.empty(),
+ requireMetadata);
+ assertEquals(deleted.get(), !requireMetadata);
+ assertEquals(JobManagerDeploymentStatus.DEPLOYING,
status.getJobManagerDeploymentStatus());
+ }
+
@Test
public void testDeploymentRecoveryEvent() throws Exception {
FlinkDeployment deployment = TestUtils.buildApplicationCluster();