lindong28 commented on code in PR #22931:
URL: https://github.com/apache/flink/pull/22931#discussion_r1271906245
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -426,6 +446,36 @@ public boolean isShutdown() {
return shutdown;
}
+ /**
+ * Reports whether a source operator is currently processing backlog.
+ *
+ * <p>If any source operator is processing backlog, the checkpoint interval would be decided by
+ * {@code execution.checkpointing.interval-during-backlog} instead of {@code
+ * execution.checkpointing.interval}.
+ *
+ * <p>If a source has not invoked this method, the source is considered to have
+ * isProcessingBacklog=false. If a source operator has invoked this method multiple times, the
+ * last reported value is used.
+ *
+ * @param operatorID the operator ID of the source operator.
+ * @param isProcessingBacklog whether the source operator is processing backlog.
+ */
+ public void setIsProcessingBacklog(OperatorID operatorID, boolean isProcessingBacklog) {
+ if (isProcessingBacklog) {
+ backlogOperators.add(operatorID);
+ } else {
+ backlogOperators.remove(operatorID);
+ }
+
+ if (getCurrentCheckpointInterval() != Long.MAX_VALUE) {
+ long newNextCheckpointTriggeringTime =
+ clock.absoluteTimeMillis() + getCurrentCheckpointInterval();
Review Comment:
It is inconsistent to use `absoluteTimeMillis()` here and
`relativeTimeMillis()` within `rescheduleTrigger()`.
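A minimal sketch of a consistent comparison, assuming the candidate deadline is computed from the same relative clock that `nextCheckpointTriggeringRelativeTime` is documented to use. `RelativeDeadlineCheck` and its `LongSupplier` (a stand-in for `clock::relativeTimeMillis`) are hypothetical simplifications, not the coordinator's code:

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical stand-in for the rescheduling check. Both the stored deadline and
 * the candidate deadline use the same relative time domain, so comparing them is meaningful.
 */
class RelativeDeadlineCheck {
    private final LongSupplier relativeTimeMillis; // assumed stand-in for clock::relativeTimeMillis
    private final long nextCheckpointTriggeringRelativeTime;

    RelativeDeadlineCheck(LongSupplier relativeTimeMillis, long nextCheckpointTriggeringRelativeTime) {
        this.relativeTimeMillis = relativeTimeMillis;
        this.nextCheckpointTriggeringRelativeTime = nextCheckpointTriggeringRelativeTime;
    }

    /** Returns true if triggering at (relative now + interval) would be earlier than the current deadline. */
    boolean shouldTriggerEarlier(long currentInterval) {
        // Callers are expected to skip Long.MAX_VALUE intervals, as the PR does, to avoid overflow.
        long candidate = relativeTimeMillis.getAsLong() + currentInterval;
        return candidate < nextCheckpointTriggeringRelativeTime;
    }

    public static void main(String[] args) {
        long[] fakeNow = {5_000L}; // fake relative clock, in milliseconds
        RelativeDeadlineCheck check = new RelativeDeadlineCheck(() -> fakeNow[0], 65_000L);
        System.out.println(check.shouldTriggerEarlier(1_000L));   // 6_000 < 65_000
        System.out.println(check.shouldTriggerEarlier(120_000L)); // 125_000 >= 65_000
    }
}
```

Mixing in `absoluteTimeMillis()` would instead compare a wall-clock timestamp against a value whose origin is unrelated, so the result of the `<` check would not mean anything.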
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -1915,6 +1965,14 @@ public CompletedCheckpointStore getCheckpointStore() {
return completedCheckpointStore;
}
+ /**
+ * Gets the checkpoint interval. Its value might vary depending on whether there is processing
+ * backlog.
+ */
+ private long getCurrentCheckpointInterval() {
+ return backlogOperators.isEmpty() ? baseInterval : baseIntervalDuringBacklog;
Review Comment:
Can we exclude operators that have finished?
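One possible shape for this, sketched under assumptions: a hypothetical `onOperatorFinished` hook is called once an operator finishes. It drops the operator from the backlog set and ignores any later backlog reports from it. Operator IDs are modelled as plain `String`s rather than `OperatorID`, and the class is an illustration, not the PR's code:

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical tracker: finished operators never keep the job on the backlog interval. */
class BacklogIntervalTracker {
    private final long baseInterval;
    private final long baseIntervalDuringBacklog;
    private final Set<String> backlogOperators = new HashSet<>();
    private final Set<String> finishedOperators = new HashSet<>();

    BacklogIntervalTracker(long baseInterval, long baseIntervalDuringBacklog) {
        this.baseInterval = baseInterval;
        this.baseIntervalDuringBacklog = baseIntervalDuringBacklog;
    }

    void setIsProcessingBacklog(String operatorId, boolean isProcessingBacklog) {
        // Reports from operators that have already finished are ignored.
        if (isProcessingBacklog && !finishedOperators.contains(operatorId)) {
            backlogOperators.add(operatorId);
        } else {
            backlogOperators.remove(operatorId);
        }
    }

    /** Hypothetical hook, assumed to be invoked when an operator finishes. */
    void onOperatorFinished(String operatorId) {
        finishedOperators.add(operatorId);
        backlogOperators.remove(operatorId);
    }

    long getCurrentCheckpointInterval() {
        return backlogOperators.isEmpty() ? baseInterval : baseIntervalDuringBacklog;
    }

    public static void main(String[] args) {
        BacklogIntervalTracker tracker = new BacklogIntervalTracker(30_000L, 600_000L);
        tracker.setIsProcessingBacklog("source-1", true);
        System.out.println(tracker.getCurrentCheckpointInterval()); // 600000
        tracker.onOperatorFinished("source-1");
        System.out.println(tracker.getCurrentCheckpointInterval()); // 30000
    }
}
```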
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -426,6 +446,36 @@ public boolean isShutdown() {
return shutdown;
}
+ /**
+ * Reports whether a source operator is currently processing backlog.
+ *
+ * <p>If any source operator is processing backlog, the checkpoint interval would be decided by
+ * {@code execution.checkpointing.interval-during-backlog} instead of {@code
+ * execution.checkpointing.interval}.
+ *
+ * <p>If a source has not invoked this method, the source is considered to have
+ * isProcessingBacklog=false. If a source operator has invoked this method multiple times, the
+ * last reported value is used.
+ *
+ * @param operatorID the operator ID of the source operator.
+ * @param isProcessingBacklog whether the source operator is processing backlog.
+ */
+ public void setIsProcessingBacklog(OperatorID operatorID, boolean isProcessingBacklog) {
+ if (isProcessingBacklog) {
+ backlogOperators.add(operatorID);
+ } else {
+ backlogOperators.remove(operatorID);
+ }
+
+ if (getCurrentCheckpointInterval() != Long.MAX_VALUE) {
+ long newNextCheckpointTriggeringTime =
+ clock.absoluteTimeMillis() + getCurrentCheckpointInterval();
+ if (newNextCheckpointTriggeringTime < nextCheckpointTriggeringRelativeTime) {
+ rescheduleTrigger(getCurrentCheckpointInterval());
Review Comment:
Would it be simpler to re-use the `newNextCheckpointTriggeringTime` obtained
above rather than getting the timestamp twice (with slightly different values)?
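A minimal sketch of the single-read variant, assuming a relative clock modelled as a `LongSupplier`. The stored deadline and the delay handed to the timer are both derived from one reading, so they agree exactly. `SingleReadReschedule` and its fields are hypothetical:

```java
import java.util.function.LongSupplier;

/** Hypothetical sketch: one clock read feeds both the stored deadline and the scheduling delay. */
class SingleReadReschedule {
    private final LongSupplier relativeTimeMillis; // assumed stand-in for clock::relativeTimeMillis
    private long nextCheckpointTriggeringRelativeTime;
    private long lastScheduledDelay = -1L; // delay that would be passed to the timer; -1 = none yet

    SingleReadReschedule(LongSupplier relativeTimeMillis, long initialDeadline) {
        this.relativeTimeMillis = relativeTimeMillis;
        this.nextCheckpointTriggeringRelativeTime = initialDeadline;
    }

    void maybeReschedule(long currentInterval) {
        long now = relativeTimeMillis.getAsLong(); // the only clock read in this method
        long newNextCheckpointTriggeringTime = now + currentInterval;
        if (newNextCheckpointTriggeringTime < nextCheckpointTriggeringRelativeTime) {
            // The coordinator would cancel the current trigger and reschedule it with this delay.
            lastScheduledDelay = newNextCheckpointTriggeringTime - now;
            nextCheckpointTriggeringRelativeTime = newNextCheckpointTriggeringTime;
        }
    }

    long nextDeadline() {
        return nextCheckpointTriggeringRelativeTime;
    }

    long lastScheduledDelay() {
        return lastScheduledDelay;
    }

    public static void main(String[] args) {
        long[] fakeNow = {100L};
        // Fake clock that advances by 7 ms on every read, making a second read visible.
        SingleReadReschedule r = new SingleReadReschedule(() -> fakeNow[0] += 7, 10_000L);
        r.maybeReschedule(1_000L);
        System.out.println(r.nextDeadline()); // 1107
    }
}
```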
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -426,6 +446,36 @@ public boolean isShutdown() {
return shutdown;
}
+ /**
+ * Reports whether a source operator is currently processing backlog.
+ *
+ * <p>If any source operator is processing backlog, the checkpoint interval would be decided by
+ * {@code execution.checkpointing.interval-during-backlog} instead of {@code
+ * execution.checkpointing.interval}.
+ *
+ * <p>If a source has not invoked this method, the source is considered to have
+ * isProcessingBacklog=false. If a source operator has invoked this method multiple times, the
+ * last reported value is used.
+ *
+ * @param operatorID the operator ID of the source operator.
+ * @param isProcessingBacklog whether the source operator is processing backlog.
+ */
+ public void setIsProcessingBacklog(OperatorID operatorID, boolean isProcessingBacklog) {
+ if (isProcessingBacklog) {
+ backlogOperators.add(operatorID);
+ } else {
+ backlogOperators.remove(operatorID);
+ }
+
+ if (getCurrentCheckpointInterval() != Long.MAX_VALUE) {
+ long newNextCheckpointTriggeringTime =
+ clock.absoluteTimeMillis() + getCurrentCheckpointInterval();
+ if (newNextCheckpointTriggeringTime < nextCheckpointTriggeringRelativeTime) {
Review Comment:
Is the following sequence of events possible:
- newNextCheckpointTriggeringTime = nextCheckpointTriggeringRelativeTime - 1 ms.
- Right before currentPeriodicTrigger.cancel() is invoked, a checkpoint is triggered.
- Then we schedule/trigger another checkpoint almost immediately after the last checkpoint was triggered.
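One possible mitigation, sketched under two assumptions. First, the periodic-trigger path and the interval-change path run under one lock, modelled here with `synchronized`. Second, the last trigger time is recorded. If the clock was read before a trigger that fired in the meantime, the new deadline is then measured from that trigger rather than from the stale reading. `TriggerRaceGuard` is a hypothetical, single-threaded model of the interleaving, not the coordinator:

```java
/** Hypothetical model of the interleaving above; all times are relative milliseconds. */
class TriggerRaceGuard {
    private long interval;
    private long lastTriggerRelativeTime;
    private long nextCheckpointTriggeringRelativeTime;

    TriggerRaceGuard(long interval, long startRelativeTime) {
        this.interval = interval;
        this.lastTriggerRelativeTime = startRelativeTime;
        this.nextCheckpointTriggeringRelativeTime = startRelativeTime + interval;
    }

    /** Periodic trigger path: a checkpoint fires and the next one is scheduled. */
    synchronized void onPeriodicTrigger(long now) {
        lastTriggerRelativeTime = now;
        nextCheckpointTriggeringRelativeTime = now + interval;
    }

    /** Interval-change path; clockReading may predate a trigger that fired in the meantime. */
    synchronized void onIntervalChanged(long clockReading, long newInterval) {
        interval = newInterval;
        long base = Math.max(clockReading, lastTriggerRelativeTime);
        long candidate = base + newInterval;
        if (candidate < nextCheckpointTriggeringRelativeTime) {
            // The real coordinator would cancel and reschedule its timer here.
            nextCheckpointTriggeringRelativeTime = candidate;
        }
    }

    synchronized long nextDeadline() {
        return nextCheckpointTriggeringRelativeTime;
    }

    public static void main(String[] args) {
        TriggerRaceGuard g = new TriggerRaceGuard(60_000L, 0L);
        long staleReading = 59_000L;  // clock read just before the trigger fires
        g.onPeriodicTrigger(60_000L); // checkpoint fires; next deadline becomes 120_000
        g.onIntervalChanged(staleReading, 1_000L);
        // Measured from the stale reading this would be 60_000 (back-to-back checkpoints).
        System.out.println(g.nextDeadline()); // 61000
    }
}
```

The guard only changes the outcome when the clock reading predates the last trigger. Otherwise `base` equals the reading and the deadline is reading + interval, as in the PR.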
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -181,7 +188,16 @@ public class CheckpointCoordinator {
private JobStatusListener jobStatusListener;
/** A handle to the current periodic trigger, to cancel it when necessary. */
- private ScheduledFuture<?> currentPeriodicTrigger;
+ private Future<?> currentPeriodicTrigger;
Review Comment:
It seems that `currentPeriodicTrigger` is always a `ScheduledFuture`. To
avoid back-and-forth code change, would it be better to keep the existing code
unless there is a good reason to change it?
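For illustration, using the JDK's `ScheduledExecutorService` as a stand-in for the coordinator's timer: `scheduleAtFixedRate` returns a `ScheduledFuture<?>`. Keeping the field at that type loses nothing for `cancel()`, and it also keeps `getDelay()` available, which a plain `Future<?>` lacks. `ScheduledFutureFieldDemo` is a hypothetical class:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: the periodic-trigger handle can stay typed as ScheduledFuture<?>. */
class ScheduledFutureFieldDemo {
    private ScheduledFuture<?> currentPeriodicTrigger;

    /** Schedules a no-op periodic task, reads its remaining delay, then cancels it. */
    long scheduleAndCancel() {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            currentPeriodicTrigger = timer.scheduleAtFixedRate(() -> {}, 10, 10, TimeUnit.SECONDS);
            long remainingMillis = currentPeriodicTrigger.getDelay(TimeUnit.MILLISECONDS);
            currentPeriodicTrigger.cancel(false);
            return remainingMillis;
        } finally {
            timer.shutdownNow();
        }
    }

    boolean isCancelled() {
        return currentPeriodicTrigger.isCancelled();
    }

    public static void main(String[] args) {
        ScheduledFutureFieldDemo demo = new ScheduledFutureFieldDemo();
        System.out.println(demo.scheduleAndCancel() > 0);
        System.out.println(demo.isCancelled());
    }
}
```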
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -181,7 +188,16 @@ public class CheckpointCoordinator {
private JobStatusListener jobStatusListener;
/** A handle to the current periodic trigger, to cancel it when necessary. */
- private ScheduledFuture<?> currentPeriodicTrigger;
+ private Future<?> currentPeriodicTrigger;
+
+ /**
+ * The timestamp (via {@link Clock#relativeTimeMillis()}) when the next checkpoint will be
+ * triggered.
+ *
+ * <p>If its value is {@link Long#MAX_VALUE}, it means there is not a next checkpoint
+ * scheduled.
+ */
+ private long nextCheckpointTriggeringRelativeTime;
Review Comment:
Given that `nextCheckpointTriggeringRelativeTime` might be written by
different threads, we should mark it as `volatile`.
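A minimal sketch of the visibility issue. `VolatileDeadlineDemo` is hypothetical, and the writer thread stands in for whichever thread updates the deadline. `volatile` makes a write from one thread visible to readers on other threads. It also makes individual reads and writes of a 64-bit `long` atomic, which the JLS (§17.7) does not guarantee for non-volatile `long`s. It does not make a compound check-then-reschedule atomic; that still needs a lock:

```java
/** Hypothetical demo: a volatile long deadline written by one thread and read by another. */
class VolatileDeadlineDemo {
    // volatile: cross-thread visibility plus atomic 64-bit reads and writes.
    private volatile long nextCheckpointTriggeringRelativeTime = Long.MAX_VALUE;

    void setNextCheckpointTriggeringRelativeTime(long value) {
        nextCheckpointTriggeringRelativeTime = value;
    }

    long getNextCheckpointTriggeringRelativeTime() {
        return nextCheckpointTriggeringRelativeTime;
    }

    /** Writes the value from a separate thread and spins until this thread observes it. */
    static long observeWriteFromAnotherThread(long value) {
        VolatileDeadlineDemo demo = new VolatileDeadlineDemo();
        Thread writer = new Thread(() -> demo.setNextCheckpointTriggeringRelativeTime(value));
        writer.start();
        // Without volatile, the JIT would be allowed to hoist this read out of the loop.
        while (demo.getNextCheckpointTriggeringRelativeTime() == Long.MAX_VALUE) {
            Thread.yield();
        }
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return demo.getNextCheckpointTriggeringRelativeTime();
    }

    public static void main(String[] args) {
        System.out.println(observeWriteFromAnotherThread(42_000L)); // 42000
    }
}
```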