This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new c8a6ca81 [FLINK-35493][snapshot] Add historical cleanup for FlinkStateSnapshot CRs
c8a6ca81 is described below
commit c8a6ca814708dfc7b08b046ad20a8a64d53aa04a
Author: Mate Czagany <[email protected]>
AuthorDate: Tue Aug 20 12:38:04 2024 +0200
[FLINK-35493][snapshot] Add historical cleanup for FlinkStateSnapshot CRs
---
docs/content/docs/custom-resource/snapshots.md | 39 +-
.../shortcodes/generated/dynamic_section.html | 6 +-
.../kubernetes_operator_config_configuration.html | 10 +-
.../generated/system_advanced_section.html | 4 +-
.../flink/autoscaler/utils/DateTimeUtils.java | 10 +
.../config/FlinkOperatorConfiguration.java | 1 +
.../config/KubernetesOperatorConfigOptions.java | 74 ++--
.../controller/FlinkDeploymentController.java | 20 +-
.../controller/FlinkSessionJobController.java | 18 +-
.../operator/observer/SnapshotObserver.java | 248 ++++++++++--
.../operator/utils/EventSourceUtils.java | 37 ++
.../operator/utils/FlinkStateSnapshotUtils.java | 17 +
.../operator/utils/KubernetesClientUtils.java | 3 +-
...erTest.java => SnapshotObserverLegacyTest.java} | 154 +++++++-
.../operator/observer/SnapshotObserverTest.java | 435 +++++++++++----------
.../deployment/ApplicationObserverTest.java | 4 +
.../utils/FlinkStateSnapshotUtilsTest.java | 30 ++
17 files changed, 782 insertions(+), 328 deletions(-)
diff --git a/docs/content/docs/custom-resource/snapshots.md
b/docs/content/docs/custom-resource/snapshots.md
index 6c874c67..e0d39835 100644
--- a/docs/content/docs/custom-resource/snapshots.md
+++ b/docs/content/docs/custom-resource/snapshots.md
@@ -168,28 +168,43 @@ There is no guarantee on the timely execution of the
periodic snapshots as they
The operator automatically keeps track of the snapshot history triggered by
upgrade, manual and periodic snapshot operations.
This is necessary so cleanup can be performed by the operator for old
snapshots.
-Users can control the cleanup behaviour by specifying a maximum age and
maximum count for the savepoint and checkpoint resources in the history.
+{{< hint info >}}
+Snapshot cleanup happens lazily and only when the Flink resource associated
with the snapshot is running.
+It is therefore very likely that savepoints live beyond the max age
configuration.
+{{< /hint >}}
+
+#### Savepoints
+Users can control the cleanup behaviour by specifying maximum age and maximum count for savepoints.
+If a max age is specified, FlinkStateSnapshot resources of savepoint type will be cleaned up based on the `metadata.creationTimestamp` field.
+Snapshots will be cleaned up regardless of their status, but the operator will always keep at least 1 completed FlinkStateSnapshot for every Flink job at all times.
+
+Example configuration:
```
kubernetes.operator.savepoint.history.max.age: 24 h
kubernetes.operator.savepoint.history.max.count: 5
-
-kubernetes.operator.checkpoint.history.max.age: 24 h
-kubernetes.operator.checkpoint.history.max.count: 5
```
+To also dispose of savepoint data on savepoint cleanup, set
`kubernetes.operator.savepoint.dispose-on-delete: true`.
+This config will set `spec.savepoint.disposeOnDelete` to true for
FlinkStateSnapshot CRs created by upgrade, periodic and manual savepoints
created using `savepointTriggerNonce`.
+
+To disable automatic savepoint cleanup by the operator you can set
`kubernetes.operator.savepoint.cleanup.enabled: false`.
+
+#### Checkpoints
+
+FlinkStateSnapshots of checkpoint type will always be cleaned up. It's not possible to set max age for them.
+The maximum number of checkpoint resources retained will be determined by the Flink configuration `state.checkpoints.num-retained`.
+
{{< hint warning >}}
-Checkpoint history history cleanup is only supported if FlinkStateSnapshot
resources are enabled.
+Checkpoint cleanup is only supported if FlinkStateSnapshot resources are
enabled.
This operation will only delete the FlinkStateSnapshot CR, and will never
delete any checkpoint data on the filesystem.
{{< /hint >}}
-{{< hint info >}}
-Savepoint cleanup happens lazily and only when the Flink resource associated
with the snapshot is running.
-It is therefore very likely that savepoints live beyond the max age
configuration.
-{{< /hint >}}
-To also dispose of savepoint data on savepoint cleanup, set
`kubernetes.operator.savepoint.dispose-on-delete: true`.
-This config will set `spec.savepoint.disposeOnDelete` to true for
FlinkStateSnapshot CRs created by periodic savepoints and manual ones created
using `savepointTriggerNonce`.
+### Snapshot History For Legacy Savepoints
-To disable savepoint/checkpoint cleanup by the operator you can set
`kubernetes.operator.savepoint.cleanup.enabled: false` and
`kubernetes.operator.checkpoint.cleanup.enabled: false`.
+Legacy savepoints found in FlinkDeployment/FlinkSessionJob CRs under the deprecated `status.jobStatus.savepointInfo.savepointHistory` will be cleaned up:
+- For max age, it will be cleaned up when its trigger timestamp exceeds max age
+- For max count and FlinkStateSnapshot resources **disabled**, it will be cleaned up when `savepointHistory` exceeds max count
+- For max count and FlinkStateSnapshot resources **enabled**, it will be cleaned up when `savepointHistory` + number of FlinkStateSnapshot CRs related to the job exceed max count
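
The savepoint retention rule documented in the snapshots.md hunk above (max age, max count, always keep the newest completed snapshot) can be illustrated with a simplified sketch. This is not the operator's code: `SavepointRetentionSketch`, `Snapshot`, and `selectForCleanup` are hypothetical names, and the sketch ignores trigger types, deletion markers, and the legacy `savepointHistory`.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class SavepointRetentionSketch {

    /** Hypothetical stand-in for a savepoint-type FlinkStateSnapshot CR. */
    static final class Snapshot {
        final String name;
        final Instant created; // plays the role of metadata.creationTimestamp
        final boolean completed;

        Snapshot(String name, Instant created, boolean completed) {
            this.name = name;
            this.created = created;
            this.completed = completed;
        }
    }

    /** Returns the names of snapshots to delete, oldest first. */
    static List<String> selectForCleanup(
            List<Snapshot> snapshots, int maxCount, Duration maxAge, Instant now) {
        List<Snapshot> sorted = new ArrayList<>(snapshots);
        sorted.sort(Comparator.comparing((Snapshot s) -> s.created));

        // Sorted ascending, so the last completed entry seen is the newest one.
        Snapshot lastCompleted = null;
        for (Snapshot s : sorted) {
            if (s.completed) {
                lastCompleted = s;
            }
        }

        Instant cutoff = now.minus(maxAge);
        List<String> toDelete = new ArrayList<>();
        for (Snapshot s : sorted) {
            if (s == lastCompleted) {
                continue; // always retained, even if older than the max age
            }
            int remaining = sorted.size() - toDelete.size();
            if (remaining > maxCount || s.created.isBefore(cutoff)) {
                toDelete.add(s.name);
            }
        }
        return toDelete;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-08-20T12:00:00Z");
        List<Snapshot> snapshots =
                List.of(
                        new Snapshot("a", now.minus(Duration.ofHours(48)), true),
                        new Snapshot("b", now.minus(Duration.ofHours(30)), true),
                        new Snapshot("c", now.minus(Duration.ofHours(1)), false));
        // With max.count = 5 and max.age = 24 h, only "a" is selected:
        // "b" is older than 24 h but survives as the newest completed snapshot.
        System.out.println(selectForCleanup(snapshots, 5, Duration.ofHours(24), now));
    }
}
```

In this sketch the newest completed snapshot is exempt from both limits, which is why a savepoint can outlive the configured max age.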
diff --git a/docs/layouts/shortcodes/generated/dynamic_section.html
b/docs/layouts/shortcodes/generated/dynamic_section.html
index efb94f68..d60f0788 100644
--- a/docs/layouts/shortcodes/generated/dynamic_section.html
+++ b/docs/layouts/shortcodes/generated/dynamic_section.html
@@ -156,7 +156,7 @@
<td><h5>kubernetes.operator.savepoint.cleanup.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
- <td>Whether to enable clean up of savepoint history.</td>
+ <td>Whether to enable clean up of savepoint FlinkStateSnapshot
resources. Savepoint state will be disposed of as well if the snapshot CR spec
is configured as such. For automatic savepoints this can be configured via the
kubernetes.operator.savepoint.dispose-on-delete config option.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.savepoint.dispose-on-delete</h5></td>
@@ -174,13 +174,13 @@
<td><h5>kubernetes.operator.savepoint.history.max.age</h5></td>
<td style="word-wrap: break-word;">1 d</td>
<td>Duration</td>
- <td>Maximum age for savepoint history entries to retain. Due to
lazy clean-up, the most recent savepoint may live longer than the max age.</td>
+ <td>Maximum age for savepoint FlinkStateSnapshot resources to
retain. Due to lazy clean-up, the most recent savepoint may live longer than
the max age.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.savepoint.history.max.count</h5></td>
<td style="word-wrap: break-word;">10</td>
<td>Integer</td>
- <td>Maximum number of savepoint history entries to retain.</td>
+ <td>Maximum number of savepoint FlinkStateSnapshot resources
entries to retain.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.savepoint.trigger.grace-period</h5></td>
diff --git
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 08fc6ac2..164b1889 100644
---
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -354,7 +354,7 @@
<td><h5>kubernetes.operator.savepoint.cleanup.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
- <td>Whether to enable clean up of savepoint history.</td>
+ <td>Whether to enable clean up of savepoint FlinkStateSnapshot
resources. Savepoint state will be disposed of as well if the snapshot CR spec
is configured as such. For automatic savepoints this can be configured via the
kubernetes.operator.savepoint.dispose-on-delete config option.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.savepoint.dispose-on-delete</h5></td>
@@ -372,25 +372,25 @@
<td><h5>kubernetes.operator.savepoint.history.max.age</h5></td>
<td style="word-wrap: break-word;">1 d</td>
<td>Duration</td>
- <td>Maximum age for savepoint history entries to retain. Due to
lazy clean-up, the most recent savepoint may live longer than the max age.</td>
+ <td>Maximum age for savepoint FlinkStateSnapshot resources to
retain. Due to lazy clean-up, the most recent savepoint may live longer than
the max age.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.savepoint.history.max.age.threshold</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
- <td>Maximum age threshold for savepoint history entries to
retain.</td>
+ <td>Maximum age threshold for FlinkStateSnapshot resources to
retain.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.savepoint.history.max.count</h5></td>
<td style="word-wrap: break-word;">10</td>
<td>Integer</td>
- <td>Maximum number of savepoint history entries to retain.</td>
+ <td>Maximum number of savepoint FlinkStateSnapshot resources
entries to retain.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.savepoint.history.max.count.threshold</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
- <td>Maximum number threshold of savepoint history entries to
retain.</td>
+ <td>Maximum number threshold of savepoint FlinkStateSnapshot
resources to retain.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.savepoint.trigger.grace-period</h5></td>
diff --git a/docs/layouts/shortcodes/generated/system_advanced_section.html
b/docs/layouts/shortcodes/generated/system_advanced_section.html
index f24d9d9e..092604b4 100644
--- a/docs/layouts/shortcodes/generated/system_advanced_section.html
+++ b/docs/layouts/shortcodes/generated/system_advanced_section.html
@@ -84,13 +84,13 @@
<td><h5>kubernetes.operator.savepoint.history.max.age.threshold</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
- <td>Maximum age threshold for savepoint history entries to
retain.</td>
+ <td>Maximum age threshold for FlinkStateSnapshot resources to
retain.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.savepoint.history.max.count.threshold</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
- <td>Maximum number threshold of savepoint history entries to
retain.</td>
+ <td>Maximum number threshold of savepoint FlinkStateSnapshot
resources to retain.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.startup.stop-on-informer-error</h5></td>
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/DateTimeUtils.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/DateTimeUtils.java
index 0b0aa4ac..06e55895 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/DateTimeUtils.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/DateTimeUtils.java
@@ -60,4 +60,14 @@ public class DateTimeUtils {
ZonedDateTime dateTime = instant.atZone(ZoneId.systemDefault());
return dateTime.format(DateTimeFormatter.ISO_INSTANT);
}
+
+ /**
+ * Parses a Kubernetes-compatible datetime.
+ *
+ * @param datetime datetime in Kubernetes format
+ * @return time parsed
+ */
+ public static Instant parseKubernetes(String datetime) {
+ return Instant.parse(datetime);
+ }
}
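
As a rough illustration of how a parsed `metadata.creationTimestamp` could feed a max-age check, consider the sketch below. It assumes the timestamp arrives in the UTC form with a trailing `Z`; `CreationTimestampSketch` and `isOlderThan` are hypothetical names that do not appear in the commit.

```java
import java.time.Duration;
import java.time.Instant;

class CreationTimestampSketch {

    /** True if the resource was created more than maxAge before now. */
    static boolean isOlderThan(String creationTimestamp, Duration maxAge, Instant now) {
        // Same call that the new DateTimeUtils.parseKubernetes helper delegates to.
        Instant created = Instant.parse(creationTimestamp);
        return created.isBefore(now.minus(maxAge));
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-08-20T12:00:00Z");
        System.out.println(isOlderThan("2024-08-19T10:38:04Z", Duration.ofHours(24), now));
        System.out.println(isOlderThan("2024-08-20T10:38:04Z", Duration.ofHours(24), now));
    }
}
```

Passing `now` in explicitly, rather than calling `Instant.now()` inside the method, keeps the comparison deterministic in tests.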
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index 6482ff49..71711e54 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -115,6 +115,7 @@ public class FlinkOperatorConfiguration {
operatorConfig.get(
KubernetesOperatorConfigOptions
.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE_THRESHOLD);
+
Boolean exceptionStackTraceEnabled =
operatorConfig.get(
KubernetesOperatorConfigOptions.OPERATOR_EXCEPTION_STACK_TRACE_ENABLED);
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index af8d429c..5da5d3f8 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -210,19 +210,46 @@ public class KubernetesOperatorConfigOptions {
.withDescription(
"Whether to enable recovery of missing/deleted
jobmanager deployments.");
+ @Documentation.Section(SECTION_DYNAMIC)
+ public static final ConfigOption<Boolean>
OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE =
+ operatorConfig("savepoint.dispose-on-delete")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Savepoint data for FlinkStateSnapshot resources
created by the operator during upgrades and periodic savepoints will be
disposed of automatically when the generated Kubernetes resource is deleted.");
+
+ @Documentation.Section(SECTION_DYNAMIC)
+ public static final ConfigOption<SavepointFormatType>
OPERATOR_SAVEPOINT_FORMAT_TYPE =
+ operatorConfig("savepoint.format.type")
+ .enumType(SavepointFormatType.class)
+ .defaultValue(SavepointFormatType.DEFAULT)
+ .withDescription(
+ "Type of the binary format in which a savepoint
should be taken.");
+
+ @Documentation.Section(SECTION_DYNAMIC)
+ public static final ConfigOption<CheckpointType> OPERATOR_CHECKPOINT_TYPE =
+ operatorConfig("checkpoint.type")
+ .enumType(CheckpointType.class)
+ .defaultValue(CheckpointType.FULL)
+ .withDescription("Type of checkpoint.");
+
@Documentation.Section(SECTION_DYNAMIC)
public static final ConfigOption<Boolean>
OPERATOR_SAVEPOINT_CLEANUP_ENABLED =
operatorConfig("savepoint.cleanup.enabled")
.booleanType()
.defaultValue(true)
- .withDescription("Whether to enable clean up of savepoint
history.");
+ .withDescription(
+ String.format(
+ "Whether to enable clean up of savepoint
FlinkStateSnapshot resources. Savepoint state will be disposed of as well if
the snapshot CR spec is configured as such. For automatic savepoints this can
be configured via the %s config option.",
+
OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE.key()));
@Documentation.Section(SECTION_DYNAMIC)
public static final ConfigOption<Integer>
OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT =
operatorConfig("savepoint.history.max.count")
.intType()
.defaultValue(10)
- .withDescription("Maximum number of savepoint history
entries to retain.");
+ .withDescription(
+ "Maximum number of savepoint FlinkStateSnapshot
resources entries to retain.");
@Documentation.Section(SECTION_ADVANCED)
public static final ConfigOption<Integer>
OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT_THRESHOLD =
@@ -230,7 +257,7 @@ public class KubernetesOperatorConfigOptions {
.intType()
.noDefaultValue()
.withDescription(
- "Maximum number threshold of savepoint history
entries to retain.");
+ "Maximum number threshold of savepoint
FlinkStateSnapshot resources to retain.");
@Documentation.Section(SECTION_DYNAMIC)
public static final ConfigOption<Duration>
OPERATOR_SAVEPOINT_HISTORY_MAX_AGE =
@@ -238,7 +265,15 @@ public class KubernetesOperatorConfigOptions {
.durationType()
.defaultValue(Duration.ofHours(24))
.withDescription(
- "Maximum age for savepoint history entries to
retain. Due to lazy clean-up, the most recent savepoint may live longer than
the max age.");
+ "Maximum age for savepoint FlinkStateSnapshot
resources to retain. Due to lazy clean-up, the most recent savepoint may live
longer than the max age.");
+
+ @Documentation.Section(SECTION_ADVANCED)
+ public static final ConfigOption<Duration>
OPERATOR_SAVEPOINT_HISTORY_MAX_AGE_THRESHOLD =
+ ConfigOptions.key(OPERATOR_SAVEPOINT_HISTORY_MAX_AGE.key() +
".threshold")
+ .durationType()
+ .noDefaultValue()
+ .withDescription(
+ "Maximum age threshold for FlinkStateSnapshot
resources to retain.");
@Documentation.Section(SECTION_SYSTEM)
public static final ConfigOption<Boolean>
OPERATOR_EXCEPTION_STACK_TRACE_ENABLED =
@@ -280,14 +315,6 @@ public class KubernetesOperatorConfigOptions {
.withDescription(
"Key-Value pair where key is the REGEX to filter
through the exception messages and value is the string to be included in CR
status error label field if the REGEX matches. Expected format:
headerKey1:headerValue1,headerKey2:headerValue2.");
- @Documentation.Section(SECTION_ADVANCED)
- public static final ConfigOption<Duration>
OPERATOR_SAVEPOINT_HISTORY_MAX_AGE_THRESHOLD =
- ConfigOptions.key(OPERATOR_SAVEPOINT_HISTORY_MAX_AGE.key() +
".threshold")
- .durationType()
- .noDefaultValue()
- .withDescription(
- "Maximum age threshold for savepoint history
entries to retain.");
-
@Documentation.Section(SECTION_DYNAMIC)
public static final ConfigOption<Map<String, String>>
JAR_ARTIFACT_HTTP_HEADER =
operatorConfig("user.artifacts.http.header")
@@ -438,29 +465,6 @@ public class KubernetesOperatorConfigOptions {
.withDescription(
"Max allowed checkpoint age for initiating
last-state upgrades on running jobs. If a checkpoint is not available within
the desired age (and nothing in progress) a savepoint will be triggered.");
- @Documentation.Section(SECTION_DYNAMIC)
- public static final ConfigOption<Boolean>
OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE =
- operatorConfig("savepoint.dispose-on-delete")
- .booleanType()
- .defaultValue(false)
- .withDescription(
- "Savepoint data for FlinkStateSnapshot resources
created by the operator during upgrades and periodic savepoints will be
disposed of automatically when the generated Kubernetes resource is deleted.");
-
- @Documentation.Section(SECTION_DYNAMIC)
- public static final ConfigOption<SavepointFormatType>
OPERATOR_SAVEPOINT_FORMAT_TYPE =
- operatorConfig("savepoint.format.type")
- .enumType(SavepointFormatType.class)
- .defaultValue(SavepointFormatType.DEFAULT)
- .withDescription(
- "Type of the binary format in which a savepoint
should be taken.");
-
- @Documentation.Section(SECTION_DYNAMIC)
- public static final ConfigOption<CheckpointType> OPERATOR_CHECKPOINT_TYPE =
- operatorConfig("checkpoint.type")
- .enumType(CheckpointType.class)
- .defaultValue(CheckpointType.FULL)
- .withDescription("Type of checkpoint.");
-
@Documentation.Section(SECTION_ADVANCED)
public static final ConfigOption<Boolean> OPERATOR_HEALTH_PROBE_ENABLED =
operatorConfig("health.probe.enabled")
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index b7f8afcc..aa9381fc 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.controller;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
@@ -31,6 +32,7 @@ import
org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFact
import
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
+import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
@@ -49,6 +51,8 @@ import
io.javaoperatorsdk.operator.processing.event.source.EventSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -197,9 +201,19 @@ public class FlinkDeploymentController
@Override
public Map<String, EventSource> prepareEventSources(
EventSourceContext<FlinkDeployment> context) {
- return EventSourceInitializer.nameEventSources(
- EventSourceUtils.getSessionJobInformerEventSource(context),
- EventSourceUtils.getDeploymentInformerEventSource(context));
+ List<EventSource> eventSources = new ArrayList<>();
+ eventSources.add(EventSourceUtils.getSessionJobInformerEventSource(context));
+ eventSources.add(EventSourceUtils.getDeploymentInformerEventSource(context));
+
+ if (KubernetesClientUtils.isCrdInstalled(FlinkStateSnapshot.class)) {
+ eventSources.add(
+ EventSourceUtils.getStateSnapshotForFlinkResourceInformerEventSource(context));
+ } else {
+ LOG.warn(
+ "Could not initialize informer for snapshots as the CRD has not been installed!");
+ }
+
+ return EventSourceInitializer.nameEventSources(eventSources.toArray(EventSource[]::new));
}
@Override
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
index 3ef87cb7..3c08f007 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.controller;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.health.CanaryResourceManager;
@@ -28,6 +29,7 @@ import
org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
+import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
@@ -45,6 +47,8 @@ import
io.javaoperatorsdk.operator.processing.event.source.EventSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -160,8 +164,18 @@ public class FlinkSessionJobController
@Override
public Map<String, EventSource> prepareEventSources(
EventSourceContext<FlinkSessionJob> context) {
- return EventSourceInitializer.nameEventSources(
-
EventSourceUtils.getFlinkDeploymentInformerEventSource(context));
+ List<EventSource> eventSources = new ArrayList<>();
+ eventSources.add(EventSourceUtils.getFlinkDeploymentInformerEventSource(context));
+
+ if (KubernetesClientUtils.isCrdInstalled(FlinkStateSnapshot.class)) {
+ eventSources.add(
+ EventSourceUtils.getStateSnapshotForFlinkResourceInformerEventSource(context));
+ } else {
+ LOG.warn(
+ "Could not initialize informer for snapshots as the CRD has not been installed!");
+ }
+
+ return EventSourceInitializer.nameEventSources(eventSources.toArray(EventSource[]::new));
}
private boolean validateSessionJob(FlinkResourceContext<FlinkSessionJob>
ctx) {
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java
index 4327f91f..e09d1527 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java
@@ -19,32 +19,49 @@ package org.apache.flink.kubernetes.operator.observer;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.utils.DateTimeUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
import org.apache.flink.kubernetes.operator.api.status.Checkpoint;
import org.apache.flink.kubernetes.operator.api.status.CheckpointInfo;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
-import org.apache.flink.kubernetes.operator.api.status.SavepointInfo;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.ConfigOptionUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;
import org.apache.flink.kubernetes.operator.utils.SnapshotUtils;
+import org.apache.flink.util.CollectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
-import java.util.Iterator;
-import java.util.List;
-
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static
org.apache.flink.configuration.CheckpointingOptions.MAX_RETAINED_CHECKPOINTS;
+import static
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.COMPLETED;
+import static
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_CLEANUP_ENABLED;
+import static
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE;
+import static
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT;
import static
org.apache.flink.kubernetes.operator.reconciler.SnapshotType.CHECKPOINT;
import static
org.apache.flink.kubernetes.operator.reconciler.SnapshotType.SAVEPOINT;
import static
org.apache.flink.kubernetes.operator.utils.SnapshotUtils.isSnapshotTriggeringSupported;
@@ -54,6 +71,10 @@ public class SnapshotObserver<
CR extends AbstractFlinkResource<?, STATUS>, STATUS extends
CommonStatus<?>> {
private static final Logger LOG =
LoggerFactory.getLogger(SnapshotObserver.class);
+ public static final Function<FlinkStateSnapshot, Instant>
EXTRACT_SNAPSHOT_TIME =
+ s ->
DateTimeUtils.parseKubernetes(s.getMetadata().getCreationTimestamp());
+ private static final Set<SnapshotTriggerType>
CLEAN_UP_SNAPSHOT_TRIGGER_TYPES =
+ Set.of(SnapshotTriggerType.PERIODIC, SnapshotTriggerType.UPGRADE);
private final EventRecorder eventRecorder;
@@ -78,7 +99,7 @@ public class SnapshotObserver<
ctx.getFlinkService(), jobStatus, jobId,
ctx.getObserveConfig());
}
- cleanupSavepointHistory(ctx, jobStatus.getSavepointInfo());
+ cleanupSavepointHistory(ctx);
}
public void observeCheckpointStatus(FlinkResourceContext<CR> ctx) {
@@ -219,64 +240,213 @@ public class SnapshotObserver<
}
/** Clean up and dispose savepoints according to the configured max
size/age. */
+ private void cleanupSavepointHistory(FlinkResourceContext<CR> ctx) {
+ Set<FlinkStateSnapshot> snapshots = Collections.emptySet();
+ if (FlinkStateSnapshotUtils.isSnapshotResourceEnabled(
+ ctx.getOperatorConfig(), ctx.getObserveConfig())) {
+ snapshots =
ctx.getJosdkContext().getSecondaryResources(FlinkStateSnapshot.class);
+ if (snapshots == null) {
+ snapshots = Set.of();
+ }
+ }
+
+ cleanupSavepointHistoryLegacy(ctx, snapshots);
+
+ if (CollectionUtil.isNullOrEmpty(snapshots)) {
+ return;
+ }
+ if (ctx.getObserveConfig().get(OPERATOR_SAVEPOINT_CLEANUP_ENABLED)) {
+ var savepointsToDelete =
+ getFlinkStateSnapshotsToCleanUp(
+ snapshots, ctx.getObserveConfig(),
ctx.getOperatorConfig(), SAVEPOINT);
+ var checkpointsToDelete =
+ getFlinkStateSnapshotsToCleanUp(
+ snapshots, ctx.getObserveConfig(),
ctx.getOperatorConfig(), CHECKPOINT);
+ Stream.concat(savepointsToDelete.stream(),
checkpointsToDelete.stream())
+ .forEach(
+ snapshot ->
+ ctx.getKubernetesClient()
+ .resource(snapshot)
+ .withTimeoutInMillis(0L)
+ .delete());
+ }
+ }
+
+ /**
+ * Returns a list of FlinkStateSnapshot resources that should be cleaned
up based on age/count
+ * policies.
+ *
+ * @param snapshots list of all snapshots
+ * @param observeConfig observe config
+ * @param operatorConfig operator config
+ * @param snapshotType checkpoint or savepoint
+ * @return set of FlinkStateSnapshot resources to delete
+ */
@VisibleForTesting
- void cleanupSavepointHistory(FlinkResourceContext<CR> ctx, SavepointInfo
currentSavepointInfo) {
+ Set<FlinkStateSnapshot> getFlinkStateSnapshotsToCleanUp(
+ Collection<FlinkStateSnapshot> snapshots,
+ Configuration observeConfig,
+ FlinkOperatorConfiguration operatorConfig,
+ SnapshotType snapshotType) {
+ var snapshotList =
+ snapshots.stream()
+ .filter(s -> !s.isMarkedForDeletion())
+ .filter(
+ s ->
+
CLEAN_UP_SNAPSHOT_TRIGGER_TYPES.contains(
+
FlinkStateSnapshotUtils.getSnapshotTriggerType(s)))
+ .filter(s -> (s.getSpec().isSavepoint() ==
(snapshotType == SAVEPOINT)))
+ .sorted(Comparator.comparing(EXTRACT_SNAPSHOT_TIME))
+ .collect(Collectors.toList());
+
+ var lastCompleteSnapshot =
+ snapshotList.stream()
+ .filter(s ->
COMPLETED.equals(s.getStatus().getState()))
+ .max(Comparator.comparing(EXTRACT_SNAPSHOT_TIME))
+ .orElse(null);
+
+ var maxCount = getMaxCountForSnapshotType(observeConfig,
operatorConfig, snapshotType);
+ var maxTms = getMinAgeForSnapshotType(observeConfig, operatorConfig,
snapshotType);
+ var result = new HashSet<FlinkStateSnapshot>();
+
+ if (snapshotList.size() < 2) {
+ return result;
+ }
+
+ for (var snapshot : snapshotList) {
+ if (snapshot.equals(lastCompleteSnapshot)) {
+ continue;
+ }
+
+ // We should keep the last snapshot, even if not complete.
+ if (result.size() == snapshotList.size() - 1) {
+ break;
+ }
+
+ var ts = EXTRACT_SNAPSHOT_TIME.apply(snapshot).toEpochMilli();
+ if (snapshotList.size() - result.size() > maxCount || ts < maxTms)
{
+ result.add(snapshot);
+ }
+ }
- var observeConfig = ctx.getObserveConfig();
- var flinkService = ctx.getFlinkService();
- boolean savepointCleanupEnabled =
- observeConfig.getBoolean(
- KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_CLEANUP_ENABLED);
+ return result;
+ }
- // maintain history
- List<Savepoint> savepointHistory = currentSavepointInfo.getSavepointHistory();
- if (savepointHistory.size() < 2) {
+ /**
+ * Cleans up the savepoint history of a Flink resource from the old, deprecated
+ * savepoint-history. Secondary resources of FlinkStateSnapshot are used to determine count of
+ * completed savepoints to be able to properly clean old savepoints based on the count policy.
+ *
+ * @param ctx flink resource context
+ * @param allSecondarySnapshotResources all snapshot resources linked to this Flink resource
+ */
+ @VisibleForTesting
+ void cleanupSavepointHistoryLegacy(
+ FlinkResourceContext<CR> ctx, Set<FlinkStateSnapshot>
allSecondarySnapshotResources) {
+ var maxTms =
+ getMinAgeForSnapshotType(
+ ctx.getObserveConfig(), ctx.getOperatorConfig(),
SAVEPOINT);
+ var maxCount =
+ getMaxCountForSnapshotType(
+ ctx.getObserveConfig(), ctx.getOperatorConfig(),
SAVEPOINT);
+
+ var completedSavepointCrs =
+ allSecondarySnapshotResources.stream()
+ .filter(
+ s ->
+ s.getStatus() != null
+ &&
COMPLETED.equals(s.getStatus().getState()))
+ .filter(s -> s.getSpec().isSavepoint())
+ .count();
+ maxCount = Math.max(0, maxCount - completedSavepointCrs);
+
+ var savepointHistory =
+ ctx.getResource()
+ .getStatus()
+ .getJobStatus()
+ .getSavepointInfo()
+ .getSavepointHistory();
+
+ var savepointCleanupEnabled =
+ ctx.getObserveConfig().get(OPERATOR_SAVEPOINT_CLEANUP_ENABLED);
+
+ // If we have a single new FlinkStateSnapshot CR, we can clean up the last entry.
+ if (savepointHistory.isEmpty()
+ || (savepointHistory.size() == 1 && completedSavepointCrs ==
0)) {
return;
}
var lastSavepoint = savepointHistory.get(savepointHistory.size() - 1);
- int maxCount =
- Math.max(
- 1,
- ConfigOptionUtils.getValueWithThreshold(
- observeConfig,
- KubernetesOperatorConfigOptions
- .OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT,
-
ctx.getOperatorConfig().getSavepointHistoryCountThreshold()));
while (savepointHistory.size() > maxCount) {
// remove oldest entries
- Savepoint sp = savepointHistory.remove(0);
+ var sp = savepointHistory.remove(0);
if (savepointCleanupEnabled) {
- disposeSavepointQuietly(flinkService, sp, observeConfig);
+ disposeSavepointQuietly(ctx, sp.getLocation());
}
}
- Duration maxAge =
- ConfigOptionUtils.getValueWithThreshold(
- observeConfig,
-
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE,
-
ctx.getOperatorConfig().getSavepointHistoryAgeThreshold());
- long maxTms = System.currentTimeMillis() - maxAge.toMillis();
- Iterator<Savepoint> it = savepointHistory.iterator();
+ var it = savepointHistory.iterator();
while (it.hasNext()) {
- Savepoint sp = it.next();
- if (sp.getTimeStamp() < maxTms && sp != lastSavepoint) {
+ var sp = it.next();
+ if (sp == lastSavepoint && completedSavepointCrs == 0) {
+ continue;
+ }
+ if (sp.getTimeStamp() < maxTms) {
it.remove();
if (savepointCleanupEnabled) {
- disposeSavepointQuietly(flinkService, sp, observeConfig);
+ disposeSavepointQuietly(ctx, sp.getLocation());
}
}
}
}
- private void disposeSavepointQuietly(
- FlinkService flinkService, Savepoint sp, Configuration conf) {
+ private void disposeSavepointQuietly(FlinkResourceContext<CR> ctx, String
path) {
try {
- LOG.info("Disposing savepoint {}", sp);
- flinkService.disposeSavepoint(sp.getLocation(), conf);
+ LOG.info("Disposing savepoint {}", path);
+ ctx.getFlinkService().disposeSavepoint(path,
ctx.getObserveConfig());
} catch (Exception e) {
// savepoint dispose error should not affect the deployment
- LOG.error("Exception while disposing savepoint {}",
sp.getLocation(), e);
+ LOG.error("Exception while disposing savepoint {}", path, e);
+ }
+ }
+
+ private long getMinAgeForSnapshotType(
+ Configuration observeConfig,
+ FlinkOperatorConfiguration operatorConfig,
+ SnapshotType snapshotType) {
+ switch (snapshotType) {
+ case CHECKPOINT:
+ return 0;
+ case SAVEPOINT:
+ var maxAge =
+ ConfigOptionUtils.getValueWithThreshold(
+ observeConfig,
+ OPERATOR_SAVEPOINT_HISTORY_MAX_AGE,
+
operatorConfig.getSavepointHistoryAgeThreshold());
+ return System.currentTimeMillis() - maxAge.toMillis();
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unknown snapshot type %s",
snapshotType.name()));
+ }
+ }
+
+ private long getMaxCountForSnapshotType(
+ Configuration observeConfig,
+ FlinkOperatorConfiguration operatorConfig,
+ SnapshotType snapshotType) {
+ switch (snapshotType) {
+ case CHECKPOINT:
+ return Math.max(1,
observeConfig.get(MAX_RETAINED_CHECKPOINTS));
+ case SAVEPOINT:
+ return Math.max(
+ 1,
+ ConfigOptionUtils.getValueWithThreshold(
+ observeConfig,
+ OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT,
+
operatorConfig.getSavepointHistoryCountThreshold()));
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unknown snapshot type %s",
snapshotType.name()));
}
}
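Note on the hunk above (not part of the commit): a minimal standalone sketch of the selection rule in getFlinkStateSnapshotsToCleanUp, assuming snapshots are reduced to a trigger time and a completion flag. The class RetentionSketch, the type Snap, and the parameter minTs are hypothetical stand-ins for FlinkStateSnapshot and the value returned by getMinAgeForSnapshotType. The trigger-type and savepoint/checkpoint filters are left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class RetentionSketch {
    /** Hypothetical stand-in for a FlinkStateSnapshot resource. */
    public static final class Snap {
        public final String name;
        public final long ts;           // trigger time in epoch millis
        public final boolean completed; // true if the snapshot state is COMPLETED

        public Snap(String name, long ts, boolean completed) {
            this.name = name;
            this.ts = ts;
            this.completed = completed;
        }
    }

    /** Selects snapshots to delete using the same loop structure as the diff. */
    public static Set<Snap> toCleanUp(List<Snap> snapshots, long maxCount, long minTs) {
        List<Snap> sorted = new ArrayList<>(snapshots);
        sorted.sort(Comparator.comparingLong((Snap s) -> s.ts));

        // The newest completed snapshot is always retained.
        Snap lastCompleted = null;
        for (Snap s : sorted) {
            if (s.completed) {
                lastCompleted = s; // list is sorted ascending, so the last hit is the newest
            }
        }

        Set<Snap> result = new LinkedHashSet<>();
        if (sorted.size() < 2) {
            return result;
        }
        for (Snap s : sorted) {
            if (s == lastCompleted) {
                continue;
            }
            // Keep at least one snapshot, even if it never completed.
            if (result.size() == sorted.size() - 1) {
                break;
            }
            // Delete while over the count limit, or when older than the age cutoff.
            if (sorted.size() - result.size() > maxCount || s.ts < minTs) {
                result.add(s);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Snap> data = List.of(
                new Snap("sp0", 0, true),
                new Snap("sp1", 1, false),
                new Snap("sp2", 2, false),
                new Snap("sp3", 3, false));
        for (Snap s : toCleanUp(data, 1, 0)) {
            System.out.println(s.name);
        }
    }
}
```

With a count limit of 1, the only completed snapshot survives and the three newer failed ones are selected, which corresponds to the expectation in testSnapshotKeepOneCompleted further down in this diff.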
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
index d5a5fc1a..018a080b 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
@@ -18,10 +18,12 @@
package org.apache.flink.kubernetes.operator.utils;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.CrdConstants;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.spec.JobKind;
+import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
import
org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController;
import org.apache.flink.kubernetes.utils.Constants;
@@ -39,8 +41,10 @@ import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/** Utility class to locate secondary resources. */
public class EventSourceUtils {
@@ -49,6 +53,39 @@ public class EventSourceUtils {
private static final String FLINK_SESSIONJOB_IDX =
FlinkSessionJobController.class.getName();
private static final String FLINK_STATE_SNAPSHOT_IDX =
FlinkStateSnapshot.class.getName();
+ public static <T extends AbstractFlinkResource<?, ?>>
+ InformerEventSource<FlinkStateSnapshot, T>
+ getStateSnapshotForFlinkResourceInformerEventSource(
+ EventSourceContext<T> context) {
+ var labelFilters =
+ Stream.of(SnapshotTriggerType.PERIODIC,
SnapshotTriggerType.UPGRADE)
+ .map(Enum::name)
+ .collect(Collectors.joining(","));
+ var labelSelector =
+ String.format("%s in (%s)", CrdConstants.LABEL_SNAPSHOT_TYPE,
labelFilters);
+ var configuration =
+ InformerConfiguration.from(FlinkStateSnapshot.class, context)
+ .withLabelSelector(labelSelector)
+ .withSecondaryToPrimaryMapper(
+ snapshot -> {
+ var jobRef =
snapshot.getSpec().getJobReference();
+ if (jobRef == null || jobRef.getName() ==
null) {
+ return Collections.emptySet();
+ }
+ var namespace =
+
Optional.ofNullable(jobRef.getNamespace())
+
.orElse(snapshot.getMetadata().getNamespace());
+ return Set.of(
+ new ResourceID(
+
snapshot.getSpec().getJobReference().getName(),
+ namespace));
+ })
+ .withNamespacesInheritedFromController(context)
+ .followNamespaceChanges(true)
+ .build();
+ return new InformerEventSource<>(configuration, context);
+ }
+
public static InformerEventSource<Deployment, FlinkDeployment>
getDeploymentInformerEventSource(
EventSourceContext<FlinkDeployment> context) {
final String labelSelector =
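Note on the hunk above (not part of the commit): a minimal sketch of how the set-based label selector in getStateSnapshotForFlinkResourceInformerEventSource is assembled, assuming a placeholder label key. The enum TriggerType and the key "example.org/snapshot-type" are hypothetical. The real key is CrdConstants.LABEL_SNAPSHOT_TYPE, whose value does not appear in this diff.

```java
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LabelSelectorSketch {
    /** Hypothetical stand-in for SnapshotTriggerType. */
    public enum TriggerType { MANUAL, PERIODIC, UPGRADE, UNKNOWN }

    /** Builds a Kubernetes set-based selector of the form "key in (A,B)". */
    public static String selectorFor(String labelKey, TriggerType... types) {
        String values = Stream.of(types).map(Enum::name).collect(Collectors.joining(","));
        return String.format("%s in (%s)", labelKey, values);
    }

    public static void main(String[] args) {
        // prints: example.org/snapshot-type in (PERIODIC,UPGRADE)
        System.out.println(
                selectorFor("example.org/snapshot-type", TriggerType.PERIODIC, TriggerType.UPGRADE));
    }
}
```

Restricting the informer to PERIODIC and UPGRADE means manually created snapshot resources are never delivered to the event source.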
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java
index 42927738..a14b2b5c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java
@@ -138,6 +138,23 @@ public class FlinkStateSnapshotUtils {
return kubernetesClient.resource(snapshot).create();
}
+ /**
+ * Extracts snapshot trigger type from a snapshot resource. If unable to do so, return {@link
+ * SnapshotTriggerType#UNKNOWN}
+ *
+ * @param snapshot resource to check
+ * @return trigger type
+ */
+ public static SnapshotTriggerType getSnapshotTriggerType(FlinkStateSnapshot snapshot) {
+ var triggerTypeStr =
+ snapshot.getMetadata().getLabels().get(CrdConstants.LABEL_SNAPSHOT_TYPE);
+ try {
+ return SnapshotTriggerType.valueOf(triggerTypeStr);
+ } catch (NullPointerException | IllegalArgumentException e) {
+ return SnapshotTriggerType.UNKNOWN;
+ }
+ }
+
/**
* Creates a checkpoint {@link FlinkStateSnapshot} resource on the Kubernetes cluster.
*
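Note on the hunk above (not part of the commit): a minimal sketch of the parse-with-fallback pattern used by getSnapshotTriggerType, assuming the labels arrive as a plain map. The class TriggerTypeSketch and its enum are hypothetical stand-ins. In the diff the label lookup sits outside the try block, so if getLabels() can return null that case would not be caught. The sketch guards it explicitly, which is an assumption about desired behavior rather than what the commit does.

```java
import java.util.Map;

public class TriggerTypeSketch {
    /** Hypothetical stand-in for SnapshotTriggerType. */
    public enum TriggerType { MANUAL, PERIODIC, UPGRADE, UNKNOWN }

    /** Returns the trigger type stored under the given label key, or UNKNOWN. */
    public static TriggerType parse(Map<String, String> labels, String key) {
        if (labels == null) {
            return TriggerType.UNKNOWN; // no labels map at all
        }
        try {
            // valueOf throws NullPointerException for a missing label (null name)
            // and IllegalArgumentException for an unrecognized value.
            return TriggerType.valueOf(labels.get(key));
        } catch (NullPointerException | IllegalArgumentException e) {
            return TriggerType.UNKNOWN;
        }
    }

    public static void main(String[] args) {
        System.out.println(parse(Map.of("type", "PERIODIC"), "type")); // prints PERIODIC
        System.out.println(parse(Map.of("type", "bogus"), "type"));    // prints UNKNOWN
    }
}
```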
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesClientUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesClientUtils.java
index 30faa879..cc4bcc7e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesClientUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesClientUtils.java
@@ -71,6 +71,7 @@ public class KubernetesClientUtils {
/**
* Checks if the class for a Custom Resource is installed in the current Kubernetes cluster.
+ * TODO: remove method when FlinkStateSnapshot CRD is made mandatory
*
* @param clazz class of Custom Resource
* @return true if the CRD present in the Kubernetes cluster
@@ -80,7 +81,7 @@ public class KubernetesClientUtils {
client.resources(clazz).list().getItems();
return true;
} catch (Throwable t) {
- LOG.warn("Failed to find CRD {}", clazz.getSimpleName());
+ LOG.debug("Could not find CRD {}", clazz.getSimpleName());
return false;
}
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverLegacyTest.java
similarity index 63%
copy from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverTest.java
copy to flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverLegacyTest.java
index 7ec815a2..fdbfb877 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverLegacyTest.java
@@ -22,11 +22,13 @@ import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.kubernetes.operator.OperatorTestBase;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.spec.JobReference;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
+import
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
-import org.apache.flink.kubernetes.operator.api.status.SavepointInfo;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
+import org.apache.flink.kubernetes.operator.api.utils.BaseTestUtils;
import
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.utils.SnapshotStatus;
import org.apache.flink.kubernetes.operator.utils.SnapshotUtils;
@@ -40,9 +42,11 @@ import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import static
org.apache.flink.kubernetes.operator.reconciler.SnapshotType.CHECKPOINT;
import static
org.apache.flink.kubernetes.operator.reconciler.SnapshotType.SAVEPOINT;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -50,7 +54,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
/** Tests for {@link SnapshotObserver}. */
@EnableKubernetesMockClient(crud = true)
-public class SnapshotObserverTest extends OperatorTestBase {
+public class SnapshotObserverLegacyTest extends OperatorTestBase {
@Getter private KubernetesClient kubernetesClient;
private SnapshotObserver<FlinkDeployment, FlinkDeploymentStatus> observer;
@@ -62,25 +66,137 @@ public class SnapshotObserverTest extends OperatorTestBase {
@Test
public void testBasicObserve() {
+ Configuration conf = new Configuration();
+ conf.set(KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED,
false);
+ configManager.updateDefaultConfig(conf);
+
var deployment = TestUtils.buildApplicationCluster();
deployment
.getStatus()
.getReconciliationStatus()
.serializeAndSetLastReconciledSpec(deployment.getSpec(),
deployment);
- SavepointInfo spInfo = new SavepointInfo();
+ var spInfo = deployment.getStatus().getJobStatus().getSavepointInfo();
Assertions.assertTrue(spInfo.getSavepointHistory().isEmpty());
Savepoint sp =
new Savepoint(
1, "sp1", SnapshotTriggerType.MANUAL,
SavepointFormatType.CANONICAL, 123L);
spInfo.updateLastSavepoint(sp);
- observer.cleanupSavepointHistory(getResourceContext(deployment),
spInfo);
+ observer.cleanupSavepointHistoryLegacy(getResourceContext(deployment),
Set.of());
Assertions.assertNotNull(spInfo.getSavepointHistory());
Assertions.assertIterableEquals(
Collections.singletonList(sp), spInfo.getSavepointHistory());
}
+ @Test
+ public void testCountBasedDisposeWithSnapshotResources() {
+ var cr = TestUtils.buildSessionCluster();
+ cr.getStatus()
+ .getReconciliationStatus()
+ .serializeAndSetLastReconciledSpec(cr.getSpec(), cr);
+ Configuration conf = new Configuration();
+
conf.set(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT,
2);
+ configManager.updateDefaultConfig(conf);
+
+ var spInfo = cr.getStatus().getJobStatus().getSavepointInfo();
+ for (var i = 0; i < 5; i++) {
+ var sp =
+ new Savepoint(
+ System.currentTimeMillis() * 2,
+ String.format("sp%d", i),
+ SnapshotTriggerType.MANUAL,
+ SavepointFormatType.CANONICAL,
+ (long) i);
+ spInfo.updateLastSavepoint(sp);
+ }
+
+ assertThat(spInfo.getSavepointHistory()).hasSize(5);
+ observer.cleanupSavepointHistoryLegacy(getResourceContext(cr),
Set.of());
+ assertThat(spInfo.getSavepointHistory()).hasSize(2);
+
+ var snapshot1 =
+ BaseTestUtils.buildFlinkStateSnapshotSavepoint(
+ "cr_sp1",
+ cr.getMetadata().getNamespace(),
+ "cr_sp1",
+ false,
+ JobReference.fromFlinkResource(cr));
+ var snapshot2 =
+ BaseTestUtils.buildFlinkStateSnapshotSavepoint(
+ "cr_sp2",
+ cr.getMetadata().getNamespace(),
+ "cr_sp2",
+ false,
+ JobReference.fromFlinkResource(cr));
+ snapshot1.setStatus(new FlinkStateSnapshotStatus());
+
snapshot1.getStatus().setState(FlinkStateSnapshotStatus.State.IN_PROGRESS);
+
+ observer.cleanupSavepointHistoryLegacy(
+ getResourceContext(cr), Set.of(snapshot1, snapshot2));
+ assertThat(spInfo.getSavepointHistory()).hasSize(2);
+
+
snapshot1.getStatus().setState(FlinkStateSnapshotStatus.State.COMPLETED);
+
+ observer.cleanupSavepointHistoryLegacy(
+ getResourceContext(cr), Set.of(snapshot1, snapshot2));
+ assertThat(spInfo.getSavepointHistory()).hasSize(1);
+
+ snapshot2.setStatus(new FlinkStateSnapshotStatus());
+
snapshot2.getStatus().setState(FlinkStateSnapshotStatus.State.COMPLETED);
+
+ observer.cleanupSavepointHistoryLegacy(
+ getResourceContext(cr), Set.of(snapshot1, snapshot2));
+ assertThat(spInfo.getSavepointHistory()).hasSize(0);
+ }
+
+ @Test
+ public void testAgeBasedDisposeWithSnapshotResources() {
+ var cr = TestUtils.buildSessionCluster();
+ cr.getStatus()
+ .getReconciliationStatus()
+ .serializeAndSetLastReconciledSpec(cr.getSpec(), cr);
+ Configuration conf = new Configuration();
+ conf.set(
+
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE,
+ Duration.ofMillis(5));
+ configManager.updateDefaultConfig(conf);
+
+ var spInfo = cr.getStatus().getJobStatus().getSavepointInfo();
+ for (var i = 0; i < 5; i++) {
+ var sp =
+ new Savepoint(
+ i,
+ String.format("sp%d", i),
+ SnapshotTriggerType.MANUAL,
+ SavepointFormatType.CANONICAL,
+ (long) i);
+ spInfo.updateLastSavepoint(sp);
+ }
+
+ assertThat(spInfo.getSavepointHistory()).hasSize(5);
+ observer.cleanupSavepointHistoryLegacy(getResourceContext(cr),
Set.of());
+ assertThat(spInfo.getSavepointHistory()).hasSize(1);
+
+ var snapshot1 =
+ BaseTestUtils.buildFlinkStateSnapshotSavepoint(
+ "cr_sp1",
+ cr.getMetadata().getNamespace(),
+ "cr_sp1",
+ false,
+ JobReference.fromFlinkResource(cr));
+ snapshot1.setStatus(new FlinkStateSnapshotStatus());
+
snapshot1.getStatus().setState(FlinkStateSnapshotStatus.State.IN_PROGRESS);
+
+ observer.cleanupSavepointHistoryLegacy(getResourceContext(cr),
Set.of(snapshot1));
+ assertThat(spInfo.getSavepointHistory()).hasSize(1);
+
+
snapshot1.getStatus().setState(FlinkStateSnapshotStatus.State.COMPLETED);
+
+ observer.cleanupSavepointHistoryLegacy(getResourceContext(cr),
Set.of(snapshot1));
+ assertThat(spInfo.getSavepointHistory()).hasSize(0);
+ }
+
@Test
public void testAgeBasedDispose() {
var cr = TestUtils.buildSessionCluster();
@@ -88,18 +204,19 @@ public class SnapshotObserverTest extends OperatorTestBase
{
.getReconciliationStatus()
.serializeAndSetLastReconciledSpec(cr.getSpec(), cr);
Configuration conf = new Configuration();
+ conf.set(KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED,
false);
conf.set(
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE,
Duration.ofMillis(5));
configManager.updateDefaultConfig(conf);
- SavepointInfo spInfo = new SavepointInfo();
+ var spInfo = cr.getStatus().getJobStatus().getSavepointInfo();
- Savepoint sp1 =
+ var sp1 =
new Savepoint(
1, "sp1", SnapshotTriggerType.MANUAL,
SavepointFormatType.CANONICAL, 123L);
spInfo.updateLastSavepoint(sp1);
- observer.cleanupSavepointHistory(getResourceContext(cr), spInfo);
+ observer.cleanupSavepointHistoryLegacy(getResourceContext(cr),
Set.of());
Assertions.assertIterableEquals(
Collections.singletonList(sp1), spInfo.getSavepointHistory());
Assertions.assertIterableEquals(
@@ -109,7 +226,7 @@ public class SnapshotObserverTest extends OperatorTestBase {
new Savepoint(
2, "sp2", SnapshotTriggerType.MANUAL,
SavepointFormatType.CANONICAL, 123L);
spInfo.updateLastSavepoint(sp2);
- observer.cleanupSavepointHistory(getResourceContext(cr), spInfo);
+ observer.cleanupSavepointHistoryLegacy(getResourceContext(cr),
Set.of());
Assertions.assertIterableEquals(
Collections.singletonList(sp2), spInfo.getSavepointHistory());
Assertions.assertIterableEquals(
@@ -124,6 +241,7 @@ public class SnapshotObserverTest extends OperatorTestBase {
.getReconciliationStatus()
.serializeAndSetLastReconciledSpec(deployment.getSpec(),
deployment);
Configuration conf = new Configuration();
+ conf.set(KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED,
false);
conf.set(
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE,
Duration.ofMillis(System.currentTimeMillis() * 2));
@@ -131,23 +249,23 @@ public class SnapshotObserverTest extends
OperatorTestBase {
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE_THRESHOLD,
Duration.ofMillis(5));
configManager.updateDefaultConfig(conf);
- SavepointInfo spInfo = new SavepointInfo();
+ var spInfo = deployment.getStatus().getJobStatus().getSavepointInfo();
- Savepoint sp1 =
+ var sp1 =
new Savepoint(
1, "sp1", SnapshotTriggerType.MANUAL,
SavepointFormatType.CANONICAL, 123L);
spInfo.updateLastSavepoint(sp1);
- observer.cleanupSavepointHistory(getResourceContext(deployment),
spInfo);
+ observer.cleanupSavepointHistoryLegacy(getResourceContext(deployment),
Set.of());
Assertions.assertIterableEquals(
Collections.singletonList(sp1), spInfo.getSavepointHistory());
Assertions.assertIterableEquals(
Collections.emptyList(), flinkService.getDisposedSavepoints());
- Savepoint sp2 =
+ var sp2 =
new Savepoint(
2, "sp2", SnapshotTriggerType.MANUAL,
SavepointFormatType.CANONICAL, 123L);
spInfo.updateLastSavepoint(sp2);
- observer.cleanupSavepointHistory(getResourceContext(deployment),
spInfo);
+ observer.cleanupSavepointHistoryLegacy(getResourceContext(deployment),
Set.of());
Assertions.assertIterableEquals(
Collections.singletonList(sp2), spInfo.getSavepointHistory());
Assertions.assertIterableEquals(
@@ -164,6 +282,7 @@ public class SnapshotObserverTest extends OperatorTestBase {
.getReconciliationStatus()
.serializeAndSetLastReconciledSpec(deployment.getSpec(),
deployment);
Configuration conf = new Configuration();
+ conf.set(KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED,
false);
conf.set(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_CLEANUP_ENABLED,
false);
conf.set(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT,
1000);
conf.set(
@@ -172,7 +291,7 @@ public class SnapshotObserverTest extends OperatorTestBase {
configManager.updateDefaultConfig(conf);
- SavepointInfo spInfo = new SavepointInfo();
+ var spInfo = deployment.getStatus().getJobStatus().getSavepointInfo();
Savepoint sp1 =
new Savepoint(
@@ -182,7 +301,7 @@ public class SnapshotObserverTest extends OperatorTestBase {
SavepointFormatType.CANONICAL,
123L);
spInfo.updateLastSavepoint(sp1);
- observer.cleanupSavepointHistory(getResourceContext(deployment),
spInfo);
+ observer.cleanupSavepointHistoryLegacy(getResourceContext(deployment),
Set.of());
Savepoint sp2 =
new Savepoint(
@@ -192,7 +311,7 @@ public class SnapshotObserverTest extends OperatorTestBase {
SavepointFormatType.CANONICAL,
123L);
spInfo.updateLastSavepoint(sp2);
- observer.cleanupSavepointHistory(getResourceContext(deployment),
spInfo);
+ observer.cleanupSavepointHistoryLegacy(getResourceContext(deployment),
Set.of());
Assertions.assertIterableEquals(List.of(sp1, sp2),
spInfo.getSavepointHistory());
Assertions.assertIterableEquals(
Collections.emptyList(), flinkService.getDisposedSavepoints());
@@ -201,6 +320,9 @@ public class SnapshotObserverTest extends OperatorTestBase {
@Test
public void testPeriodicSavepoint() throws Exception {
var conf = new Configuration();
+ conf.set(KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED,
false);
+ configManager.updateDefaultConfig(conf);
+
var deployment = TestUtils.buildApplicationCluster();
var status = deployment.getStatus();
var jobStatus = status.getJobStatus();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverTest.java
index 7ec815a2..eebac806 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverTest.java
@@ -17,36 +17,50 @@
package org.apache.flink.kubernetes.operator.observer;
+import org.apache.flink.autoscaler.utils.DateTimeUtils;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.kubernetes.operator.OperatorTestBase;
-import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.api.CrdConstants;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import org.apache.flink.kubernetes.operator.api.spec.CheckpointSpec;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec;
+import org.apache.flink.kubernetes.operator.api.spec.SavepointSpec;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
-import org.apache.flink.kubernetes.operator.api.status.Savepoint;
-import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
-import org.apache.flink.kubernetes.operator.api.status.SavepointInfo;
+import
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
-import
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
-import org.apache.flink.kubernetes.operator.utils.SnapshotStatus;
-import org.apache.flink.kubernetes.operator.utils.SnapshotUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import lombok.Getter;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.time.Duration;
-import java.util.Collections;
+import java.time.Instant;
import java.util.List;
-
+import java.util.Map;
+import java.util.UUID;
+
+import static
org.apache.flink.configuration.CheckpointingOptions.MAX_RETAINED_CHECKPOINTS;
+import static
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.ABANDONED;
+import static
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.COMPLETED;
+import static
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.FAILED;
+import static
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.IN_PROGRESS;
+import static
org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType.MANUAL;
+import static
org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType.PERIODIC;
+import static
org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType.UPGRADE;
+import static
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE;
+import static
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE_THRESHOLD;
+import static
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT;
+import static
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT_THRESHOLD;
import static
org.apache.flink.kubernetes.operator.reconciler.SnapshotType.CHECKPOINT;
import static
org.apache.flink.kubernetes.operator.reconciler.SnapshotType.SAVEPOINT;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link SnapshotObserver}. */
@EnableKubernetesMockClient(crud = true)
@@ -61,215 +75,216 @@ public class SnapshotObserverTest extends OperatorTestBase {
}
@Test
- public void testBasicObserve() {
- var deployment = TestUtils.buildApplicationCluster();
- deployment
- .getStatus()
- .getReconciliationStatus()
- .serializeAndSetLastReconciledSpec(deployment.getSpec(),
deployment);
- SavepointInfo spInfo = new SavepointInfo();
- Assertions.assertTrue(spInfo.getSavepointHistory().isEmpty());
-
- Savepoint sp =
- new Savepoint(
- 1, "sp1", SnapshotTriggerType.MANUAL,
SavepointFormatType.CANONICAL, 123L);
- spInfo.updateLastSavepoint(sp);
- observer.cleanupSavepointHistory(getResourceContext(deployment),
spInfo);
-
- Assertions.assertNotNull(spInfo.getSavepointHistory());
- Assertions.assertIterableEquals(
- Collections.singletonList(sp), spInfo.getSavepointHistory());
+ public void testSnapshotTypeCleanup() {
+ var conf = new
Configuration().set(OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT, 1);
+ var operatorConfig =
FlinkOperatorConfiguration.fromConfiguration(conf);
+
+ var testData =
+ List.of(
+ createSnapshot(CHECKPOINT, 0, PERIODIC, COMPLETED),
+ createSnapshot(CHECKPOINT, 1, PERIODIC, COMPLETED),
+ createSnapshot(CHECKPOINT, 2, PERIODIC, COMPLETED),
+ createSnapshot(SAVEPOINT, 3, PERIODIC, COMPLETED),
+ createSnapshot(SAVEPOINT, 4, PERIODIC, COMPLETED),
+ createSnapshot(SAVEPOINT, 5, PERIODIC, COMPLETED));
+
+ var savepointResult =
+ observer.getFlinkStateSnapshotsToCleanUp(testData, conf,
operatorConfig, SAVEPOINT);
+ assertThat(savepointResult).containsExactlyInAnyOrder(testData.get(3),
testData.get(4));
+
+ var checkpointResult =
+ observer.getFlinkStateSnapshotsToCleanUp(
+ testData, conf, operatorConfig, CHECKPOINT);
+
assertThat(checkpointResult).containsExactlyInAnyOrder(testData.get(0),
testData.get(1));
}
@Test
- public void testAgeBasedDispose() {
- var cr = TestUtils.buildSessionCluster();
- cr.getStatus()
- .getReconciliationStatus()
- .serializeAndSetLastReconciledSpec(cr.getSpec(), cr);
- Configuration conf = new Configuration();
- conf.set(
-
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE,
- Duration.ofMillis(5));
- configManager.updateDefaultConfig(conf);
-
- SavepointInfo spInfo = new SavepointInfo();
-
- Savepoint sp1 =
- new Savepoint(
- 1, "sp1", SnapshotTriggerType.MANUAL,
SavepointFormatType.CANONICAL, 123L);
- spInfo.updateLastSavepoint(sp1);
- observer.cleanupSavepointHistory(getResourceContext(cr), spInfo);
- Assertions.assertIterableEquals(
- Collections.singletonList(sp1), spInfo.getSavepointHistory());
- Assertions.assertIterableEquals(
- Collections.emptyList(), flinkService.getDisposedSavepoints());
-
- Savepoint sp2 =
- new Savepoint(
- 2, "sp2", SnapshotTriggerType.MANUAL,
SavepointFormatType.CANONICAL, 123L);
- spInfo.updateLastSavepoint(sp2);
- observer.cleanupSavepointHistory(getResourceContext(cr), spInfo);
- Assertions.assertIterableEquals(
- Collections.singletonList(sp2), spInfo.getSavepointHistory());
- Assertions.assertIterableEquals(
- Collections.singletonList(sp1.getLocation()),
flinkService.getDisposedSavepoints());
+ public void testSnapshotKeepOneCompleted() {
+ var conf = new
Configuration().set(OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT, 1);
+ var operatorConfig =
FlinkOperatorConfiguration.fromConfiguration(conf);
+
+ var testData =
+ List.of(
+ createSnapshot(SAVEPOINT, 0, PERIODIC, COMPLETED),
+ createSnapshot(SAVEPOINT, 1, PERIODIC, FAILED),
+ createSnapshot(SAVEPOINT, 2, PERIODIC, FAILED),
+ createSnapshot(SAVEPOINT, 3, PERIODIC, FAILED));
+
+ var savepointResult =
+ observer.getFlinkStateSnapshotsToCleanUp(testData, conf,
operatorConfig, SAVEPOINT);
+ assertThat(savepointResult)
+ .containsExactlyInAnyOrder(testData.get(1), testData.get(2),
testData.get(3));
}
- @Test
- public void testAgeBasedDisposeWithAgeThreshold() {
- var deployment = TestUtils.buildApplicationCluster();
- deployment
- .getStatus()
- .getReconciliationStatus()
- .serializeAndSetLastReconciledSpec(deployment.getSpec(),
deployment);
- Configuration conf = new Configuration();
- conf.set(
-
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE,
- Duration.ofMillis(System.currentTimeMillis() * 2));
- conf.set(
-
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE_THRESHOLD,
- Duration.ofMillis(5));
- configManager.updateDefaultConfig(conf);
- SavepointInfo spInfo = new SavepointInfo();
-
- Savepoint sp1 =
- new Savepoint(
- 1, "sp1", SnapshotTriggerType.MANUAL, SavepointFormatType.CANONICAL, 123L);
- spInfo.updateLastSavepoint(sp1);
- observer.cleanupSavepointHistory(getResourceContext(deployment), spInfo);
- Assertions.assertIterableEquals(
- Collections.singletonList(sp1), spInfo.getSavepointHistory());
- Assertions.assertIterableEquals(
- Collections.emptyList(), flinkService.getDisposedSavepoints());
-
- Savepoint sp2 =
- new Savepoint(
- 2, "sp2", SnapshotTriggerType.MANUAL, SavepointFormatType.CANONICAL, 123L);
- spInfo.updateLastSavepoint(sp2);
- observer.cleanupSavepointHistory(getResourceContext(deployment), spInfo);
- Assertions.assertIterableEquals(
- Collections.singletonList(sp2), spInfo.getSavepointHistory());
- Assertions.assertIterableEquals(
- Collections.singletonList(sp1.getLocation()), flinkService.getDisposedSavepoints());
-
- configManager.updateDefaultConfig(new Configuration());
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testAgeBasedCleanupSavepoint(boolean setThreshold) {
+ var conf = new Configuration().set(OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT, 10000);
+
+ if (setThreshold) {
+ conf.set(OPERATOR_SAVEPOINT_HISTORY_MAX_AGE_THRESHOLD, Duration.ofMillis(5));
+ conf.set(
+ OPERATOR_SAVEPOINT_HISTORY_MAX_AGE,
+ Duration.ofMillis(System.currentTimeMillis() * 2));
+ } else {
+ conf.set(OPERATOR_SAVEPOINT_HISTORY_MAX_AGE, Duration.ofMillis(5));
+ }
+
+ var operatorConfig = FlinkOperatorConfiguration.fromConfiguration(conf);
+
+ var testDataSavepoints =
+ List.of(
+ createSnapshot(SAVEPOINT, 0, MANUAL, COMPLETED),
+ createSnapshot(SAVEPOINT, 1, PERIODIC, COMPLETED),
+ createSnapshot(SAVEPOINT, 2, PERIODIC, ABANDONED),
+ createSnapshot(SAVEPOINT, 3, PERIODIC, COMPLETED),
+ createSnapshot(SAVEPOINT, 4, UPGRADE, COMPLETED),
+ createSnapshot(SAVEPOINT, Long.MAX_VALUE, PERIODIC, COMPLETED));
+
+ var removedSavepoints =
+ observer.getFlinkStateSnapshotsToCleanUp(
+ testDataSavepoints, conf, operatorConfig, SAVEPOINT);
+ assertThat(removedSavepoints)
+ .containsExactlyInAnyOrder(
+ testDataSavepoints.get(1),
+ testDataSavepoints.get(2),
+ testDataSavepoints.get(3),
+ testDataSavepoints.get(4));
}
@Test
- public void testDisabledDispose() {
- var deployment = TestUtils.buildApplicationCluster();
- deployment
- .getStatus()
- .getReconciliationStatus()
- .serializeAndSetLastReconciledSpec(deployment.getSpec(), deployment);
- Configuration conf = new Configuration();
- conf.set(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_CLEANUP_ENABLED, false);
- conf.set(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT, 1000);
- conf.set(
- KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE,
- Duration.ofDays(100L));
-
- configManager.updateDefaultConfig(conf);
-
- SavepointInfo spInfo = new SavepointInfo();
-
- Savepoint sp1 =
- new Savepoint(
- 9999999999999998L,
- "sp1",
- SnapshotTriggerType.MANUAL,
- SavepointFormatType.CANONICAL,
- 123L);
- spInfo.updateLastSavepoint(sp1);
- observer.cleanupSavepointHistory(getResourceContext(deployment), spInfo);
-
- Savepoint sp2 =
- new Savepoint(
- 9999999999999999L,
- "sp2",
- SnapshotTriggerType.MANUAL,
- SavepointFormatType.CANONICAL,
- 123L);
- spInfo.updateLastSavepoint(sp2);
- observer.cleanupSavepointHistory(getResourceContext(deployment), spInfo);
- Assertions.assertIterableEquals(List.of(sp1, sp2), spInfo.getSavepointHistory());
- Assertions.assertIterableEquals(
- Collections.emptyList(), flinkService.getDisposedSavepoints());
+ public void testAgeBasedCleanupSavepointKeepOne() {
+ var conf =
+ new Configuration()
+ .set(OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT, 10000)
+ .set(OPERATOR_SAVEPOINT_HISTORY_MAX_AGE, Duration.ofMillis(5));
+
+ var operatorConfig = FlinkOperatorConfiguration.fromConfiguration(conf);
+
+ var testDataSavepoints =
+ List.of(
+ createSnapshot(SAVEPOINT, 0, PERIODIC, IN_PROGRESS),
+ createSnapshot(SAVEPOINT, 1, PERIODIC, IN_PROGRESS),
+ createSnapshot(SAVEPOINT, 2, PERIODIC, IN_PROGRESS),
+ createSnapshot(SAVEPOINT, 3, PERIODIC, IN_PROGRESS));
+
+ var removedSavepoints =
+ observer.getFlinkStateSnapshotsToCleanUp(
+ testDataSavepoints, conf, operatorConfig, SAVEPOINT);
+ assertThat(removedSavepoints)
+ .containsExactlyInAnyOrder(
+ testDataSavepoints.get(0),
+ testDataSavepoints.get(1),
+ testDataSavepoints.get(2));
}
@Test
- public void testPeriodicSavepoint() throws Exception {
+ public void testAgeBasedCleanupCheckpoint() {
+ var conf = new Configuration().set(MAX_RETAINED_CHECKPOINTS, 10000);
+ var operatorConfig = FlinkOperatorConfiguration.fromConfiguration(conf);
+
+ var testDataCheckpoints =
+ List.of(
+ createSnapshot(CHECKPOINT, 0, MANUAL, COMPLETED),
+ createSnapshot(CHECKPOINT, 1, PERIODIC, COMPLETED),
+ createSnapshot(CHECKPOINT, 2, PERIODIC, ABANDONED),
+ createSnapshot(CHECKPOINT, 3, PERIODIC, COMPLETED),
+ createSnapshot(CHECKPOINT, 4, UPGRADE, COMPLETED),
+ createSnapshot(CHECKPOINT, Long.MAX_VALUE, PERIODIC, COMPLETED));
+
+ var removedCheckpoints =
+ observer.getFlinkStateSnapshotsToCleanUp(
+ testDataCheckpoints, conf, operatorConfig, CHECKPOINT);
+ assertThat(removedCheckpoints).isEmpty();
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testCountBasedCleanupSavepoint(boolean setThreshold) {
var conf = new Configuration();
- var deployment = TestUtils.buildApplicationCluster();
- var status = deployment.getStatus();
- var jobStatus = status.getJobStatus();
- status.getReconciliationStatus()
- .serializeAndSetLastReconciledSpec(deployment.getSpec(), deployment);
- jobStatus.setState("RUNNING");
-
- var savepointInfo = jobStatus.getSavepointInfo();
- flinkService.triggerSavepointLegacy(null, SnapshotTriggerType.PERIODIC, deployment, conf);
-
- var triggerTs = savepointInfo.getTriggerTimestamp();
- assertEquals(0L, savepointInfo.getLastPeriodicSavepointTimestamp());
- assertEquals(SnapshotTriggerType.PERIODIC, savepointInfo.getTriggerType());
- assertTrue(SnapshotUtils.savepointInProgress(jobStatus));
- assertEquals(
- SnapshotStatus.PENDING, SnapshotUtils.getLastSnapshotStatus(deployment, SAVEPOINT));
- assertTrue(triggerTs > 0);
-
- // Pending
- observer.observeSavepointStatus(getResourceContext(deployment));
- // Completed
- observer.observeSavepointStatus(getResourceContext(deployment));
- assertEquals(triggerTs, savepointInfo.getLastPeriodicSavepointTimestamp());
- assertFalse(SnapshotUtils.savepointInProgress(jobStatus));
- assertEquals(
- SnapshotUtils.getLastSnapshotStatus(deployment, SAVEPOINT),
- SnapshotStatus.SUCCEEDED);
- assertEquals(savepointInfo.getLastSavepoint(), savepointInfo.getSavepointHistory().get(0));
- assertEquals(
- SnapshotTriggerType.PERIODIC, savepointInfo.getLastSavepoint().getTriggerType());
- assertNull(savepointInfo.getLastSavepoint().getTriggerNonce());
+
+ if (setThreshold) {
+ conf.set(OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT_THRESHOLD, 2);
+ conf.set(OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT, 10000);
+ } else {
+ conf.set(OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT, 2);
+ }
+ var operatorConfig = FlinkOperatorConfiguration.fromConfiguration(conf);
+
+ var testDataSavepoints =
+ List.of(
+ createSnapshot(SAVEPOINT, Long.MAX_VALUE - 1, MANUAL, COMPLETED),
+ createSnapshot(SAVEPOINT, Long.MAX_VALUE - 2, PERIODIC, COMPLETED),
+ createSnapshot(SAVEPOINT, Long.MAX_VALUE - 3, PERIODIC, ABANDONED),
+ createSnapshot(SAVEPOINT, Long.MAX_VALUE - 4, UPGRADE, COMPLETED),
+ createSnapshot(SAVEPOINT, Long.MAX_VALUE - 5, PERIODIC, COMPLETED),
+ createSnapshot(SAVEPOINT, Long.MAX_VALUE - 6, PERIODIC, COMPLETED),
+ createSnapshot(SAVEPOINT, Long.MAX_VALUE - 7, PERIODIC, COMPLETED));
+
+ var removedSavepoints =
+ observer.getFlinkStateSnapshotsToCleanUp(
+ testDataSavepoints, conf, operatorConfig, SAVEPOINT);
+ assertThat(removedSavepoints)
+ .containsExactlyInAnyOrder(
+ testDataSavepoints.get(6),
+ testDataSavepoints.get(5),
+ testDataSavepoints.get(4),
+ testDataSavepoints.get(3));
}
@Test
- public void testPeriodicCheckpoint() {
- var conf = new Configuration();
- var deployment = TestUtils.buildApplicationCluster();
- var status = deployment.getStatus();
- var jobStatus = status.getJobStatus();
- status.getReconciliationStatus()
- .serializeAndSetLastReconciledSpec(deployment.getSpec(), deployment);
- jobStatus.setState("RUNNING");
-
- var checkpointInfo = jobStatus.getCheckpointInfo();
- var triggerId = flinkService.triggerCheckpoint(null, CheckpointType.FULL, conf);
- checkpointInfo.setTrigger(
- triggerId,
- SnapshotTriggerType.PERIODIC,
- org.apache.flink.kubernetes.operator.api.status.CheckpointType.FULL);
-
- var triggerTs = checkpointInfo.getTriggerTimestamp();
- assertEquals(0L, checkpointInfo.getLastPeriodicTriggerTimestamp());
- assertTrue(SnapshotUtils.checkpointInProgress(jobStatus));
- assertEquals(
- SnapshotStatus.PENDING,
- SnapshotUtils.getLastSnapshotStatus(deployment, CHECKPOINT));
- assertTrue(triggerTs > 0);
-
- // Pending
- observer.observeCheckpointStatus(getResourceContext(deployment));
- // Completed
- observer.observeCheckpointStatus(getResourceContext(deployment));
- assertEquals(triggerTs, checkpointInfo.getLastPeriodicTriggerTimestamp());
- assertFalse(SnapshotUtils.checkpointInProgress(jobStatus));
- assertEquals(
- SnapshotUtils.getLastSnapshotStatus(deployment, CHECKPOINT),
- SnapshotStatus.SUCCEEDED);
- assertEquals(
- SnapshotTriggerType.PERIODIC, checkpointInfo.getLastCheckpoint().getTriggerType());
- assertNull(checkpointInfo.getLastCheckpoint().getTriggerNonce());
+ public void testCountBasedCleanupCheckpoint() {
+ var conf = new Configuration().set(MAX_RETAINED_CHECKPOINTS, 2);
+ var operatorConfig = FlinkOperatorConfiguration.fromConfiguration(conf);
+
+ var testDataCheckpoints =
+ List.of(
+ createSnapshot(CHECKPOINT, Long.MAX_VALUE - 1, MANUAL, COMPLETED),
+ createSnapshot(CHECKPOINT, Long.MAX_VALUE - 2, PERIODIC, COMPLETED),
+ createSnapshot(CHECKPOINT, Long.MAX_VALUE - 3, PERIODIC, ABANDONED),
+ createSnapshot(CHECKPOINT, Long.MAX_VALUE - 4, UPGRADE, COMPLETED),
+ createSnapshot(CHECKPOINT, Long.MAX_VALUE - 5, PERIODIC, COMPLETED),
+ createSnapshot(CHECKPOINT, Long.MAX_VALUE - 6, PERIODIC, COMPLETED),
+ createSnapshot(CHECKPOINT, Long.MAX_VALUE - 7, PERIODIC, COMPLETED));
+
+ var removedCheckpoints =
+ observer.getFlinkStateSnapshotsToCleanUp(
+ testDataCheckpoints, conf, operatorConfig, CHECKPOINT);
+ assertThat(removedCheckpoints)
+ .containsExactlyInAnyOrder(
+ testDataCheckpoints.get(6),
+ testDataCheckpoints.get(5),
+ testDataCheckpoints.get(4),
+ testDataCheckpoints.get(3));
+ }
+
+ private static FlinkStateSnapshot createSnapshot(
+ SnapshotType snapshotType,
+ long timestamp,
+ SnapshotTriggerType triggerType,
+ FlinkStateSnapshotStatus.State snapshotState) {
+ var metadata =
+ new ObjectMetaBuilder()
+ .withName(UUID.randomUUID().toString())
+ .withLabels(Map.of(CrdConstants.LABEL_SNAPSHOT_TYPE, triggerType.name()))
+ .withCreationTimestamp(
+ DateTimeUtils.kubernetes(Instant.ofEpochMilli(timestamp)))
+ .build();
+
+ var spec = new FlinkStateSnapshotSpec();
+ if (snapshotType == SAVEPOINT) {
+ spec.setSavepoint(new SavepointSpec());
+ } else if (snapshotType == CHECKPOINT) {
+ spec.setCheckpoint(new CheckpointSpec());
+ }
+
+ var status = FlinkStateSnapshotStatus.builder().state(snapshotState).build();
+
+ var snapshot = new FlinkStateSnapshot();
+ snapshot.setMetadata(metadata);
+ snapshot.setSpec(spec);
+ snapshot.setStatus(status);
+
+ return snapshot;
}
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index f71197f2..47805ce4 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -241,6 +241,10 @@ public class ApplicationObserverTest extends OperatorTestBase {
deployment.getSpec().getJob().setSavepointTriggerNonce(timedOutNonce);
Configuration conf =
configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec());
+ deployment
+ .getSpec()
+ .getFlinkConfiguration()
+ .put(KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED.key(), "false");
flinkService.submitApplicationCluster(deployment.getSpec().getJob(), conf, false);
bringToReadyStatus(deployment);
assertTrue(ReconciliationUtils.isJobRunning(deployment.getStatus()));
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java
index bf37ed63..fd136969 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java
@@ -68,6 +68,36 @@ public class FlinkStateSnapshotUtilsTest {
private static final String SAVEPOINT_NAME = "savepoint-01";
private static final String SAVEPOINT_PATH = "/tmp/savepoint-01";
+ @Test
+ public void testGetSnapshotTriggerType() {
+ var snapshot = new FlinkStateSnapshot();
+
+ assertThat(FlinkStateSnapshotUtils.getSnapshotTriggerType(snapshot))
+ .isEqualTo(SnapshotTriggerType.UNKNOWN);
+
+ snapshot.getMetadata().getLabels().put(CrdConstants.LABEL_SNAPSHOT_TYPE, "");
+ assertThat(FlinkStateSnapshotUtils.getSnapshotTriggerType(snapshot))
+ .isEqualTo(SnapshotTriggerType.UNKNOWN);
+
+ snapshot.getMetadata()
+ .getLabels()
+ .put(CrdConstants.LABEL_SNAPSHOT_TYPE, SnapshotTriggerType.MANUAL.name());
+ assertThat(FlinkStateSnapshotUtils.getSnapshotTriggerType(snapshot))
+ .isEqualTo(SnapshotTriggerType.MANUAL);
+
+ snapshot.getMetadata()
+ .getLabels()
+ .put(CrdConstants.LABEL_SNAPSHOT_TYPE, SnapshotTriggerType.UPGRADE.name());
+ assertThat(FlinkStateSnapshotUtils.getSnapshotTriggerType(snapshot))
+ .isEqualTo(SnapshotTriggerType.UPGRADE);
+
+ snapshot.getMetadata()
+ .getLabels()
+ .put(CrdConstants.LABEL_SNAPSHOT_TYPE, SnapshotTriggerType.PERIODIC.name());
+ assertThat(FlinkStateSnapshotUtils.getSnapshotTriggerType(snapshot))
+ .isEqualTo(SnapshotTriggerType.PERIODIC);
+ }
+
@Test
public void testGetValidatedFlinkStateSnapshotPathPathGiven() {
var snapshotRef = FlinkStateSnapshotReference.builder().path(SAVEPOINT_PATH).build();