gyfora commented on code in PR #656:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/656#discussion_r1302885289
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java:
##########
@@ -229,33 +228,85 @@ protected static Optional<SnapshotTriggerType>
shouldTriggerSnapshot(
var triggerNonceChanged =
triggerNonce != null &&
!triggerNonce.equals(reconciledTriggerNonce);
if (triggerNonceChanged) {
- return Optional.of(SnapshotTriggerType.MANUAL);
- }
-
- if (interval.isZero()) {
- return Optional.empty();
+ if (snapshotType == CHECKPOINT &&
!isSnapshotTriggeringSupported(conf)) {
+ LOG.warn(
+ "Manual checkpoint triggering is attempted, but is not
supported (requires Flink 1.17+)");
+ return Optional.empty();
+ } else {
+ return Optional.of(SnapshotTriggerType.MANUAL);
+ }
}
var lastTriggerTs = snapshotInfo.getLastPeriodicTriggerTimestamp();
-
// When the resource is first created/periodic snapshotting enabled we
have to compare
// against the creation timestamp for triggering the first periodic
savepoint
var lastTrigger =
lastTriggerTs == 0
?
Instant.parse(resource.getMetadata().getCreationTimestamp())
: Instant.ofEpochMilli(lastTriggerTs);
+
+ if (shouldTriggerIntervalBasedSnapshot(snapshotType, interval,
lastTrigger)
+ || shouldTriggerCronBasedSnapshot(
+ snapshotType, cronExpression, lastTrigger,
Instant.now())) {
+ if (snapshotType == CHECKPOINT &&
!isSnapshotTriggeringSupported(conf)) {
+ LOG.warn(
+ "Periodic checkpoints triggering is configured but is
not supported (requires Flink 1.17+)");
+ return Optional.empty();
+ } else {
+ return Optional.of(SnapshotTriggerType.PERIODIC);
+ }
+ }
+ return Optional.empty();
+ }
+
+ @VisibleForTesting
+ static boolean shouldTriggerCronBasedSnapshot(
+ SnapshotType snapshotType,
+ String cronExpressionString,
+ Instant lastTriggerDateInstant,
+ Instant nowInstant) {
+ try {
+ CronExpression cronExpression = new
CronExpression(cronExpressionString);
+ Date now = Date.from(nowInstant);
+ Date lastTrigger = Date.from(lastTriggerDateInstant);
+
+ Date nextValidTimeAfterLastTrigger =
cronExpression.getNextValidTimeAfter(lastTrigger);
+
+ if (nextValidTimeAfterLastTrigger != null
+ && nextValidTimeAfterLastTrigger.before(now)) {
+ LOG.info(
+ "Triggering new periodic {} based on cron schedule
'{}' due at {}",
+ snapshotType.toString().toLowerCase(),
+ cronExpressionString,
+ nextValidTimeAfterLastTrigger);
+ return true;
+ } else {
+ return false;
+ }
+ } catch (ParseException e) {
+ LOG.warn("Invalid cron expression: " + cronExpressionString);
Review Comment:
should we send a Kube event so the user sees this?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]