mxm commented on code in PR #656:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/656#discussion_r1304303877
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java:
##########
@@ -259,6 +257,31 @@ protected static Optional<SnapshotTriggerType>
shouldTriggerSnapshot(
return Optional.empty();
}
+ private static boolean shouldTriggerAutomaticSnapshot(
+ SnapshotType snapshotType, String automaticTriggerExpression,
Instant lastTrigger) {
+ if (StringUtils.isBlank(automaticTriggerExpression)) {
+ return false;
+ }
+
+ boolean shouldTrigger =
+ shouldTriggerIntervalBasedSnapshot(
+ snapshotType, automaticTriggerExpression,
lastTrigger)
+ || shouldTriggerCronBasedSnapshot(
Review Comment:
With this condition, both checks can theoretically still trigger for the same
expression. We should make sure the condition guarantees mutual exclusion:
first check the interval, and only then check cron-based execution.
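A minimal sketch of what mutual exclusion could look like, not the PR's actual code. Every name here (`TriggerDispatchSketch`, `tryParseInterval`, `cronIsDue`) is hypothetical, and `java.time.Duration.parse` (ISO-8601, e.g. `PT10M`) stands in for the operator's real duration parsing. The expression is classified once, and the cron check is only reached when it is not a valid duration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.function.Predicate;

// Illustrative sketch only; none of these names exist in SnapshotUtils.
class TriggerDispatchSketch {

    // Stand-in parser: accepts ISO-8601 durations such as "PT10M".
    // The real code would use Flink's own duration parsing instead.
    static Optional<Duration> tryParseInterval(String expression) {
        try {
            return Optional.of(Duration.parse(expression.trim()));
        } catch (Exception e) {
            return Optional.empty();
        }
    }

    // cronIsDue is a hypothetical hook standing in for the cron-based check.
    static boolean shouldTrigger(
            String expression, Instant lastTrigger, Instant now, Predicate<String> cronIsDue) {
        if (expression == null || expression.trim().isEmpty()) {
            return false;
        }
        Optional<Duration> interval = tryParseInterval(expression);
        if (interval.isPresent()) {
            // The expression is a duration, so the cron branch is never consulted.
            Duration d = interval.get();
            return !d.isZero() && !d.isNegative() && !now.isBefore(lastTrigger.plus(d));
        }
        // Only expressions that are not valid durations reach the cron check.
        return cronIsDue.test(expression);
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        Instant last = now.minus(Duration.ofMinutes(15));
        System.out.println(shouldTrigger("PT10M", last, now, e -> false));
        System.out.println(shouldTrigger("0 0 * * * ?", last, now, e -> true));
    }
}
```

Classifying the expression up front, rather than OR-ing two independent checks, means at most one of the two code paths can ever decide the outcome.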
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java:
##########
@@ -284,20 +307,27 @@ static boolean shouldTriggerCronBasedSnapshot(
return false;
}
} catch (ParseException e) {
- LOG.warn("Invalid cron expression: " + cronExpressionString);
return false;
}
}
private static boolean shouldTriggerIntervalBasedSnapshot(
- SnapshotType snapshotType, Duration interval, Instant lastTrigger)
{
+ SnapshotType snapshotType, String triggerExpression, Instant
lastTrigger) {
+
+ Duration interval;
+ try {
+ interval = ConfigurationUtils.convertValue(triggerExpression,
Duration.class);
+ } catch (IllegalArgumentException exception) {
+ return false;
Review Comment:
I'm not sure this will actually catch all exceptions, since different types
can be thrown. I think we might have to catch `Exception` here. We should
probably add a test to verify the fallback behavior.
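A small sketch of the concern, under stated assumptions: `java.time.Duration.parse` is used as a stand-in for `ConfigurationUtils.convertValue`, and nothing here claims which exceptions the Flink method throws. With the stand-in, a cron-like string raises `DateTimeParseException` and a null input raises `NullPointerException`, neither of which is an `IllegalArgumentException`, so a narrow catch would let them escape. The class and method names are hypothetical.

```java
import java.time.Duration;
import java.util.Optional;

// Illustrative sketch only; not the operator's implementation.
class IntervalParseFallbackSketch {

    // Returns the parsed interval, or empty if the expression is not a duration.
    static Optional<Duration> parseIntervalOrEmpty(String triggerExpression) {
        try {
            return Optional.of(Duration.parse(triggerExpression));
        } catch (Exception e) {
            // Deliberately broad: any parse failure means "not an interval",
            // so the caller can fall back to treating the value as cron.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parseIntervalOrEmpty("PT10M").isPresent());
        System.out.println(parseIntervalOrEmpty("0 0 * * * ?").isPresent());
        System.out.println(parseIntervalOrEmpty(null).isPresent());
    }
}
```

A test for the fallback would then assert that a cron expression and a null value both yield an empty result instead of an exception.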
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -298,37 +298,26 @@ public static String operatorConfigKey(String key) {
+ "Expected format:
headerKey1:headerValue1,headerKey2:headerValue2.");
@Documentation.Section(SECTION_DYNAMIC)
- public static final ConfigOption<Duration> PERIODIC_SAVEPOINT_INTERVAL =
+ public static final ConfigOption<String> PERIODIC_SAVEPOINT_INTERVAL =
operatorConfig("periodic.savepoint.interval")
- .durationType()
- .defaultValue(Duration.ZERO)
+ .stringType()
+ .defaultValue("")
.withDescription(
- "Interval at which periodic savepoints will be
triggered. "
+ "Option to enable automatic savepoint triggering.
Can be specified "
+ + "either as a Duration type (i.e. '10m')
or as a cron expression."
Review Comment:
```suggestion
+ "either as a Duration type (i.e. '10
minutes') or as a cron expression (see https://en.wikipedia.org/wiki/Cron)."
```
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -298,37 +298,26 @@ public static String operatorConfigKey(String key) {
+ "Expected format:
headerKey1:headerValue1,headerKey2:headerValue2.");
@Documentation.Section(SECTION_DYNAMIC)
- public static final ConfigOption<Duration> PERIODIC_SAVEPOINT_INTERVAL =
+ public static final ConfigOption<String> PERIODIC_SAVEPOINT_INTERVAL =
operatorConfig("periodic.savepoint.interval")
- .durationType()
- .defaultValue(Duration.ZERO)
+ .stringType()
+ .defaultValue("")
.withDescription(
- "Interval at which periodic savepoints will be
triggered. "
+ "Option to enable automatic savepoint triggering.
Can be specified "
+ + "either as a Duration type (i.e. '10m')
or as a cron expression."
+ "The triggering schedule is not
guaranteed, savepoints will be "
+ "triggered as part of the regular
reconcile loop. "
+ "WARNING: not intended to be used
together with the cron-based "
+ "periodic savepoint triggering");
@Documentation.Section(SECTION_DYNAMIC)
- public static final ConfigOption<String> PERIODIC_SAVEPOINT_CRON =
- operatorConfig("periodic.savepoint.cron")
- .stringType()
- .defaultValue(
- "0 0 0 29 2 ? 1999") // Default value that is
guaranteed not to trigger
- .withDescription(
- "A cron expression that defines when to
automatically trigger savepoints."
- + "The precise triggering schedule is not
guaranteed, savepoints will be "
- + "triggered as part of the regular
reconcile loop. "
- + "WARNING: not intended to be used
together with the interval-based "
- + "periodic savepoint triggering");
-
- @Documentation.Section(SECTION_DYNAMIC)
- public static final ConfigOption<Duration> PERIODIC_CHECKPOINT_INTERVAL =
+ public static final ConfigOption<String> PERIODIC_CHECKPOINT_INTERVAL =
operatorConfig("periodic.checkpoint.interval")
- .durationType()
- .defaultValue(Duration.ZERO)
+ .stringType()
+ .defaultValue("")
.withDescription(
- "Interval at which periodic checkpoints will be
triggered. "
+ "Option to enable automatic checkpoint triggering.
Can be specified "
+ + "either as a Duration type (i.e. '10m')
or as a cron expression."
Review Comment:
Same comment as above.