This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new b3979730 [FLINK-32904] Support cron expressions for periodic snapshots triggering (#656)
b3979730 is described below

commit b39797300875b241598042834d96a20e7809adfc
Author: Alexander Fedulov <[email protected]>
AuthorDate: Tue Aug 29 19:04:06 2023 +0200

    [FLINK-32904] Support cron expressions for periodic snapshots triggering (#656)
---
 .../shortcodes/generated/dynamic_section.html      |  12 +-
 .../kubernetes_operator_config_configuration.html  |  12 +-
 .../config/KubernetesOperatorConfigOptions.java    |  30 ++-
 .../operator/observer/SnapshotObserver.java        |   4 +-
 .../kubernetes/operator/utils/SnapshotUtils.java   | 135 +++++++++--
 .../flink/kubernetes/operator/TestUtils.java       |  54 +++++
 .../deployment/ApplicationReconcilerTest.java      |  55 ++++-
 .../operator/utils/SnapshotUtilsTest.java          | 260 +++++++++++++++++----
 8 files changed, 466 insertions(+), 96 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/dynamic_section.html b/docs/layouts/shortcodes/generated/dynamic_section.html
index e53052d8..21a03bcf 100644
--- a/docs/layouts/shortcodes/generated/dynamic_section.html
+++ b/docs/layouts/shortcodes/generated/dynamic_section.html
@@ -124,15 +124,15 @@
         </tr>
         <tr>
             <td><h5>kubernetes.operator.periodic.checkpoint.interval</h5></td>
-            <td style="word-wrap: break-word;">0 ms</td>
-            <td>Duration</td>
-            <td>Interval at which periodic checkpoints will be triggered. The 
triggering schedule is not guaranteed, checkpoints will be triggered as part of 
the regular reconcile loop. NOTE: checkpoints are generally managed by Flink. 
This setting isn't meant to replace Flink's checkpoint settings, but to 
complement them in special cases. For instance, a full checkpoint might need to 
be occasionally triggered to break the chain of incremental checkpoints and 
consolidate the partial incr [...]
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Option to enable automatic checkpoint triggering. Can be 
specified either as a Duration type (i.e. '10m') or as a cron expression in 
Quartz format (6 or 7 positions, see 
http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html).The
 triggering schedule is not guaranteed, checkpoints will be triggered as part 
of the regular reconcile loop. NOTE: checkpoints are generally managed by 
Flink. This setting isn't meant to replace Flink's checkpoint se [...]
         </tr>
         <tr>
             <td><h5>kubernetes.operator.periodic.savepoint.interval</h5></td>
-            <td style="word-wrap: break-word;">0 ms</td>
-            <td>Duration</td>
-            <td>Interval at which periodic savepoints will be triggered. The 
triggering schedule is not guaranteed, savepoints will be triggered as part of 
the regular reconcile loop.</td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Option to enable automatic savepoint triggering. Can be 
specified either as a Duration type (i.e. '10m') or as a cron expression in 
Quartz format (6 or 7 positions, see 
http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html).The
 triggering schedule is not guaranteed, savepoints will be triggered as part of 
the regular reconcile loop. WARNING: not intended to be used together with the 
cron-based periodic savepoint triggering</td>
         </tr>
         <tr>
             
<td><h5>kubernetes.operator.pod-template.merge-arrays-by-name</h5></td>
diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 45ebb09e..2f3341e5 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -256,15 +256,15 @@
         </tr>
         <tr>
             <td><h5>kubernetes.operator.periodic.checkpoint.interval</h5></td>
-            <td style="word-wrap: break-word;">0 ms</td>
-            <td>Duration</td>
-            <td>Interval at which periodic checkpoints will be triggered. The 
triggering schedule is not guaranteed, checkpoints will be triggered as part of 
the regular reconcile loop. NOTE: checkpoints are generally managed by Flink. 
This setting isn't meant to replace Flink's checkpoint settings, but to 
complement them in special cases. For instance, a full checkpoint might need to 
be occasionally triggered to break the chain of incremental checkpoints and 
consolidate the partial incr [...]
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Option to enable automatic checkpoint triggering. Can be 
specified either as a Duration type (i.e. '10m') or as a cron expression in 
Quartz format (6 or 7 positions, see 
http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html).The
 triggering schedule is not guaranteed, checkpoints will be triggered as part 
of the regular reconcile loop. NOTE: checkpoints are generally managed by 
Flink. This setting isn't meant to replace Flink's checkpoint se [...]
         </tr>
         <tr>
             <td><h5>kubernetes.operator.periodic.savepoint.interval</h5></td>
-            <td style="word-wrap: break-word;">0 ms</td>
-            <td>Duration</td>
-            <td>Interval at which periodic savepoints will be triggered. The 
triggering schedule is not guaranteed, savepoints will be triggered as part of 
the regular reconcile loop.</td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Option to enable automatic savepoint triggering. Can be 
specified either as a Duration type (i.e. '10m') or as a cron expression in 
Quartz format (6 or 7 positions, see 
http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html).The
 triggering schedule is not guaranteed, savepoints will be triggered as part of 
the regular reconcile loop. WARNING: not intended to be used together with the 
cron-based periodic savepoint triggering</td>
         </tr>
         <tr>
             
<td><h5>kubernetes.operator.pod-template.merge-arrays-by-name</h5></td>
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index 8226ce0d..e7c9112a 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -298,22 +298,30 @@ public class KubernetesOperatorConfigOptions {
                                     + "Expected format: 
headerKey1:headerValue1,headerKey2:headerValue2.");
 
     @Documentation.Section(SECTION_DYNAMIC)
-    public static final ConfigOption<Duration> PERIODIC_SAVEPOINT_INTERVAL =
+    public static final ConfigOption<String> PERIODIC_SAVEPOINT_INTERVAL =
             operatorConfig("periodic.savepoint.interval")
-                    .durationType()
-                    .defaultValue(Duration.ZERO)
+                    .stringType()
+                    .defaultValue("")
                     .withDescription(
-                            "Interval at which periodic savepoints will be 
triggered. "
+                            "Option to enable automatic savepoint triggering. 
Can be specified "
+                                    + "either as a Duration type (i.e. '10m') 
or as a cron expression "
+                                    + "in Quartz format (6 or 7 positions, see 
"
+                                    + 
"http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html)."
                                     + "The triggering schedule is not 
guaranteed, savepoints will be "
-                                    + "triggered as part of the regular 
reconcile loop.");
+                                    + "triggered as part of the regular 
reconcile loop. "
+                                    + "WARNING: not intended to be used 
together with the cron-based "
+                                    + "periodic savepoint triggering");
 
     @Documentation.Section(SECTION_DYNAMIC)
-    public static final ConfigOption<Duration> PERIODIC_CHECKPOINT_INTERVAL =
+    public static final ConfigOption<String> PERIODIC_CHECKPOINT_INTERVAL =
             operatorConfig("periodic.checkpoint.interval")
-                    .durationType()
-                    .defaultValue(Duration.ZERO)
+                    .stringType()
+                    .defaultValue("")
                     .withDescription(
-                            "Interval at which periodic checkpoints will be 
triggered. "
+                            "Option to enable automatic checkpoint triggering. 
Can be specified "
+                                    + "either as a Duration type (i.e. '10m') 
or as a cron expression "
+                                    + "in Quartz format (6 or 7 positions, see 
"
+                                    + 
"http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html)."
                                     + "The triggering schedule is not 
guaranteed, checkpoints will "
                                     + "be triggered as part of the regular 
reconcile loop. "
                                     + "NOTE: checkpoints are generally managed 
by Flink. This "
@@ -321,7 +329,9 @@ public class KubernetesOperatorConfigOptions {
                                     + "but to complement them in special 
cases. For instance, a "
                                     + "full checkpoint might need to be 
occasionally triggered to "
                                     + "break the chain of incremental 
checkpoints and consolidate "
-                                    + "the partial incremental files.");
+                                    + "the partial incremental files. "
+                                    + "WARNING: not intended to be used 
together with the cron-based "
+                                    + "periodic checkpoint triggering");
 
     @Documentation.Section(SECTION_SYSTEM)
     public static final ConfigOption<String> OPERATOR_WATCHED_NAMESPACES =
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java
index 54bcdfb6..3d3a68a2 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java
@@ -45,7 +45,7 @@ import java.util.List;
 
 import static 
org.apache.flink.kubernetes.operator.reconciler.SnapshotType.CHECKPOINT;
 import static 
org.apache.flink.kubernetes.operator.reconciler.SnapshotType.SAVEPOINT;
-import static 
org.apache.flink.kubernetes.operator.utils.SnapshotUtils.isCheckpointsTriggeringSupported;
+import static 
org.apache.flink.kubernetes.operator.utils.SnapshotUtils.isSnapshotTriggeringSupported;
 
 /** An observer of savepoint progress. */
 public class SnapshotObserver<
@@ -81,7 +81,7 @@ public class SnapshotObserver<
     }
 
     public void observeCheckpointStatus(FlinkResourceContext<CR> ctx) {
-        if (!isCheckpointsTriggeringSupported(ctx.getObserveConfig())) {
+        if (!isSnapshotTriggeringSupported(ctx.getObserveConfig())) {
             return;
         }
         var resource = ctx.getResource();
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
index 8b381476..4cc5ee2f 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.utils;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
@@ -31,11 +32,14 @@ import 
org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.core.util.CronExpression;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.text.ParseException;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.Date;
 import java.util.Objects;
 import java.util.Optional;
 
@@ -194,7 +198,7 @@ public class SnapshotUtils {
         Long reconciledTriggerNonce;
         boolean inProgress;
         SnapshotInfo snapshotInfo;
-        Duration interval;
+        String automaticTriggerExpression;
 
         switch (snapshotType) {
             case SAVEPOINT:
@@ -202,21 +206,16 @@ public class SnapshotUtils {
                 reconciledTriggerNonce = 
reconciledJobSpec.getSavepointTriggerNonce();
                 inProgress = savepointInProgress(jobStatus);
                 snapshotInfo = jobStatus.getSavepointInfo();
-                interval = 
conf.get(KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL);
+                automaticTriggerExpression =
+                        
conf.get(KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL);
                 break;
             case CHECKPOINT:
                 triggerNonce = jobSpec.getCheckpointTriggerNonce();
                 reconciledTriggerNonce = 
reconciledJobSpec.getCheckpointTriggerNonce();
                 inProgress = checkpointInProgress(jobStatus);
                 snapshotInfo = jobStatus.getCheckpointInfo();
-                interval = 
conf.get(KubernetesOperatorConfigOptions.PERIODIC_CHECKPOINT_INTERVAL);
-                if (!isCheckpointsTriggeringSupported(conf)) {
-                    LOG.warn(
-                            "Periodic checkpoints triggering every {} is 
configured, "
-                                    + "but not supported (requires Flink 
1.17+)",
-                            interval);
-                    return Optional.empty();
-                }
+                automaticTriggerExpression =
+                        
conf.get(KubernetesOperatorConfigOptions.PERIODIC_CHECKPOINT_INTERVAL);
                 break;
             default:
                 throw new IllegalArgumentException("Unsupported snapshot type: 
" + snapshotType);
@@ -229,33 +228,129 @@ public class SnapshotUtils {
         var triggerNonceChanged =
                 triggerNonce != null && 
!triggerNonce.equals(reconciledTriggerNonce);
         if (triggerNonceChanged) {
-            return Optional.of(SnapshotTriggerType.MANUAL);
-        }
-
-        if (interval.isZero()) {
-            return Optional.empty();
+            if (snapshotType == CHECKPOINT && 
!isSnapshotTriggeringSupported(conf)) {
+                LOG.warn(
+                        "Manual checkpoint triggering is attempted, but is not 
supported (requires Flink 1.17+)");
+                return Optional.empty();
+            } else {
+                return Optional.of(SnapshotTriggerType.MANUAL);
+            }
         }
 
         var lastTriggerTs = snapshotInfo.getLastPeriodicTriggerTimestamp();
-
         // When the resource is first created/periodic snapshotting enabled we 
have to compare
         // against the creation timestamp for triggering the first periodic 
savepoint
         var lastTrigger =
                 lastTriggerTs == 0
                         ? 
Instant.parse(resource.getMetadata().getCreationTimestamp())
                         : Instant.ofEpochMilli(lastTriggerTs);
+
+        if (shouldTriggerAutomaticSnapshot(snapshotType, 
automaticTriggerExpression, lastTrigger)) {
+            if (snapshotType == CHECKPOINT && 
!isSnapshotTriggeringSupported(conf)) {
+                LOG.warn(
+                        "Automatic checkpoints triggering is configured but is 
not supported (requires Flink 1.17+)");
+                return Optional.empty();
+            } else {
+                return Optional.of(SnapshotTriggerType.PERIODIC);
+            }
+        }
+        return Optional.empty();
+    }
+
+    @VisibleForTesting
+    static boolean shouldTriggerAutomaticSnapshot(
+            SnapshotType snapshotType, String automaticTriggerExpression, 
Instant lastTrigger) {
+        if (StringUtils.isBlank(automaticTriggerExpression)) {
+            return false;
+        } // automaticTriggerExpression was configured by the user
+
+        Optional<Duration> interval = 
interpretAsInterval(automaticTriggerExpression);
+        Optional<CronExpression> cron = 
interpretAsCron(automaticTriggerExpression);
+
+        // This should never happen. The string cannot be both a valid 
Duration and a cron
+        // expression at the same time.
+        if (interval.isPresent() && cron.isPresent()) {
+            LOG.error(
+                    "Something went wrong with the automatic {} trigger 
expression {}. This setting cannot be simultaneously a valid Duration and a 
cron expression.",
+                    snapshotType,
+                    automaticTriggerExpression);
+            return false;
+        }
+
+        if (interval.isPresent()) {
+            return shouldTriggerIntervalBasedSnapshot(snapshotType, 
interval.get(), lastTrigger);
+        } else if (cron.isPresent()) {
+            return shouldTriggerCronBasedSnapshot(
+                    snapshotType, cron.get(), lastTrigger, Instant.now());
+        } else {
+            LOG.warn(
+                    "Automatic {} triggering is configured, but the trigger 
expression '{}' is neither a valid Duration, nor a cron expression.",
+                    snapshotType,
+                    automaticTriggerExpression);
+            return false;
+        }
+    }
+
+    @VisibleForTesting
+    static boolean shouldTriggerCronBasedSnapshot(
+            SnapshotType snapshotType,
+            CronExpression cronExpression,
+            Instant lastTriggerDateInstant,
+            Instant nowInstant) {
+        Date now = Date.from(nowInstant);
+        Date lastTrigger = Date.from(lastTriggerDateInstant);
+
+        Date nextValidTimeAfterLastTrigger = 
cronExpression.getNextValidTimeAfter(lastTrigger);
+
+        if (nextValidTimeAfterLastTrigger != null && 
nextValidTimeAfterLastTrigger.before(now)) {
+            LOG.info(
+                    "Triggering new automatic {} based on cron schedule '{}' 
due at {}",
+                    snapshotType.toString().toLowerCase(),
+                    cronExpression.toString(),
+                    nextValidTimeAfterLastTrigger);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @VisibleForTesting
+    static boolean shouldTriggerIntervalBasedSnapshot(
+            SnapshotType snapshotType, Duration interval, Instant lastTrigger) 
{
+        if (interval.isZero()) {
+            return false;
+        }
         var now = Instant.now();
         if (lastTrigger.plus(interval).isBefore(Instant.now())) {
             LOG.info(
-                    "Triggering new periodic {} after {}",
+                    "Triggering new automatic {} after {}",
                     snapshotType.toString().toLowerCase(),
                     Duration.between(lastTrigger, now));
-            return Optional.of(SnapshotTriggerType.PERIODIC);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @VisibleForTesting
+    static Optional<Duration> interpretAsInterval(String triggerExpression) {
+        try {
+            return 
Optional.of(ConfigurationUtils.convertValue(triggerExpression, Duration.class));
+        } catch (Exception exception) {
+            return Optional.empty();
+        }
+    }
+
+    @VisibleForTesting
+    static Optional<CronExpression> interpretAsCron(String triggerExpression) {
+        try {
+            return Optional.of(new CronExpression(triggerExpression));
+        } catch (ParseException e) {
+            return Optional.empty();
         }
-        return Optional.empty();
     }
 
-    public static boolean isCheckpointsTriggeringSupported(Configuration conf) 
{
+    public static boolean isSnapshotTriggeringSupported(Configuration conf) {
         // Flink REST API supports triggering checkpoints externally starting 
with 1.17
         return conf.get(FLINK_VERSION) != null
                 && 
conf.get(FLINK_VERSION).isNewerVersionThan(FlinkVersion.v1_16);
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index b29fa980..e228da2f 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -23,10 +23,15 @@ import 
org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.api.status.Checkpoint;
 import 
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.api.status.Savepoint;
+import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
 import org.apache.flink.kubernetes.operator.api.utils.BaseTestUtils;
+import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.health.CanaryResourceManager;
 import 
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
+import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
 
@@ -66,6 +71,7 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Calendar;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -323,6 +329,54 @@ public class TestUtils extends BaseTestUtils {
         return cr;
     }
 
+    public static void reconcileSpec(FlinkDeployment deployment) {
+        deployment
+                .getStatus()
+                .getReconciliationStatus()
+                .serializeAndSetLastReconciledSpec(deployment.getSpec(), 
deployment);
+    }
+
+    /**
+     * Sets up an active cron trigger by ensuring that the latest successful 
snapshot happened
+     * earlier than the scheduled trigger.
+     */
+    public static void setupCronTrigger(SnapshotType snapshotType, 
FlinkDeployment deployment) {
+
+        Calendar calendar = Calendar.getInstance();
+        calendar.set(2022, Calendar.JUNE, 5, 11, 0);
+        long lastCheckpointTimestamp = calendar.getTimeInMillis();
+
+        String cronOptionKey;
+
+        switch (snapshotType) {
+            case SAVEPOINT:
+                Savepoint lastSavepoint =
+                        Savepoint.of("", lastCheckpointTimestamp, 
SnapshotTriggerType.PERIODIC);
+                deployment
+                        .getStatus()
+                        .getJobStatus()
+                        .getSavepointInfo()
+                        .updateLastSavepoint(lastSavepoint);
+                cronOptionKey = 
KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL.key();
+                break;
+            case CHECKPOINT:
+                Checkpoint lastCheckpoint =
+                        Checkpoint.of(lastCheckpointTimestamp, 
SnapshotTriggerType.PERIODIC);
+                deployment
+                        .getStatus()
+                        .getJobStatus()
+                        .getCheckpointInfo()
+                        .updateLastCheckpoint(lastCheckpoint);
+                cronOptionKey = 
KubernetesOperatorConfigOptions.PERIODIC_CHECKPOINT_INTERVAL.key();
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported snapshot type: 
" + snapshotType);
+        }
+
+        deployment.getSpec().getFlinkConfiguration().put(cronOptionKey, "0 0 
12 5 6 ? 2022");
+        reconcileSpec(deployment);
+    }
+
     /** Testing ResponseProvider. */
     public static class ValidatingResponseProvider<T> implements 
ResponseProvider<Object> {
 
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 29ac7153..e5a1ab35 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -85,6 +85,7 @@ import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.ZoneId;
+import java.util.Calendar;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -360,7 +361,8 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
         final BiConsumer<JobSpec, Long> setTriggerNonce;
         final Function<JobSpec, Long> getTriggerNonce;
         final Consumer<FlinkDeployment> updateLastSnapshot;
-        final ConfigOption<Duration> periodicSnapshotInterval;
+        final BiConsumer<FlinkDeployment, Long> setLastSnapshotTime;
+        final ConfigOption<String> triggerSnapshotExpression;
         final String triggerPrefix;
 
         switch (snapshotType) {
@@ -378,7 +380,17 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
                                             
getJobSpec(flinkDeployment).getSavepointTriggerNonce());
                             savepointInfo.updateLastSavepoint(savepoint);
                         };
-                periodicSnapshotInterval =
+                setLastSnapshotTime =
+                        (flinkDeployment, timestamp) -> {
+                            Savepoint lastSavepoint =
+                                    Savepoint.of("", timestamp, 
SnapshotTriggerType.PERIODIC);
+                            flinkDeployment
+                                    .getStatus()
+                                    .getJobStatus()
+                                    .getSavepointInfo()
+                                    .updateLastSavepoint(lastSavepoint);
+                        };
+                triggerSnapshotExpression =
                         
KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL;
                 triggerPrefix = "savepoint_";
                 break;
@@ -397,7 +409,17 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
                                                     
.getCheckpointTriggerNonce());
                             checkpointInfo.updateLastCheckpoint(checkpoint);
                         };
-                periodicSnapshotInterval =
+                setLastSnapshotTime =
+                        (flinkDeployment, timestamp) -> {
+                            Checkpoint lastCheckpoint =
+                                    Checkpoint.of(timestamp, 
SnapshotTriggerType.PERIODIC);
+                            flinkDeployment
+                                    .getStatus()
+                                    .getJobStatus()
+                                    .getCheckpointInfo()
+                                    .updateLastCheckpoint(lastCheckpoint);
+                        };
+                triggerSnapshotExpression =
                         
KubernetesOperatorConfigOptions.PERIODIC_CHECKPOINT_INTERVAL;
                 triggerPrefix = "checkpoint_";
                 break;
@@ -480,11 +502,34 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
         reconciler.reconcile(snDeployment, context);
         assertFalse(isSnapshotInProgress.test(getJobStatus(snDeployment)));
 
-        // trigger by periodic settings
-        
snDeployment.getSpec().getFlinkConfiguration().put(periodicSnapshotInterval.key(),
 "1");
+        // trigger by periodic interval settings
+        
snDeployment.getSpec().getFlinkConfiguration().put(triggerSnapshotExpression.key(),
 "1");
         reconciler.reconcile(snDeployment, context);
         assertTrue(isSnapshotInProgress.test(getJobStatus(snDeployment)));
         assertEquals(SnapshotStatus.PENDING, 
getLastSnapshotStatus(snDeployment, snapshotType));
+        
snDeployment.getSpec().getFlinkConfiguration().put(triggerSnapshotExpression.key(),
 "0");
+
+        // trigger by cron expression
+        updateLastSnapshot.accept(snDeployment); // Ensures no snapshot is 
considered to be running
+        assertFalse(isSnapshotInProgress.test(getJobStatus(snDeployment)));
+        assertNotEquals(SnapshotStatus.PENDING, 
getLastSnapshotStatus(snDeployment, snapshotType));
+
+        Calendar calendar = Calendar.getInstance();
+        calendar.set(2022, Calendar.JUNE, 5, 11, 0);
+        setLastSnapshotTime.accept(
+                snDeployment, calendar.getTimeInMillis()); // Required for the 
cron to trigger
+
+        snDeployment
+                .getSpec()
+                .getFlinkConfiguration()
+                .put(triggerSnapshotExpression.key(), "0 0 12 5 6 ? 2022");
+        reconciler.reconcile(snDeployment, context);
+        assertTrue(isSnapshotInProgress.test(getJobStatus(snDeployment)));
+        assertEquals(SnapshotStatus.PENDING, 
getLastSnapshotStatus(snDeployment, snapshotType));
+        snDeployment
+                .getSpec()
+                .getFlinkConfiguration()
+                .put(triggerSnapshotExpression.key(), 
triggerSnapshotExpression.defaultValue());
     }
 
     @NotNull
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
index d704c7c3..1c7e6b13 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.utils;
 
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -25,16 +26,26 @@ import 
org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import 
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
-import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
 
+import org.apache.logging.log4j.core.util.CronExpression;
 import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
 import java.time.Instant;
+import java.util.Calendar;
 import java.util.Optional;
 
+import static org.apache.flink.kubernetes.operator.TestUtils.reconcileSpec;
+import static org.apache.flink.kubernetes.operator.TestUtils.setupCronTrigger;
+import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.PERIODIC_CHECKPOINT_INTERVAL;
+import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL;
+import static 
org.apache.flink.kubernetes.operator.reconciler.SnapshotType.CHECKPOINT;
+import static 
org.apache.flink.kubernetes.operator.reconciler.SnapshotType.SAVEPOINT;
+import static 
org.apache.flink.kubernetes.operator.utils.SnapshotUtils.shouldTriggerAutomaticSnapshot;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Tests for {@link SnapshotUtils}. */
 public class SnapshotUtilsTest {
@@ -43,37 +54,19 @@ public class SnapshotUtilsTest {
 
     @Test
     public void testSavepointTriggering() {
-        SnapshotType snapshotType = SnapshotType.SAVEPOINT;
         FlinkDeployment deployment = initDeployment(FlinkVersion.v1_15);
-        reconcileSpec(deployment);
-
-        assertEquals(
-                Optional.empty(),
-                SnapshotUtils.shouldTriggerSnapshot(
-                        deployment, 
configManager.getObserveConfig(deployment), snapshotType));
-
-        deployment
-                .getSpec()
-                .getFlinkConfiguration()
-                
.put(KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL.key(), "10m");
-        reconcileSpec(deployment);
-
-        assertEquals(
-                Optional.of(SnapshotTriggerType.PERIODIC),
-                SnapshotUtils.shouldTriggerSnapshot(
-                        deployment, 
configManager.getObserveConfig(deployment), snapshotType));
-        
deployment.getStatus().getJobStatus().getSavepointInfo().resetTrigger();
+        testSnapshotTriggering(deployment, SAVEPOINT, 
PERIODIC_SAVEPOINT_INTERVAL);
+    }
 
-        deployment.getSpec().getJob().setSavepointTriggerNonce(123L);
-        assertEquals(
-                Optional.of(SnapshotTriggerType.MANUAL),
-                SnapshotUtils.shouldTriggerSnapshot(
-                        deployment, 
configManager.getObserveConfig(deployment), snapshotType));
+    @Test
+    public void testCheckpointTriggeringPost1_17() {
+        FlinkDeployment deployment = initDeployment(FlinkVersion.v1_17);
+        testSnapshotTriggering(deployment, CHECKPOINT, 
PERIODIC_CHECKPOINT_INTERVAL);
     }
 
     @Test
     public void testCheckpointTriggeringPre1_17() {
-        SnapshotType snapshotType = SnapshotType.CHECKPOINT;
+        SnapshotType snapshotType = CHECKPOINT;
         FlinkDeployment deployment = initDeployment(FlinkVersion.v1_16);
         reconcileSpec(deployment);
 
@@ -83,31 +76,35 @@ public class SnapshotUtilsTest {
                 SnapshotUtils.shouldTriggerSnapshot(
                         deployment, 
configManager.getObserveConfig(deployment), snapshotType));
 
-        deployment
-                .getSpec()
-                .getFlinkConfiguration()
-                
.put(KubernetesOperatorConfigOptions.PERIODIC_CHECKPOINT_INTERVAL.key(), "10m");
+        
deployment.getSpec().getFlinkConfiguration().put(PERIODIC_CHECKPOINT_INTERVAL.key(),
 "10m");
         reconcileSpec(deployment);
 
         assertEquals(
                 Optional.empty(),
                 SnapshotUtils.shouldTriggerSnapshot(
                         deployment, 
configManager.getObserveConfig(deployment), snapshotType));
-        
deployment.getStatus().getJobStatus().getCheckpointInfo().resetTrigger();
+        resetTrigger(deployment, snapshotType);
 
-        deployment.getSpec().getJob().setCheckpointTriggerNonce(123L);
+        setTriggerNonce(deployment, snapshotType, 123L);
         assertEquals(
                 Optional.empty(),
                 SnapshotUtils.shouldTriggerSnapshot(
                         deployment, 
configManager.getObserveConfig(deployment), snapshotType));
+        resetTrigger(deployment, snapshotType);
+
+        setupCronTrigger(snapshotType, deployment);
+        assertEquals(
+                Optional.empty(),
+                SnapshotUtils.shouldTriggerSnapshot(
+                        deployment, 
configManager.getObserveConfig(deployment), snapshotType));
+        resetTrigger(deployment, snapshotType);
     }
 
-    @Test
-    public void testCheckpointTriggeringPost1_17() {
-        SnapshotType snapshotType = SnapshotType.CHECKPOINT;
-        FlinkDeployment deployment = initDeployment(FlinkVersion.v1_17);
+    private void testSnapshotTriggering(
+            FlinkDeployment deployment,
+            SnapshotType snapshotType,
+            ConfigOption<String> periodicSnapshotIntervalOption) {
         reconcileSpec(deployment);
-
         assertEquals(
                 Optional.empty(),
                 SnapshotUtils.shouldTriggerSnapshot(
@@ -116,20 +113,195 @@ public class SnapshotUtilsTest {
         deployment
                 .getSpec()
                 .getFlinkConfiguration()
-                
.put(KubernetesOperatorConfigOptions.PERIODIC_CHECKPOINT_INTERVAL.key(), "10m");
+                .put(periodicSnapshotIntervalOption.key(), "10m");
         reconcileSpec(deployment);
 
         assertEquals(
                 Optional.of(SnapshotTriggerType.PERIODIC),
                 SnapshotUtils.shouldTriggerSnapshot(
                         deployment, 
configManager.getObserveConfig(deployment), snapshotType));
-        
deployment.getStatus().getJobStatus().getCheckpointInfo().resetTrigger();
+        resetTrigger(deployment, snapshotType);
+        
deployment.getSpec().getFlinkConfiguration().put(periodicSnapshotIntervalOption.key(),
 "0");
+        reconcileSpec(deployment);
 
-        deployment.getSpec().getJob().setCheckpointTriggerNonce(123L);
+        setTriggerNonce(deployment, snapshotType, 123L);
         assertEquals(
                 Optional.of(SnapshotTriggerType.MANUAL),
                 SnapshotUtils.shouldTriggerSnapshot(
                         deployment, 
configManager.getObserveConfig(deployment), snapshotType));
+        resetTrigger(deployment, snapshotType);
+        reconcileSpec(deployment);
+
+        setupCronTrigger(snapshotType, deployment);
+        assertEquals(
+                Optional.of(SnapshotTriggerType.PERIODIC),
+                SnapshotUtils.shouldTriggerSnapshot(
+                        deployment, 
configManager.getObserveConfig(deployment), snapshotType));
+    }
+
+    @Test
+    public void testInterpretAsInterval_InvalidExpression() {
+        Optional<Duration> interval = 
SnapshotUtils.interpretAsInterval("INVALID_DURATION");
+        assertTrue(interval.isEmpty());
+    }
+
+    @Test
+    public void testInterpretAsInterval_EmptyExpression() {
+        Optional<Duration> interval = SnapshotUtils.interpretAsInterval("");
+        assertTrue(interval.isEmpty());
+    }
+
+    @Test
+    public void 
testShouldTriggerIntervalBasedSnapshot_ZeroDurationReturnsFalse() {
+        Duration interval = SnapshotUtils.interpretAsInterval("0").get();
+
+        assertFalse(
+                SnapshotUtils.shouldTriggerIntervalBasedSnapshot(
+                        SnapshotType.CHECKPOINT, interval, Instant.now()));
+    }
+
+    @Test
+    public void 
testShouldTriggerIntervalBasedSnapshot_NextValidTimeBeforeCurrent() {
+        Duration interval = SnapshotUtils.interpretAsInterval("10M").get();
+        Instant lastTrigger = Instant.now().minus(Duration.ofMinutes(5));
+        assertFalse(
+                SnapshotUtils.shouldTriggerIntervalBasedSnapshot(
+                        SnapshotType.CHECKPOINT, interval, lastTrigger));
+    }
+
+    @Test
+    public void 
testShouldTriggerIntervalBasedSnapshot_NextValidTimeAfterCurrent() {
+        Duration interval = SnapshotUtils.interpretAsInterval("10M").get();
+        Instant lastTrigger = Instant.now().minus(Duration.ofMinutes(11));
+        assertTrue(
+                SnapshotUtils.shouldTriggerIntervalBasedSnapshot(
+                        SnapshotType.CHECKPOINT, interval, lastTrigger));
+    }
+
+    @Test
+    public void 
testShouldTriggerCronBasedSnapshot_NextValidTimeBeforeCurrent() {
+        String cronExpressionString = "0 */10 * * * ?"; // Every 10 minutes
+        CronExpression cronExpression = 
SnapshotUtils.interpretAsCron(cronExpressionString).get();
+
+        Calendar calendar = Calendar.getInstance();
+        calendar.set(2022, Calendar.JUNE, 5, 11, 5); // 11:05
+
+        Instant now = calendar.getTime().toInstant();
+        Instant lastTrigger =
+                now.minus(Duration.ofMinutes(10)); // 10:55, should have fired 
at 11:00
+
+        boolean result =
+                SnapshotUtils.shouldTriggerCronBasedSnapshot(
+                        CHECKPOINT, cronExpression, lastTrigger, now);
+
+        assertTrue(result);
+    }
+
+    @Test
+    public void testShouldTriggerCronBasedSnapshot_NextValidTimeAfterCurrent() 
{
+        String cronExpressionString = "0 */10 * * * ?"; // Every 10 minutes
+        CronExpression cronExpression = 
SnapshotUtils.interpretAsCron(cronExpressionString).get();
+
+        Calendar calendar = Calendar.getInstance();
+        calendar.set(2022, Calendar.JUNE, 5, 11, 5);
+
+        Instant now = calendar.getTime().toInstant(); // 11:05
+        Instant lastTrigger = now.minus(Duration.ofMinutes(4)); // 11:01, next 
trigger at 11:10
+
+        boolean result =
+                SnapshotUtils.shouldTriggerCronBasedSnapshot(
+                        CHECKPOINT, cronExpression, lastTrigger, now);
+
+        assertFalse(result);
+    }
+
+    @Test
+    public void testShouldTriggerCronBasedSnapshot_NoNextValidTime() {
+        String cronExpressionString =
+                "0 0 0 29 2 ? 1999"; // An impossible date: Feb 29, 1999 (1999 
was not a leap year)
+        CronExpression cronExpression = 
SnapshotUtils.interpretAsCron(cronExpressionString).get();
+
+        Instant now = Instant.now();
+        Instant lastTrigger = now.minus(Duration.ofDays(365));
+
+        boolean result =
+                SnapshotUtils.shouldTriggerCronBasedSnapshot(
+                        CHECKPOINT, cronExpression, lastTrigger, now);
+
+        assertFalse(result);
+    }
+
+    @Test
+    public void testInterpretAsCron_InvalidCron() {
+        Optional<CronExpression> cronExpression = 
SnapshotUtils.interpretAsCron("INVALID_CRON");
+
+        assertTrue(cronExpression.isEmpty());
+    }
+
+    @Test
+    public void testInterpretAsCron_EmptyCron() {
+        Optional<CronExpression> cronExpression = 
SnapshotUtils.interpretAsCron("");
+
+        assertTrue(cronExpression.isEmpty());
+    }
+
+    @Test
+    public void shouldTriggerAutomaticSnapshot_EmptyExpression() {
+        boolean shouldTrigger =
+                shouldTriggerAutomaticSnapshot(
+                        CHECKPOINT, "", 
Instant.now().minus(Duration.ofDays(365)));
+        assertFalse(shouldTrigger);
+    }
+
+    @Test
+    public void shouldTriggerAutomaticSnapshot_InvalidExpression() {
+        boolean shouldTrigger =
+                shouldTriggerAutomaticSnapshot(
+                        CHECKPOINT, "-1", 
Instant.now().minus(Duration.ofDays(365)));
+        assertFalse(shouldTrigger);
+    }
+
+    @Test
+    public void shouldTriggerAutomaticSnapshot_ValidIntervalExpression() {
+        boolean shouldTrigger =
+                shouldTriggerAutomaticSnapshot(
+                        CHECKPOINT, "10m", 
Instant.now().minus(Duration.ofDays(365)));
+        assertTrue(shouldTrigger);
+    }
+
+    @Test
+    public void shouldTriggerAutomaticSnapshot_ValidCronExpression() {
+        boolean shouldTrigger =
+                shouldTriggerAutomaticSnapshot(
+                        CHECKPOINT, "0 */10 * * * ?", 
Instant.now().minus(Duration.ofDays(365)));
+        assertTrue(shouldTrigger);
+    }
+
+    private static void resetTrigger(FlinkDeployment deployment, SnapshotType 
snapshotType) {
+        switch (snapshotType) {
+            case SAVEPOINT:
+                
deployment.getStatus().getJobStatus().getSavepointInfo().resetTrigger();
+                break;
+            case CHECKPOINT:
+                
deployment.getStatus().getJobStatus().getCheckpointInfo().resetTrigger();
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported snapshot type: 
" + snapshotType);
+        }
+    }
+
+    private static void setTriggerNonce(
+            FlinkDeployment deployment, SnapshotType snapshotType, long nonce) 
{
+        switch (snapshotType) {
+            case SAVEPOINT:
+                deployment.getSpec().getJob().setSavepointTriggerNonce(nonce);
+                break;
+            case CHECKPOINT:
+                deployment.getSpec().getJob().setCheckpointTriggerNonce(nonce);
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported snapshot type: 
" + snapshotType);
+        }
     }
 
     private static FlinkDeployment initDeployment(FlinkVersion flinkVersion) {
@@ -140,13 +312,7 @@ public class SnapshotUtilsTest {
 
         
deployment.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
         
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+        reconcileSpec(deployment);
         return deployment;
     }
-
-    private static void reconcileSpec(FlinkDeployment deployment) {
-        deployment
-                .getStatus()
-                .getReconciliationStatus()
-                .serializeAndSetLastReconciledSpec(deployment.getSpec(), 
deployment);
-    }
 }


Reply via email to