This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 7db0eb1e [FLINK-30004] Cleanup deployment after savepoint for Flink versions < 1.15
7db0eb1e is described below
commit 7db0eb1e02a1430128f436bdecea9748ce81fe14
Author: Thomas Weise <[email protected]>
AuthorDate: Sun Nov 13 15:56:41 2022 -0500
[FLINK-30004] Cleanup deployment after savepoint for Flink versions < 1.15
---
.../operator/service/AbstractFlinkService.java | 1 +
.../operator/service/NativeFlinkService.java | 7 ++++++-
.../operator/service/NativeFlinkServiceTest.java | 19 +++++++++++++++++--
3 files changed, 24 insertions(+), 3 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 38b32d04..2208cf80 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -326,6 +326,7 @@ public abstract class AbstractFlinkService implements FlinkService {
exception);
}
if (deleteClusterAfterSavepoint) {
+ LOG.info("Cleaning up deployment after stop-with-savepoint");
deleteClusterDeployment(deployment.getMetadata(), deploymentStatus, true);
}
break;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
index f40322ff..1402c8d3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
@@ -27,6 +27,7 @@ import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
@@ -80,7 +81,11 @@ public class NativeFlinkService extends AbstractFlinkService {
public void cancelJob(
FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration configuration)
throws Exception {
- cancelJob(deployment, upgradeMode, configuration, false);
+ // prior to Flink 1.15, ensure removal of orphaned config maps
+ // https://issues.apache.org/jira/browse/FLINK-30004
+ boolean deleteClusterAfterSavepoint =
+ !deployment.getSpec().getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_14);
+ cancelJob(deployment, upgradeMode, configuration, deleteClusterAfterSavepoint);
}
@Override
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
index c79663ef..fb204c1a 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
@@ -56,6 +56,8 @@ import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import java.util.ArrayList;
import java.util.Arrays;
@@ -114,8 +116,9 @@ public class NativeFlinkServiceTest {
assertNull(jobStatus.getSavepointInfo().getLastSavepoint());
}
- @Test
- public void testCancelJobWithSavepointUpgradeMode() throws Exception {
+ @ParameterizedTest
+ @EnumSource(FlinkVersion.class)
+ public void testCancelJobWithSavepointUpgradeMode(FlinkVersion flinkVersion) throws Exception {
final TestingClusterClient<String> testingClusterClient =
new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME);
final CompletableFuture<Tuple3<JobID, Boolean, String>> stopWithSavepointFuture =
@@ -143,6 +146,7 @@ public class NativeFlinkServiceTest {
jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
+ deployment.getSpec().setFlinkVersion(flinkVersion);
flinkService.cancelJob(
deployment, UpgradeMode.SAVEPOINT, configManager.getObserveConfig(deployment));
assertTrue(stopWithSavepointFuture.isDone());
@@ -150,6 +154,17 @@ public class NativeFlinkServiceTest {
assertFalse(stopWithSavepointFuture.get().f1);
assertEquals(savepointPath, stopWithSavepointFuture.get().f2);
assertEquals(savepointPath, jobStatus.getSavepointInfo().getLastSavepoint().getLocation());
+
+ assertEquals(jobStatus.getState(), org.apache.flink.api.common.JobStatus.FINISHED.name());
+ if (flinkVersion.isNewerVersionThan(FlinkVersion.v1_14)) {
+ assertEquals(
+ deployment.getStatus().getJobManagerDeploymentStatus(),
+ JobManagerDeploymentStatus.READY);
+ } else {
+ assertEquals(
+ deployment.getStatus().getJobManagerDeploymentStatus(),
+ JobManagerDeploymentStatus.MISSING);
+ }
}
@Test