This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new d10918c6 [FLINK-35265][snapshot] Create FlinkStateSnapshot in the namespace of job
d10918c6 is described below
commit d10918c626c19964635a340fd99a627691fedf54
Author: Mate Czagany <[email protected]>
AuthorDate: Sat Aug 10 17:08:40 2024 +0200
[FLINK-35265][snapshot] Create FlinkStateSnapshot in the namespace of job
---
.../operator/utils/FlinkStateSnapshotUtils.java | 16 ++++++++++++++--
.../flink/kubernetes/operator/TestUtils.java | 7 ++++++-
.../utils/FlinkStateSnapshotUtilsTest.java | 22 ++++++++++++++++++++++
3 files changed, 42 insertions(+), 3 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java
index c74776d9..42927738 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java
@@ -122,10 +122,12 @@ public class FlinkStateSnapshotUtils {
protected static FlinkStateSnapshot createFlinkStateSnapshot(
KubernetesClient kubernetesClient,
+ String namespace,
String name,
FlinkStateSnapshotSpec spec,
SnapshotTriggerType triggerType) {
var metadata = new ObjectMeta();
+ metadata.setNamespace(namespace);
metadata.setName(name);
metadata.getLabels().put(CrdConstants.LABEL_SNAPSHOT_TYPE, triggerType.name());
@@ -169,7 +171,12 @@ public class FlinkStateSnapshotUtils {
.build();
var resourceName = getFlinkStateSnapshotName(SAVEPOINT, triggerType, resource);
- return createFlinkStateSnapshot(kubernetesClient, resourceName, snapshotSpec, triggerType);
+ return createFlinkStateSnapshot(
+ kubernetesClient,
+ resource.getMetadata().getNamespace(),
+ resourceName,
+ snapshotSpec,
+ triggerType);
}
/**
@@ -191,7 +198,12 @@ public class FlinkStateSnapshotUtils {
.build();
var resourceName = getFlinkStateSnapshotName(CHECKPOINT, triggerType, resource);
- return createFlinkStateSnapshot(kubernetesClient, resourceName, snapshotSpec, triggerType);
+ return createFlinkStateSnapshot(
+ kubernetesClient,
+ resource.getMetadata().getNamespace(),
+ resourceName,
+ snapshotSpec,
+ triggerType);
}
/**
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index e450c236..81b4e7e6 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -286,7 +286,12 @@ public class TestUtils extends BaseTestUtils {
public static <CR extends AbstractFlinkResource<?, ?>>
List<FlinkStateSnapshot> getFlinkStateSnapshotsForResource(
KubernetesClient kubernetesClient, CR resource) {
- return kubernetesClient.resources(FlinkStateSnapshot.class).list().getItems().stream()
+ return kubernetesClient
+ .resources(FlinkStateSnapshot.class)
+ .inAnyNamespace()
+ .list()
+ .getItems()
+ .stream()
.filter(
s ->
s.getSpec()
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java
index 7f658d8d..bf37ed63 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java
@@ -229,6 +229,28 @@ public class FlinkStateSnapshotUtilsTest {
false);
}
+ @Test
+ public void testCreateSnapshotInSameNamespace() {
+ var namespace = "different-namespace";
+ var deployment = initDeployment();
+ deployment.getMetadata().setNamespace(namespace);
+
+ var savepoint =
+ FlinkStateSnapshotUtils.createSavepointResource(
+ client,
+ deployment,
+ SAVEPOINT_PATH,
+ SnapshotTriggerType.PERIODIC,
+ SavepointFormatType.CANONICAL,
+ true);
+ assertThat(savepoint.getMetadata().getNamespace()).isEqualTo(namespace);
+
+ var checkpoint =
+ FlinkStateSnapshotUtils.createCheckpointResource(
+ client, deployment, SnapshotTriggerType.MANUAL);
+ assertThat(checkpoint.getMetadata().getNamespace()).isEqualTo(namespace);
+ }
+
@Test
public void testCreateCheckpointResource() {
var deployment = initDeployment();