mxm commented on code in PR #656:
URL: https://github.com/apache/flink-kubernetes-operator/pull/656#discussion_r1304303877


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java:
##########
@@ -259,6 +257,31 @@ protected static Optional<SnapshotTriggerType> shouldTriggerSnapshot(
         return Optional.empty();
     }
 
+    private static boolean shouldTriggerAutomaticSnapshot(
+            SnapshotType snapshotType, String automaticTriggerExpression, Instant lastTrigger) {
+        if (StringUtils.isBlank(automaticTriggerExpression)) {
+            return false;
+        }
+
+        boolean shouldTrigger =
+                shouldTriggerIntervalBasedSnapshot(
+                                snapshotType, automaticTriggerExpression, lastTrigger)
+                        || shouldTriggerCronBasedSnapshot(

Review Comment:
   With this condition, both checks can theoretically still trigger for the
   same expression. We should make sure the condition guarantees mutual
   exclusion: first check the interval, then check the cron-based execution.
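
   A minimal sketch of one way to get mutual exclusion, assuming the expression
   is classified once before any trigger check runs. `TriggerExpressionSketch`,
   `Kind`, `parseInterval` and `looksLikeCron` are hypothetical names; the tiny
   duration regex and the field-count check are simplified stand-ins for the
   real duration and cron parsers, not the operator's actual logic.

   ```java
   import java.time.Duration;
   import java.util.Optional;
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;

   /** Sketch: classify a trigger expression exactly once so only one branch can fire. */
   final class TriggerExpressionSketch {

       enum Kind { INTERVAL, CRON, INVALID }

       // Stand-in for real duration parsing: accepts only forms like "30s", "10m", "2 h".
       private static final Pattern SIMPLE_DURATION = Pattern.compile("(\\d{1,9})\\s*(s|m|h)");

       static Optional<Duration> parseInterval(String expression) {
           Matcher matcher = SIMPLE_DURATION.matcher(expression.trim());
           if (!matcher.matches()) {
               return Optional.empty();
           }
           long amount = Long.parseLong(matcher.group(1));
           switch (matcher.group(2)) {
               case "s":
                   return Optional.of(Duration.ofSeconds(amount));
               case "m":
                   return Optional.of(Duration.ofMinutes(amount));
               default:
                   return Optional.of(Duration.ofHours(amount));
           }
       }

       // Crude stand-in for a cron parser: only counts whitespace-separated fields (6 or 7).
       static boolean looksLikeCron(String expression) {
           int fields = expression.trim().split("\\s+").length;
           return fields == 6 || fields == 7;
       }

       // Interval is checked first; cron is only considered when the interval parse fails.
       static Kind classify(String expression) {
           if (expression == null || expression.isBlank()) {
               return Kind.INVALID;
           }
           if (parseInterval(expression).isPresent()) {
               return Kind.INTERVAL;
           }
           return looksLikeCron(expression) ? Kind.CRON : Kind.INVALID;
       }
   }
   ```

   With a classification like this, the caller can switch on the result and run
   exactly one of the interval-based or cron-based checks, instead of OR-ing
   both.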



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java:
##########
@@ -284,20 +307,27 @@ static boolean shouldTriggerCronBasedSnapshot(
                 return false;
             }
         } catch (ParseException e) {
-            LOG.warn("Invalid cron expression: " + cronExpressionString);
             return false;
         }
     }
 
     private static boolean shouldTriggerIntervalBasedSnapshot(
-            SnapshotType snapshotType, Duration interval, Instant lastTrigger) {
+            SnapshotType snapshotType, String triggerExpression, Instant lastTrigger) {
+
+        Duration interval;
+        try {
+            interval = ConfigurationUtils.convertValue(triggerExpression, Duration.class);
+        } catch (IllegalArgumentException exception) {
+            return false;

Review Comment:
   I'm not sure this will actually catch all exceptions; different types can be
   thrown. I think we might have to catch `Exception` here. We should probably
   add a test to verify the fallback behavior.
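
   A minimal sketch of the concern, using `java.time.Duration.parse` as a
   stand-in for the real conversion call (an assumption; which exceptions
   `ConfigurationUtils.convertValue` throws is exactly what the test should pin
   down). `IntervalFallbackSketch`, `parseNarrow` and `parseBroad` are
   hypothetical names.

   ```java
   import java.time.Duration;
   import java.util.Optional;

   /** Sketch: a narrow catch can let a parse failure escape, a broad catch cannot. */
   final class IntervalFallbackSketch {

       // Narrow catch: any failure that is not an IllegalArgumentException propagates.
       static Optional<Duration> parseNarrow(String expression) {
           try {
               return Optional.of(Duration.parse(expression));
           } catch (IllegalArgumentException e) {
               return Optional.empty();
           }
       }

       // Broad catch: every parsing failure is treated as "not an interval".
       static Optional<Duration> parseBroad(String expression) {
           try {
               return Optional.of(Duration.parse(expression));
           } catch (Exception e) {
               return Optional.empty();
           }
       }

       public static void main(String[] args) {
           System.out.println(parseBroad("PT10M"));        // valid ISO-8601 interval
           System.out.println(parseBroad("0 0 12 * * ?")); // not an interval, falls back to empty
           try {
               parseNarrow("0 0 12 * * ?");
           } catch (RuntimeException e) {
               // Duration.parse reports bad input with DateTimeParseException,
               // which is not an IllegalArgumentException.
               System.out.println("narrow catch missed: " + e.getClass().getSimpleName());
           }
       }
   }
   ```

   A test for the real method could follow the same shape: pass a cron
   expression and a garbage string to the interval check and assert that it
   returns false rather than throwing.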



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -298,37 +298,26 @@ public static String operatorConfigKey(String key) {
                                     + "Expected format: headerKey1:headerValue1,headerKey2:headerValue2.");
 
     @Documentation.Section(SECTION_DYNAMIC)
-    public static final ConfigOption<Duration> PERIODIC_SAVEPOINT_INTERVAL =
+    public static final ConfigOption<String> PERIODIC_SAVEPOINT_INTERVAL =
             operatorConfig("periodic.savepoint.interval")
-                    .durationType()
-                    .defaultValue(Duration.ZERO)
+                    .stringType()
+                    .defaultValue("")
                     .withDescription(
-                            "Interval at which periodic savepoints will be triggered. "
+                            "Option to enable automatic savepoint triggering. Can be specified "
+                                    + "either as a Duration type (i.e. '10m') or as a cron expression."

Review Comment:
   ```suggestion
                                       + "either as a Duration type (i.e. '10 minutes') or as a cron expression (see https://en.wikipedia.org/wiki/Cron)."
   ```



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -298,37 +298,26 @@ public static String operatorConfigKey(String key) {
                                     + "Expected format: headerKey1:headerValue1,headerKey2:headerValue2.");
 
     @Documentation.Section(SECTION_DYNAMIC)
-    public static final ConfigOption<Duration> PERIODIC_SAVEPOINT_INTERVAL =
+    public static final ConfigOption<String> PERIODIC_SAVEPOINT_INTERVAL =
             operatorConfig("periodic.savepoint.interval")
-                    .durationType()
-                    .defaultValue(Duration.ZERO)
+                    .stringType()
+                    .defaultValue("")
                     .withDescription(
-                            "Interval at which periodic savepoints will be triggered. "
+                            "Option to enable automatic savepoint triggering. Can be specified "
+                                    + "either as a Duration type (i.e. '10m') or as a cron expression."
                                     + "The triggering schedule is not guaranteed, savepoints will be "
                                     + "triggered as part of the regular reconcile loop. "
                                     + "WARNING: not intended to be used together with the cron-based "
                                     + "periodic savepoint triggering");
 
     @Documentation.Section(SECTION_DYNAMIC)
-    public static final ConfigOption<String> PERIODIC_SAVEPOINT_CRON =
-            operatorConfig("periodic.savepoint.cron")
-                    .stringType()
-                    .defaultValue(
-                            "0 0 0 29 2 ? 1999") // Default value that is guaranteed not to trigger
-                    .withDescription(
-                            "A cron expression that defines when to automatically trigger savepoints."
-                                    + "The precise triggering schedule is not guaranteed, savepoints will be "
-                                    + "triggered as part of the regular reconcile loop. "
-                                    + "WARNING: not intended to be used together with the interval-based "
-                                    + "periodic savepoint triggering");
-
-    @Documentation.Section(SECTION_DYNAMIC)
-    public static final ConfigOption<Duration> PERIODIC_CHECKPOINT_INTERVAL =
+    public static final ConfigOption<String> PERIODIC_CHECKPOINT_INTERVAL =
             operatorConfig("periodic.checkpoint.interval")
-                    .durationType()
-                    .defaultValue(Duration.ZERO)
+                    .stringType()
+                    .defaultValue("")
                     .withDescription(
-                            "Interval at which periodic checkpoints will be triggered. "
+                            "Option to enable automatic checkpoint triggering. Can be specified "
+                                    + "either as a Duration type (i.e. '10m') or as a cron expression."

Review Comment:
   Same comment as above: spell out the duration example ('10 minutes') and
   link to a description of the cron format.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
