afedulov commented on code in PR #656:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/656#discussion_r1303476310


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java:
##########
@@ -229,33 +228,85 @@ protected static Optional<SnapshotTriggerType> 
shouldTriggerSnapshot(
         var triggerNonceChanged =
                 triggerNonce != null && 
!triggerNonce.equals(reconciledTriggerNonce);
         if (triggerNonceChanged) {
-            return Optional.of(SnapshotTriggerType.MANUAL);
-        }
-
-        if (interval.isZero()) {
-            return Optional.empty();
+            if (snapshotType == CHECKPOINT && 
!isSnapshotTriggeringSupported(conf)) {
+                LOG.warn(
+                        "Manual checkpoint triggering is attempted, but is not 
supported (requires Flink 1.17+)");
+                return Optional.empty();
+            } else {
+                return Optional.of(SnapshotTriggerType.MANUAL);
+            }
         }
 
         var lastTriggerTs = snapshotInfo.getLastPeriodicTriggerTimestamp();
-
         // When the resource is first created/periodic snapshotting enabled we 
have to compare
         // against the creation timestamp for triggering the first periodic 
savepoint
         var lastTrigger =
                 lastTriggerTs == 0
                         ? 
Instant.parse(resource.getMetadata().getCreationTimestamp())
                         : Instant.ofEpochMilli(lastTriggerTs);
+
+        if (shouldTriggerIntervalBasedSnapshot(snapshotType, interval, 
lastTrigger)
+                || shouldTriggerCronBasedSnapshot(
+                        snapshotType, cronExpression, lastTrigger, 
Instant.now())) {
+            if (snapshotType == CHECKPOINT && 
!isSnapshotTriggeringSupported(conf)) {
+                LOG.warn(
+                        "Periodic checkpoints triggering is configured but is 
not supported (requires Flink 1.17+)");
+                return Optional.empty();
+            } else {
+                return Optional.of(SnapshotTriggerType.PERIODIC);
+            }
+        }
+        return Optional.empty();
+    }
+
+    @VisibleForTesting
+    static boolean shouldTriggerCronBasedSnapshot(
+            SnapshotType snapshotType,
+            String cronExpressionString,
+            Instant lastTriggerDateInstant,
+            Instant nowInstant) {
+        try {
+            CronExpression cronExpression = new 
CronExpression(cronExpressionString);
+            Date now = Date.from(nowInstant);
+            Date lastTrigger = Date.from(lastTriggerDateInstant);
+
+            Date nextValidTimeAfterLastTrigger = 
cronExpression.getNextValidTimeAfter(lastTrigger);
+
+            if (nextValidTimeAfterLastTrigger != null
+                    && nextValidTimeAfterLastTrigger.before(now)) {
+                LOG.info(
+                        "Triggering new periodic {} based on cron schedule 
'{}' due at {}",
+                        snapshotType.toString().toLowerCase(),
+                        cronExpressionString,
+                        nextValidTimeAfterLastTrigger);
+                return true;
+            } else {
+                return false;
+            }
+        } catch (ParseException e) {
+            LOG.warn("Invalid cron expression: " + cronExpressionString);

Review Comment:
   I guess we could. It would require passing the `EventRecorder` around 
quite a bit, plus mocking it in unit tests. Do we do that for other 
misconfigured settings?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to