mxm commented on code in PR #656:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/656#discussion_r1305509932


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java:
##########
@@ -229,33 +228,140 @@ protected static Optional<SnapshotTriggerType> 
shouldTriggerSnapshot(
         var triggerNonceChanged =
                 triggerNonce != null && 
!triggerNonce.equals(reconciledTriggerNonce);
         if (triggerNonceChanged) {
-            return Optional.of(SnapshotTriggerType.MANUAL);
-        }
-
-        if (interval.isZero()) {
-            return Optional.empty();
+            if (snapshotType == CHECKPOINT && 
!isSnapshotTriggeringSupported(conf)) {
+                LOG.warn(
+                        "Manual checkpoint triggering is attempted, but is not 
supported (requires Flink 1.17+)");
+                return Optional.empty();
+            } else {
+                return Optional.of(SnapshotTriggerType.MANUAL);
+            }
         }
 
         var lastTriggerTs = snapshotInfo.getLastPeriodicTriggerTimestamp();
-
         // When the resource is first created/periodic snapshotting enabled we 
have to compare
         // against the creation timestamp for triggering the first periodic 
savepoint
         var lastTrigger =
                 lastTriggerTs == 0
                         ? 
Instant.parse(resource.getMetadata().getCreationTimestamp())
                         : Instant.ofEpochMilli(lastTriggerTs);
+
+        if (shouldTriggerAutomaticSnapshot(snapshotType, 
automaticTriggerExpression, lastTrigger)) {
+            if (snapshotType == CHECKPOINT && 
!isSnapshotTriggeringSupported(conf)) {
+                LOG.warn(
+                        "Automatic checkpoints triggering is configured but is 
not supported (requires Flink 1.17+)");
+                return Optional.empty();
+            } else {
+                return Optional.of(SnapshotTriggerType.PERIODIC);
+            }
+        }
+        return Optional.empty();
+    }
+
+    @VisibleForTesting
+    static boolean shouldTriggerAutomaticSnapshot(
+            SnapshotType snapshotType, String automaticTriggerExpression, 
Instant lastTrigger) {
+        if (StringUtils.isBlank(automaticTriggerExpression)) {
+            return false;
+        } // automaticTriggerExpression was configured by the user
+
+        // Try to interpret as an interval (Duration).
+        boolean expressionIsAnInterval = true;
+        boolean shouldTriggerInterval = false;
+        try {
+            shouldTriggerInterval =
+                    shouldTriggerIntervalBasedSnapshot(
+                            snapshotType, automaticTriggerExpression, 
lastTrigger);
+        } catch (ParseException e) {
+            expressionIsAnInterval = false;
+        }
+
+        // Try to interpret as a cron expression.
+        boolean expressionIsACron = true;
+        boolean shouldTriggerCron = false;
+        try {
+            shouldTriggerCron =
+                    shouldTriggerCronBasedSnapshot(
+                            snapshotType, automaticTriggerExpression, 
lastTrigger, Instant.now());
+        } catch (ParseException e) {
+            expressionIsACron = false;
+        }
+
+        if (!expressionIsAnInterval && !expressionIsACron) {
+            LOG.warn(
+                    "Automatic {} triggering is configured, but the trigger 
expression '{}' is neither a valid Duration, nor a cron expression.",
+                    snapshotType,
+                    automaticTriggerExpression);
+            return false;
+        }
+
+        // This should never happen. The string cannot be both a valid 
Duration and a cron
+        // expression at the same time.
+        if (expressionIsAnInterval && expressionIsACron) {
+            LOG.error(
+                    "Something went wrong with the automatic {} trigger 
expression {}. This setting cannot be simultaneously a valid Duration and a 
cron expression.",
+                    snapshotType,
+                    automaticTriggerExpression);
+            return false;
+        }
+
+        return shouldTriggerInterval || shouldTriggerCron;

Review Comment:
   I find this method a bit too complex. How about changing the structure here 
a bit:
   
   ```java
   if (isInterval()) {
     // check Interval triggering
   } else {
     // check cron triggering
   }
   ```
   
   Can we remove the check for the expression being both a valid interval and 
a valid cron expression? We can simply assume the interval always takes precedence.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to