This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new b3979730 [FLINK-32904] Support cron expressions for periodic snapshots
triggering (#656)
b3979730 is described below
commit b39797300875b241598042834d96a20e7809adfc
Author: Alexander Fedulov <[email protected]>
AuthorDate: Tue Aug 29 19:04:06 2023 +0200
[FLINK-32904] Support cron expressions for periodic snapshots triggering
(#656)
---
.../shortcodes/generated/dynamic_section.html | 12 +-
.../kubernetes_operator_config_configuration.html | 12 +-
.../config/KubernetesOperatorConfigOptions.java | 30 ++-
.../operator/observer/SnapshotObserver.java | 4 +-
.../kubernetes/operator/utils/SnapshotUtils.java | 135 +++++++++--
.../flink/kubernetes/operator/TestUtils.java | 54 +++++
.../deployment/ApplicationReconcilerTest.java | 55 ++++-
.../operator/utils/SnapshotUtilsTest.java | 260 +++++++++++++++++----
8 files changed, 466 insertions(+), 96 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/dynamic_section.html
b/docs/layouts/shortcodes/generated/dynamic_section.html
index e53052d8..21a03bcf 100644
--- a/docs/layouts/shortcodes/generated/dynamic_section.html
+++ b/docs/layouts/shortcodes/generated/dynamic_section.html
@@ -124,15 +124,15 @@
</tr>
<tr>
<td><h5>kubernetes.operator.periodic.checkpoint.interval</h5></td>
- <td style="word-wrap: break-word;">0 ms</td>
- <td>Duration</td>
- <td>Interval at which periodic checkpoints will be triggered. The
triggering schedule is not guaranteed, checkpoints will be triggered as part of
the regular reconcile loop. NOTE: checkpoints are generally managed by Flink.
This setting isn't meant to replace Flink's checkpoint settings, but to
complement them in special cases. For instance, a full checkpoint might need to
be occasionally triggered to break the chain of incremental checkpoints and
consolidate the partial incr [...]
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Option to enable automatic checkpoint triggering. Can be
specified either as a Duration type (i.e. '10m') or as a cron expression in
Quartz format (6 or 7 positions, see
http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html).The
triggering schedule is not guaranteed, checkpoints will be triggered as part
of the regular reconcile loop. NOTE: checkpoints are generally managed by
Flink. This setting isn't meant to replace Flink's checkpoint se [...]
</tr>
<tr>
<td><h5>kubernetes.operator.periodic.savepoint.interval</h5></td>
- <td style="word-wrap: break-word;">0 ms</td>
- <td>Duration</td>
- <td>Interval at which periodic savepoints will be triggered. The
triggering schedule is not guaranteed, savepoints will be triggered as part of
the regular reconcile loop.</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Option to enable automatic savepoint triggering. Can be
specified either as a Duration type (i.e. '10m') or as a cron expression in
Quartz format (6 or 7 positions, see
http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html).The
triggering schedule is not guaranteed, savepoints will be triggered as part of
the regular reconcile loop. WARNING: not intended to be used together with the
cron-based periodic savepoint triggering</td>
</tr>
<tr>
<td><h5>kubernetes.operator.pod-template.merge-arrays-by-name</h5></td>
diff --git
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 45ebb09e..2f3341e5 100644
---
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -256,15 +256,15 @@
</tr>
<tr>
<td><h5>kubernetes.operator.periodic.checkpoint.interval</h5></td>
- <td style="word-wrap: break-word;">0 ms</td>
- <td>Duration</td>
- <td>Interval at which periodic checkpoints will be triggered. The
triggering schedule is not guaranteed, checkpoints will be triggered as part of
the regular reconcile loop. NOTE: checkpoints are generally managed by Flink.
This setting isn't meant to replace Flink's checkpoint settings, but to
complement them in special cases. For instance, a full checkpoint might need to
be occasionally triggered to break the chain of incremental checkpoints and
consolidate the partial incr [...]
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Option to enable automatic checkpoint triggering. Can be
specified either as a Duration type (i.e. '10m') or as a cron expression in
Quartz format (6 or 7 positions, see
http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html).The
triggering schedule is not guaranteed, checkpoints will be triggered as part
of the regular reconcile loop. NOTE: checkpoints are generally managed by
Flink. This setting isn't meant to replace Flink's checkpoint se [...]
</tr>
<tr>
<td><h5>kubernetes.operator.periodic.savepoint.interval</h5></td>
- <td style="word-wrap: break-word;">0 ms</td>
- <td>Duration</td>
- <td>Interval at which periodic savepoints will be triggered. The
triggering schedule is not guaranteed, savepoints will be triggered as part of
the regular reconcile loop.</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Option to enable automatic savepoint triggering. Can be
specified either as a Duration type (i.e. '10m') or as a cron expression in
Quartz format (6 or 7 positions, see
http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html).The
triggering schedule is not guaranteed, savepoints will be triggered as part of
the regular reconcile loop. WARNING: not intended to be used together with the
cron-based periodic savepoint triggering</td>
</tr>
<tr>
<td><h5>kubernetes.operator.pod-template.merge-arrays-by-name</h5></td>
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index 8226ce0d..e7c9112a 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -298,22 +298,30 @@ public class KubernetesOperatorConfigOptions {
+ "Expected format:
headerKey1:headerValue1,headerKey2:headerValue2.");
@Documentation.Section(SECTION_DYNAMIC)
- public static final ConfigOption<Duration> PERIODIC_SAVEPOINT_INTERVAL =
+ public static final ConfigOption<String> PERIODIC_SAVEPOINT_INTERVAL =
operatorConfig("periodic.savepoint.interval")
- .durationType()
- .defaultValue(Duration.ZERO)
+ .stringType()
+ .defaultValue("")
.withDescription(
- "Interval at which periodic savepoints will be
triggered. "
+ "Option to enable automatic savepoint triggering.
Can be specified "
+ + "either as a Duration type (i.e. '10m')
or as a cron expression "
+ + "in Quartz format (6 or 7 positions, see
"
+ +
"http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html)."
+ "The triggering schedule is not
guaranteed, savepoints will be "
- + "triggered as part of the regular
reconcile loop.");
+ + "triggered as part of the regular
reconcile loop. "
+ + "WARNING: not intended to be used
together with the cron-based "
+ + "periodic savepoint triggering");
@Documentation.Section(SECTION_DYNAMIC)
- public static final ConfigOption<Duration> PERIODIC_CHECKPOINT_INTERVAL =
+ public static final ConfigOption<String> PERIODIC_CHECKPOINT_INTERVAL =
operatorConfig("periodic.checkpoint.interval")
- .durationType()
- .defaultValue(Duration.ZERO)
+ .stringType()
+ .defaultValue("")
.withDescription(
- "Interval at which periodic checkpoints will be
triggered. "
+ "Option to enable automatic checkpoint triggering.
Can be specified "
+ + "either as a Duration type (i.e. '10m')
or as a cron expression "
+ + "in Quartz format (6 or 7 positions, see
"
+ +
"http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html)."
+ "The triggering schedule is not
guaranteed, checkpoints will "
+ "be triggered as part of the regular
reconcile loop. "
+ "NOTE: checkpoints are generally managed
by Flink. This "
@@ -321,7 +329,9 @@ public class KubernetesOperatorConfigOptions {
+ "but to complement them in special
cases. For instance, a "
+ "full checkpoint might need to be
occasionally triggered to "
+ "break the chain of incremental
checkpoints and consolidate "
- + "the partial incremental files.");
+ + "the partial incremental files. "
+ + "WARNING: not intended to be used
together with the cron-based "
+ + "periodic checkpoint triggering");
@Documentation.Section(SECTION_SYSTEM)
public static final ConfigOption<String> OPERATOR_WATCHED_NAMESPACES =
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java
index 54bcdfb6..3d3a68a2 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java
@@ -45,7 +45,7 @@ import java.util.List;
import static
org.apache.flink.kubernetes.operator.reconciler.SnapshotType.CHECKPOINT;
import static
org.apache.flink.kubernetes.operator.reconciler.SnapshotType.SAVEPOINT;
-import static
org.apache.flink.kubernetes.operator.utils.SnapshotUtils.isCheckpointsTriggeringSupported;
+import static
org.apache.flink.kubernetes.operator.utils.SnapshotUtils.isSnapshotTriggeringSupported;
/** An observer of savepoint progress. */
public class SnapshotObserver<
@@ -81,7 +81,7 @@ public class SnapshotObserver<
}
public void observeCheckpointStatus(FlinkResourceContext<CR> ctx) {
- if (!isCheckpointsTriggeringSupported(ctx.getObserveConfig())) {
+ if (!isSnapshotTriggeringSupported(ctx.getObserveConfig())) {
return;
}
var resource = ctx.getResource();
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
index 8b381476..4cc5ee2f 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.utils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
@@ -31,11 +32,14 @@ import
org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.core.util.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.text.ParseException;
import java.time.Duration;
import java.time.Instant;
+import java.util.Date;
import java.util.Objects;
import java.util.Optional;
@@ -194,7 +198,7 @@ public class SnapshotUtils {
Long reconciledTriggerNonce;
boolean inProgress;
SnapshotInfo snapshotInfo;
- Duration interval;
+ String automaticTriggerExpression;
switch (snapshotType) {
case SAVEPOINT:
@@ -202,21 +206,16 @@ public class SnapshotUtils {
reconciledTriggerNonce =
reconciledJobSpec.getSavepointTriggerNonce();
inProgress = savepointInProgress(jobStatus);
snapshotInfo = jobStatus.getSavepointInfo();
- interval =
conf.get(KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL);
+ automaticTriggerExpression =
+
conf.get(KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL);
break;
case CHECKPOINT:
triggerNonce = jobSpec.getCheckpointTriggerNonce();
reconciledTriggerNonce =
reconciledJobSpec.getCheckpointTriggerNonce();
inProgress = checkpointInProgress(jobStatus);
snapshotInfo = jobStatus.getCheckpointInfo();
- interval =
conf.get(KubernetesOperatorConfigOptions.PERIODIC_CHECKPOINT_INTERVAL);
- if (!isCheckpointsTriggeringSupported(conf)) {
- LOG.warn(
- "Periodic checkpoints triggering every {} is
configured, "
- + "but not supported (requires Flink
1.17+)",
- interval);
- return Optional.empty();
- }
+ automaticTriggerExpression =
+
conf.get(KubernetesOperatorConfigOptions.PERIODIC_CHECKPOINT_INTERVAL);
break;
default:
throw new IllegalArgumentException("Unsupported snapshot type:
" + snapshotType);
@@ -229,33 +228,129 @@ public class SnapshotUtils {
var triggerNonceChanged =
triggerNonce != null &&
!triggerNonce.equals(reconciledTriggerNonce);
if (triggerNonceChanged) {
- return Optional.of(SnapshotTriggerType.MANUAL);
- }
-
- if (interval.isZero()) {
- return Optional.empty();
+ if (snapshotType == CHECKPOINT &&
!isSnapshotTriggeringSupported(conf)) {
+ LOG.warn(
+ "Manual checkpoint triggering is attempted, but is not
supported (requires Flink 1.17+)");
+ return Optional.empty();
+ } else {
+ return Optional.of(SnapshotTriggerType.MANUAL);
+ }
}
var lastTriggerTs = snapshotInfo.getLastPeriodicTriggerTimestamp();
-
// When the resource is first created/periodic snapshotting enabled we
have to compare
// against the creation timestamp for triggering the first periodic
savepoint
var lastTrigger =
lastTriggerTs == 0
?
Instant.parse(resource.getMetadata().getCreationTimestamp())
: Instant.ofEpochMilli(lastTriggerTs);
+
+ if (shouldTriggerAutomaticSnapshot(snapshotType,
automaticTriggerExpression, lastTrigger)) {
+ if (snapshotType == CHECKPOINT &&
!isSnapshotTriggeringSupported(conf)) {
+ LOG.warn(
+ "Automatic checkpoints triggering is configured but is
not supported (requires Flink 1.17+)");
+ return Optional.empty();
+ } else {
+ return Optional.of(SnapshotTriggerType.PERIODIC);
+ }
+ }
+ return Optional.empty();
+ }
+
+ @VisibleForTesting
+ static boolean shouldTriggerAutomaticSnapshot(
+ SnapshotType snapshotType, String automaticTriggerExpression,
Instant lastTrigger) {
+ if (StringUtils.isBlank(automaticTriggerExpression)) {
+ return false;
+ } // automaticTriggerExpression was configured by the user
+
+ Optional<Duration> interval =
interpretAsInterval(automaticTriggerExpression);
+ Optional<CronExpression> cron =
interpretAsCron(automaticTriggerExpression);
+
+ // This should never happen. The string cannot be both a valid
Duration and a cron
+ // expression at the same time.
+ if (interval.isPresent() && cron.isPresent()) {
+ LOG.error(
+ "Something went wrong with the automatic {} trigger
expression {}. This setting cannot be simultaneously a valid Duration and a
cron expression.",
+ snapshotType,
+ automaticTriggerExpression);
+ return false;
+ }
+
+ if (interval.isPresent()) {
+ return shouldTriggerIntervalBasedSnapshot(snapshotType,
interval.get(), lastTrigger);
+ } else if (cron.isPresent()) {
+ return shouldTriggerCronBasedSnapshot(
+ snapshotType, cron.get(), lastTrigger, Instant.now());
+ } else {
+ LOG.warn(
+ "Automatic {} triggering is configured, but the trigger
expression '{}' is neither a valid Duration, nor a cron expression.",
+ snapshotType,
+ automaticTriggerExpression);
+ return false;
+ }
+ }
+
+ @VisibleForTesting
+ static boolean shouldTriggerCronBasedSnapshot(
+ SnapshotType snapshotType,
+ CronExpression cronExpression,
+ Instant lastTriggerDateInstant,
+ Instant nowInstant) {
+ Date now = Date.from(nowInstant);
+ Date lastTrigger = Date.from(lastTriggerDateInstant);
+
+ Date nextValidTimeAfterLastTrigger =
cronExpression.getNextValidTimeAfter(lastTrigger);
+
+ if (nextValidTimeAfterLastTrigger != null &&
nextValidTimeAfterLastTrigger.before(now)) {
+ LOG.info(
+ "Triggering new automatic {} based on cron schedule '{}'
due at {}",
+ snapshotType.toString().toLowerCase(),
+ cronExpression.toString(),
+ nextValidTimeAfterLastTrigger);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @VisibleForTesting
+ static boolean shouldTriggerIntervalBasedSnapshot(
+ SnapshotType snapshotType, Duration interval, Instant lastTrigger)
{
+ if (interval.isZero()) {
+ return false;
+ }
var now = Instant.now();
if (lastTrigger.plus(interval).isBefore(Instant.now())) {
LOG.info(
- "Triggering new periodic {} after {}",
+ "Triggering new automatic {} after {}",
snapshotType.toString().toLowerCase(),
Duration.between(lastTrigger, now));
- return Optional.of(SnapshotTriggerType.PERIODIC);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @VisibleForTesting
+ static Optional<Duration> interpretAsInterval(String triggerExpression) {
+ try {
+ return
Optional.of(ConfigurationUtils.convertValue(triggerExpression, Duration.class));
+ } catch (Exception exception) {
+ return Optional.empty();
+ }
+ }
+
+ @VisibleForTesting
+ static Optional<CronExpression> interpretAsCron(String triggerExpression) {
+ try {
+ return Optional.of(new CronExpression(triggerExpression));
+ } catch (ParseException e) {
+ return Optional.empty();
}
- return Optional.empty();
}
- public static boolean isCheckpointsTriggeringSupported(Configuration conf)
{
+ public static boolean isSnapshotTriggeringSupported(Configuration conf) {
// Flink REST API supports triggering checkpoints externally starting
with 1.17
return conf.get(FLINK_VERSION) != null
&&
conf.get(FLINK_VERSION).isNewerVersionThan(FlinkVersion.v1_16);
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index b29fa980..e228da2f 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -23,10 +23,15 @@ import
org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.api.status.Checkpoint;
import
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.api.status.Savepoint;
+import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.api.utils.BaseTestUtils;
+import
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.health.CanaryResourceManager;
import
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
+import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
@@ -66,6 +71,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.ArrayList;
+import java.util.Calendar;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -323,6 +329,54 @@ public class TestUtils extends BaseTestUtils {
return cr;
}
+ public static void reconcileSpec(FlinkDeployment deployment) {
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .serializeAndSetLastReconciledSpec(deployment.getSpec(),
deployment);
+ }
+
+ /**
+ * Sets up an active cron trigger by ensuring that the latest successful
snapshot happened
+ * earlier than the scheduled trigger.
+ */
+ public static void setupCronTrigger(SnapshotType snapshotType,
FlinkDeployment deployment) {
+
+ Calendar calendar = Calendar.getInstance();
+ calendar.set(2022, Calendar.JUNE, 5, 11, 0);
+ long lastCheckpointTimestamp = calendar.getTimeInMillis();
+
+ String cronOptionKey;
+
+ switch (snapshotType) {
+ case SAVEPOINT:
+ Savepoint lastSavepoint =
+ Savepoint.of("", lastCheckpointTimestamp,
SnapshotTriggerType.PERIODIC);
+ deployment
+ .getStatus()
+ .getJobStatus()
+ .getSavepointInfo()
+ .updateLastSavepoint(lastSavepoint);
+ cronOptionKey =
KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL.key();
+ break;
+ case CHECKPOINT:
+ Checkpoint lastCheckpoint =
+ Checkpoint.of(lastCheckpointTimestamp,
SnapshotTriggerType.PERIODIC);
+ deployment
+ .getStatus()
+ .getJobStatus()
+ .getCheckpointInfo()
+ .updateLastCheckpoint(lastCheckpoint);
+ cronOptionKey =
KubernetesOperatorConfigOptions.PERIODIC_CHECKPOINT_INTERVAL.key();
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported snapshot type:
" + snapshotType);
+ }
+
+ deployment.getSpec().getFlinkConfiguration().put(cronOptionKey, "0 0
12 5 6 ? 2022");
+ reconcileSpec(deployment);
+ }
+
/** Testing ResponseProvider. */
public static class ValidatingResponseProvider<T> implements
ResponseProvider<Object> {
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 29ac7153..e5a1ab35 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -85,6 +85,7 @@ import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
+import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -360,7 +361,8 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
final BiConsumer<JobSpec, Long> setTriggerNonce;
final Function<JobSpec, Long> getTriggerNonce;
final Consumer<FlinkDeployment> updateLastSnapshot;
- final ConfigOption<Duration> periodicSnapshotInterval;
+ final BiConsumer<FlinkDeployment, Long> setLastSnapshotTime;
+ final ConfigOption<String> triggerSnapshotExpression;
final String triggerPrefix;
switch (snapshotType) {
@@ -378,7 +380,17 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
getJobSpec(flinkDeployment).getSavepointTriggerNonce());
savepointInfo.updateLastSavepoint(savepoint);
};
- periodicSnapshotInterval =
+ setLastSnapshotTime =
+ (flinkDeployment, timestamp) -> {
+ Savepoint lastSavepoint =
+ Savepoint.of("", timestamp,
SnapshotTriggerType.PERIODIC);
+ flinkDeployment
+ .getStatus()
+ .getJobStatus()
+ .getSavepointInfo()
+ .updateLastSavepoint(lastSavepoint);
+ };
+ triggerSnapshotExpression =
KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL;
triggerPrefix = "savepoint_";
break;
@@ -397,7 +409,17 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
.getCheckpointTriggerNonce());
checkpointInfo.updateLastCheckpoint(checkpoint);
};
- periodicSnapshotInterval =
+ setLastSnapshotTime =
+ (flinkDeployment, timestamp) -> {
+ Checkpoint lastCheckpoint =
+ Checkpoint.of(timestamp,
SnapshotTriggerType.PERIODIC);
+ flinkDeployment
+ .getStatus()
+ .getJobStatus()
+ .getCheckpointInfo()
+ .updateLastCheckpoint(lastCheckpoint);
+ };
+ triggerSnapshotExpression =
KubernetesOperatorConfigOptions.PERIODIC_CHECKPOINT_INTERVAL;
triggerPrefix = "checkpoint_";
break;
@@ -480,11 +502,34 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
reconciler.reconcile(snDeployment, context);
assertFalse(isSnapshotInProgress.test(getJobStatus(snDeployment)));
- // trigger by periodic settings
-
snDeployment.getSpec().getFlinkConfiguration().put(periodicSnapshotInterval.key(),
"1");
+ // trigger by periodic interval settings
+
snDeployment.getSpec().getFlinkConfiguration().put(triggerSnapshotExpression.key(),
"1");
reconciler.reconcile(snDeployment, context);
assertTrue(isSnapshotInProgress.test(getJobStatus(snDeployment)));
assertEquals(SnapshotStatus.PENDING,
getLastSnapshotStatus(snDeployment, snapshotType));
+
snDeployment.getSpec().getFlinkConfiguration().put(triggerSnapshotExpression.key(),
"0");
+
+ // trigger by cron expression
+ updateLastSnapshot.accept(snDeployment); // Ensures no snapshot is
considered to be running
+ assertFalse(isSnapshotInProgress.test(getJobStatus(snDeployment)));
+ assertNotEquals(SnapshotStatus.PENDING,
getLastSnapshotStatus(snDeployment, snapshotType));
+
+ Calendar calendar = Calendar.getInstance();
+ calendar.set(2022, Calendar.JUNE, 5, 11, 0);
+ setLastSnapshotTime.accept(
+ snDeployment, calendar.getTimeInMillis()); // Required for the
cron to trigger
+
+ snDeployment
+ .getSpec()
+ .getFlinkConfiguration()
+ .put(triggerSnapshotExpression.key(), "0 0 12 5 6 ? 2022");
+ reconciler.reconcile(snDeployment, context);
+ assertTrue(isSnapshotInProgress.test(getJobStatus(snDeployment)));
+ assertEquals(SnapshotStatus.PENDING,
getLastSnapshotStatus(snDeployment, snapshotType));
+ snDeployment
+ .getSpec()
+ .getFlinkConfiguration()
+ .put(triggerSnapshotExpression.key(),
triggerSnapshotExpression.defaultValue());
}
@NotNull
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
index d704c7c3..1c7e6b13 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator.utils;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -25,16 +26,26 @@ import
org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
-import
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
+import org.apache.logging.log4j.core.util.CronExpression;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
+import java.util.Calendar;
import java.util.Optional;
+import static org.apache.flink.kubernetes.operator.TestUtils.reconcileSpec;
+import static org.apache.flink.kubernetes.operator.TestUtils.setupCronTrigger;
+import static
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.PERIODIC_CHECKPOINT_INTERVAL;
+import static
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL;
+import static
org.apache.flink.kubernetes.operator.reconciler.SnapshotType.CHECKPOINT;
+import static
org.apache.flink.kubernetes.operator.reconciler.SnapshotType.SAVEPOINT;
+import static
org.apache.flink.kubernetes.operator.utils.SnapshotUtils.shouldTriggerAutomaticSnapshot;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/** Tests for {@link SnapshotUtils}. */
public class SnapshotUtilsTest {
@@ -43,37 +54,19 @@ public class SnapshotUtilsTest {
@Test
public void testSavepointTriggering() {
- SnapshotType snapshotType = SnapshotType.SAVEPOINT;
FlinkDeployment deployment = initDeployment(FlinkVersion.v1_15);
- reconcileSpec(deployment);
-
- assertEquals(
- Optional.empty(),
- SnapshotUtils.shouldTriggerSnapshot(
- deployment,
configManager.getObserveConfig(deployment), snapshotType));
-
- deployment
- .getSpec()
- .getFlinkConfiguration()
-
.put(KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL.key(), "10m");
- reconcileSpec(deployment);
-
- assertEquals(
- Optional.of(SnapshotTriggerType.PERIODIC),
- SnapshotUtils.shouldTriggerSnapshot(
- deployment,
configManager.getObserveConfig(deployment), snapshotType));
-
deployment.getStatus().getJobStatus().getSavepointInfo().resetTrigger();
+ testSnapshotTriggering(deployment, SAVEPOINT,
PERIODIC_SAVEPOINT_INTERVAL);
+ }
- deployment.getSpec().getJob().setSavepointTriggerNonce(123L);
- assertEquals(
- Optional.of(SnapshotTriggerType.MANUAL),
- SnapshotUtils.shouldTriggerSnapshot(
- deployment,
configManager.getObserveConfig(deployment), snapshotType));
+ @Test
+ public void testCheckpointTriggeringPost1_17() {
+ FlinkDeployment deployment = initDeployment(FlinkVersion.v1_17);
+ testSnapshotTriggering(deployment, CHECKPOINT,
PERIODIC_CHECKPOINT_INTERVAL);
}
@Test
public void testCheckpointTriggeringPre1_17() {
- SnapshotType snapshotType = SnapshotType.CHECKPOINT;
+ SnapshotType snapshotType = CHECKPOINT;
FlinkDeployment deployment = initDeployment(FlinkVersion.v1_16);
reconcileSpec(deployment);
@@ -83,31 +76,35 @@ public class SnapshotUtilsTest {
SnapshotUtils.shouldTriggerSnapshot(
deployment,
configManager.getObserveConfig(deployment), snapshotType));
- deployment
- .getSpec()
- .getFlinkConfiguration()
-
.put(KubernetesOperatorConfigOptions.PERIODIC_CHECKPOINT_INTERVAL.key(), "10m");
+
deployment.getSpec().getFlinkConfiguration().put(PERIODIC_CHECKPOINT_INTERVAL.key(),
"10m");
reconcileSpec(deployment);
assertEquals(
Optional.empty(),
SnapshotUtils.shouldTriggerSnapshot(
deployment,
configManager.getObserveConfig(deployment), snapshotType));
-
deployment.getStatus().getJobStatus().getCheckpointInfo().resetTrigger();
+ resetTrigger(deployment, snapshotType);
- deployment.getSpec().getJob().setCheckpointTriggerNonce(123L);
+ setTriggerNonce(deployment, snapshotType, 123L);
assertEquals(
Optional.empty(),
SnapshotUtils.shouldTriggerSnapshot(
deployment,
configManager.getObserveConfig(deployment), snapshotType));
+ resetTrigger(deployment, snapshotType);
+
+ setupCronTrigger(snapshotType, deployment);
+ assertEquals(
+ Optional.empty(),
+ SnapshotUtils.shouldTriggerSnapshot(
+ deployment,
configManager.getObserveConfig(deployment), snapshotType));
+ resetTrigger(deployment, snapshotType);
}
- @Test
- public void testCheckpointTriggeringPost1_17() {
- SnapshotType snapshotType = SnapshotType.CHECKPOINT;
- FlinkDeployment deployment = initDeployment(FlinkVersion.v1_17);
+ private void testSnapshotTriggering(
+ FlinkDeployment deployment,
+ SnapshotType snapshotType,
+ ConfigOption<String> periodicSnapshotIntervalOption) {
reconcileSpec(deployment);
-
assertEquals(
Optional.empty(),
SnapshotUtils.shouldTriggerSnapshot(
@@ -116,20 +113,195 @@ public class SnapshotUtilsTest {
deployment
.getSpec()
.getFlinkConfiguration()
-
.put(KubernetesOperatorConfigOptions.PERIODIC_CHECKPOINT_INTERVAL.key(), "10m");
+ .put(periodicSnapshotIntervalOption.key(), "10m");
reconcileSpec(deployment);
assertEquals(
Optional.of(SnapshotTriggerType.PERIODIC),
SnapshotUtils.shouldTriggerSnapshot(
deployment,
configManager.getObserveConfig(deployment), snapshotType));
-
deployment.getStatus().getJobStatus().getCheckpointInfo().resetTrigger();
+ resetTrigger(deployment, snapshotType);
+
deployment.getSpec().getFlinkConfiguration().put(periodicSnapshotIntervalOption.key(),
"0");
+ reconcileSpec(deployment);
- deployment.getSpec().getJob().setCheckpointTriggerNonce(123L);
+ setTriggerNonce(deployment, snapshotType, 123L);
assertEquals(
Optional.of(SnapshotTriggerType.MANUAL),
SnapshotUtils.shouldTriggerSnapshot(
deployment,
configManager.getObserveConfig(deployment), snapshotType));
+ resetTrigger(deployment, snapshotType);
+ reconcileSpec(deployment);
+
+ setupCronTrigger(snapshotType, deployment);
+ assertEquals(
+ Optional.of(SnapshotTriggerType.PERIODIC),
+ SnapshotUtils.shouldTriggerSnapshot(
+ deployment,
configManager.getObserveConfig(deployment), snapshotType));
+ }
+
+ @Test
+ public void testInterpretAsInterval_InvalidExpression() { // A string that is not a duration must yield an empty Optional.
+ Optional<Duration> interval =
SnapshotUtils.interpretAsInterval("INVALID_DURATION");
+ assertTrue(interval.isEmpty());
+ }
+
+ @Test
+ public void testInterpretAsInterval_EmptyExpression() { // An empty string must yield an empty Optional.
+ Optional<Duration> interval = SnapshotUtils.interpretAsInterval("");
+ assertTrue(interval.isEmpty());
+ }
+
+ @Test
+ public void
testShouldTriggerIntervalBasedSnapshot_ZeroDurationReturnsFalse() { // The interval parsed from "0" must never trigger a snapshot.
+ Duration interval = SnapshotUtils.interpretAsInterval("0").get(); // "0" is expected to parse; get() fails the test otherwise
+
+ assertFalse(
+ SnapshotUtils.shouldTriggerIntervalBasedSnapshot(
+ SnapshotType.CHECKPOINT, interval, Instant.now()));
+ }
+
+ @Test
+ public void
testShouldTriggerIntervalBasedSnapshot_NextValidTimeBeforeCurrent() { // Last trigger 5 minutes ago with a "10M" interval: no trigger expected.
+ Duration interval = SnapshotUtils.interpretAsInterval("10M").get(); // presumably parsed as 10 minutes - TODO confirm upper-case unit handling
+ Instant lastTrigger = Instant.now().minus(Duration.ofMinutes(5)); // NOTE(review): lastTrigger + 10 min lies after "now"; the Before/After wording in this test name looks swapped relative to the cron tests - confirm
+ assertFalse(
+ SnapshotUtils.shouldTriggerIntervalBasedSnapshot(
+ SnapshotType.CHECKPOINT, interval, lastTrigger));
+ }
+
+ @Test
+ public void
testShouldTriggerIntervalBasedSnapshot_NextValidTimeAfterCurrent() { // Last trigger 11 minutes ago with a "10M" interval: a trigger is expected.
+ Duration interval = SnapshotUtils.interpretAsInterval("10M").get(); // presumably parsed as 10 minutes - TODO confirm upper-case unit handling
+ Instant lastTrigger = Instant.now().minus(Duration.ofMinutes(11)); // NOTE(review): lastTrigger + 10 min lies before "now"; the Before/After wording in this test name looks swapped relative to the cron tests - confirm
+ assertTrue(
+ SnapshotUtils.shouldTriggerIntervalBasedSnapshot(
+ SnapshotType.CHECKPOINT, interval, lastTrigger));
+ }
+
+ @Test
+ public void
testShouldTriggerCronBasedSnapshot_NextValidTimeBeforeCurrent() { // A cron fire time between the last trigger and "now" must cause a trigger.
+ String cronExpressionString = "0 */10 * * * ?"; // Every 10th minute
+ CronExpression cronExpression =
SnapshotUtils.interpretAsCron(cronExpressionString).get();
+
+ Calendar calendar = Calendar.getInstance();
+ calendar.set(2022, Calendar.JUNE, 5, 11, 5); // 11:05 in the default time zone; this overload leaves seconds and millis at their current values
+
+ Instant now = calendar.getTime().toInstant();
+ Instant lastTrigger =
+ now.minus(Duration.ofMinutes(10)); // 10:55, should have fired
at 11:00
+
+ boolean result =
+ SnapshotUtils.shouldTriggerCronBasedSnapshot(
+ CHECKPOINT, cronExpression, lastTrigger, now);
+
+ assertTrue(result);
+ }
+
+ @Test
+ public void testShouldTriggerCronBasedSnapshot_NextValidTimeAfterCurrent()
{ // When the next cron fire time after the last trigger is still in the future, no trigger is expected.
+ String cronExpressionString = "0 */10 * * * ?"; // Every 10th minute
+ CronExpression cronExpression =
SnapshotUtils.interpretAsCron(cronExpressionString).get();
+
+ Calendar calendar = Calendar.getInstance();
+ calendar.set(2022, Calendar.JUNE, 5, 11, 5); // default time zone; this overload leaves seconds and millis at their current values
+
+ Instant now = calendar.getTime().toInstant(); // 11:05
+ Instant lastTrigger = now.minus(Duration.ofMinutes(4)); // 11:01, next
trigger at 11:10
+
+ boolean result =
+ SnapshotUtils.shouldTriggerCronBasedSnapshot(
+ CHECKPOINT, cronExpression, lastTrigger, now);
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testShouldTriggerCronBasedSnapshot_NoNextValidTime() { // A cron expression that can never fire must not trigger a snapshot.
+ String cronExpressionString =
+ "0 0 0 29 2 ? 1999"; // An impossible date: Feb 29 in 1999 (1999 was
not a leap year)
+ CronExpression cronExpression =
SnapshotUtils.interpretAsCron(cronExpressionString).get(); // the expression is expected to parse even though it never fires
+
+ Instant now = Instant.now();
+ Instant lastTrigger = now.minus(Duration.ofDays(365));
+
+ boolean result =
+ SnapshotUtils.shouldTriggerCronBasedSnapshot(
+ CHECKPOINT, cronExpression, lastTrigger, now);
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testInterpretAsCron_InvalidCron() { // A string that is not a cron expression must yield an empty Optional.
+ Optional<CronExpression> cronExpression =
SnapshotUtils.interpretAsCron("INVALID_CRON");
+
+ assertTrue(cronExpression.isEmpty());
+ }
+
+ @Test
+ public void testInterpretAsCron_EmptyCron() { // An empty string must yield an empty Optional.
+ Optional<CronExpression> cronExpression =
SnapshotUtils.interpretAsCron("");
+
+ assertTrue(cronExpression.isEmpty());
+ }
+
+ @Test
+ public void shouldTriggerAutomaticSnapshot_EmptyExpression() { // An empty trigger expression must not trigger, even with a last trigger 365 days ago.
+ boolean shouldTrigger =
+ shouldTriggerAutomaticSnapshot( // unqualified call; presumably a static import from SnapshotUtils - confirm
+ CHECKPOINT, "",
Instant.now().minus(Duration.ofDays(365)));
+ assertFalse(shouldTrigger);
+ }
+
+ @Test
+ public void shouldTriggerAutomaticSnapshot_InvalidExpression() { // The expression "-1" must not trigger, even with a last trigger 365 days ago.
+ boolean shouldTrigger =
+ shouldTriggerAutomaticSnapshot( // unqualified call; presumably a static import from SnapshotUtils - confirm
+ CHECKPOINT, "-1",
Instant.now().minus(Duration.ofDays(365)));
+ assertFalse(shouldTrigger);
+ }
+
+ @Test
+ public void shouldTriggerAutomaticSnapshot_ValidIntervalExpression() { // The interval expression "10m" with a last trigger 365 days ago must trigger.
+ boolean shouldTrigger =
+ shouldTriggerAutomaticSnapshot( // unqualified call; presumably a static import from SnapshotUtils - confirm
+ CHECKPOINT, "10m",
Instant.now().minus(Duration.ofDays(365)));
+ assertTrue(shouldTrigger);
+ }
+
+ @Test
+ public void shouldTriggerAutomaticSnapshot_ValidCronExpression() { // An every-10-minutes cron expression with a last trigger 365 days ago must trigger.
+ boolean shouldTrigger =
+ shouldTriggerAutomaticSnapshot( // unqualified call; presumably a static import from SnapshotUtils - confirm
+ CHECKPOINT, "0 */10 * * * ?",
Instant.now().minus(Duration.ofDays(365)));
+ assertTrue(shouldTrigger);
+ }
+
+ private static void resetTrigger(FlinkDeployment deployment, SnapshotType
snapshotType) { // Test helper: calls resetTrigger() on the job status' savepoint or checkpoint info, selected by snapshotType.
+ switch (snapshotType) {
+ case SAVEPOINT:
+
deployment.getStatus().getJobStatus().getSavepointInfo().resetTrigger();
+ break;
+ case CHECKPOINT:
+
deployment.getStatus().getJobStatus().getCheckpointInfo().resetTrigger();
+ break;
+ default: // any other snapshot type is rejected with an IllegalArgumentException
+ throw new IllegalArgumentException("Unsupported snapshot type:
" + snapshotType);
+ }
+ }
+
+ private static void setTriggerNonce(
+ FlinkDeployment deployment, SnapshotType snapshotType, long nonce)
{ // Test helper: writes nonce to the job spec's savepoint or checkpoint trigger nonce, selected by snapshotType.
+ switch (snapshotType) {
+ case SAVEPOINT:
+ deployment.getSpec().getJob().setSavepointTriggerNonce(nonce);
+ break;
+ case CHECKPOINT:
+ deployment.getSpec().getJob().setCheckpointTriggerNonce(nonce);
+ break;
+ default: // any other snapshot type is rejected with an IllegalArgumentException
+ throw new IllegalArgumentException("Unsupported snapshot type:
" + snapshotType);
+ }
}
private static FlinkDeployment initDeployment(FlinkVersion flinkVersion) {
@@ -140,13 +312,7 @@ public class SnapshotUtilsTest {
deployment.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+ reconcileSpec(deployment);
return deployment;
}
-
- private static void reconcileSpec(FlinkDeployment deployment) {
- deployment
- .getStatus()
- .getReconciliationStatus()
- .serializeAndSetLastReconciledSpec(deployment.getSpec(),
deployment);
- }
}