[ https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168415#comment-17168415 ]
tao wang edited comment on FLINK-18748 at 7/31/20, 5:41 AM:
------------------------------------------------------------

[~klion26] OK, I have read the doc, thanks.

I have some thoughts on this issue. If pendingCheckpointsSizeSupplier.get() < maxConcurrentCheckpointAttempts, step 2 is skipped, and if nextTriggerDelayMillis > 0, onTooEarly is called:

{code:java}
private Optional<CheckpointTriggerRequest> onTooEarly(long nextTriggerDelayMillis) {
    CheckpointTriggerRequest first = queuedRequests.first();
    // first.isForce() will be false if running in unaligned checkpoints mode
    if (first.isForce()) {
        return Optional.of(queuedRequests.pollFirst());
    } else if (first.isPeriodic) {
        // first.isPeriodic will also be false if the savepoint is triggered by the user
        queuedRequests.pollFirst().completeExceptionally(new CheckpointException(MINIMUM_TIME_BETWEEN_CHECKPOINTS));
        rescheduleTrigger.accept(nextTriggerDelayMillis);
        return Optional.empty();
    } else {
        // we will get Optional.empty()
        return Optional.empty();
    }
}
{code}

So if a user triggers a savepoint too early while the job is running in unaligned checkpoints mode, onTooEarly returns Optional.empty() and, because nothing is polled in the last branch, the savepoint request simply stays in queuedRequests. I think this is why some savepoints get stuck in the IN_PROGRESS state.

I'm thinking of changing onTooEarly to this:

{code:java}
private Optional<CheckpointTriggerRequest> onTooEarly(long nextTriggerDelayMillis) {
    CheckpointTriggerRequest first = queuedRequests.first();
    // only periodic checkpoints are rejected and rescheduled; every other request runs now
    if (first.isPeriodic) {
        queuedRequests.pollFirst().completeExceptionally(new CheckpointException(MINIMUM_TIME_BETWEEN_CHECKPOINTS));
        rescheduleTrigger.accept(nextTriggerDelayMillis);
        return Optional.empty();
    }
    return Optional.of(queuedRequests.pollFirst());
}
{code}

By the way, with this change I don't think nextTriggerDelayMillis will make a savepoint wait at all, so there is no need to change any code in chooseRequestToExecute.
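To make the difference between the two onTooEarly variants concrete, here is a minimal, self-contained sketch under simplifying assumptions. It is not Flink code: the hypothetical Request class stands in for CheckpointTriggerRequest, an ArrayDeque replaces Flink's queuedRequests set (peekFirst/pollFirst instead of first/pollFirst), an IllegalStateException replaces CheckpointException(MINIMUM_TIME_BETWEEN_CHECKPOINTS), and rescheduleTrigger is left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of the onTooEarly decision; NOT Flink's implementation.
final class TooEarlySketch {

    // Hypothetical stand-in for Flink's CheckpointTriggerRequest.
    static final class Request {
        final boolean isPeriodic;
        final boolean force;
        final CompletableFuture<Void> result = new CompletableFuture<>();

        Request(boolean isPeriodic, boolean force) {
            this.isPeriodic = isPeriodic;
            this.force = force;
        }

        boolean isForce() {
            return force;
        }
    }

    // Current behaviour: a request that is neither forced nor periodic
    // (e.g. a user-triggered savepoint when isForce() is false) is not polled,
    // so it stays queued and nothing is returned.
    static Optional<Request> onTooEarlyCurrent(Deque<Request> queued) {
        Request first = queued.peekFirst();
        if (first.isForce()) {
            return Optional.of(queued.pollFirst());
        } else if (first.isPeriodic) {
            queued.pollFirst().result.completeExceptionally(
                    new IllegalStateException("MINIMUM_TIME_BETWEEN_CHECKPOINTS"));
            return Optional.empty();
        } else {
            return Optional.empty();
        }
    }

    // Proposed behaviour: only periodic checkpoints are rejected;
    // any other request is removed from the queue and returned for execution.
    static Optional<Request> onTooEarlyProposed(Deque<Request> queued) {
        Request first = queued.peekFirst();
        if (first.isPeriodic) {
            queued.pollFirst().result.completeExceptionally(
                    new IllegalStateException("MINIMUM_TIME_BETWEEN_CHECKPOINTS"));
            return Optional.empty();
        }
        return Optional.of(queued.pollFirst());
    }

    public static void main(String[] args) {
        // A user savepoint: not periodic, not forced.
        Request savepoint = new Request(false, false);

        Deque<Request> current = new ArrayDeque<>();
        current.add(savepoint);
        System.out.println("current:  returned=" + onTooEarlyCurrent(current).isPresent()
                + ", still queued=" + current.size());

        Deque<Request> proposed = new ArrayDeque<>();
        proposed.add(savepoint);
        System.out.println("proposed: returned=" + onTooEarlyProposed(proposed).isPresent()
                + ", still queued=" + proposed.size());
    }
}
```

In this model, main reports returned=false with the savepoint still queued for the current variant, and returned=true with an empty queue for the proposed one, while a periodic request is still completed exceptionally in both variants.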
> Savepoint would be queued unexpected if pendingCheckpoints less than maxConcurrentCheckpoints
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-18748
>                 URL: https://issues.apache.org/jira/browse/FLINK-18748
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.11.0, 1.11.1
>            Reporter: Congxian Qiu(klion26)
>            Priority: Major
>
> Inspired by a [user-zh email|http://apache-flink.147419.n8.nabble.com/flink-1-11-rest-api-saveppoint-td5497.html]
> After FLINK-17342, when a checkpoint/savepoint is triggered, we check whether the request can be triggered in {{CheckpointRequestDecider#chooseRequestToExecute}}. The logic is as follows:
> {code:java}
> Preconditions.checkState(Thread.holdsLock(lock));
> // 1.
> if (isTriggering || queuedRequests.isEmpty()) {
>     return Optional.empty();
> }
> // 2 too many ongoing checkpoints/savepoints
> if (pendingCheckpointsSizeSupplier.get() >= maxConcurrentCheckpointAttempts) {
>     return Optional.of(queuedRequests.first())
>             .filter(CheckpointTriggerRequest::isForce)
>             .map(unused -> queuedRequests.pollFirst());
> }
> // 3 check the timestamp of the last completed checkpoint
> long nextTriggerDelayMillis = nextTriggerDelayMillis(lastCompletionMs);
> if (nextTriggerDelayMillis > 0) {
>     return onTooEarly(nextTriggerDelayMillis);
> }
> return Optional.of(queuedRequests.pollFirst());
> {code}
> But if {{pendingCheckpointsSizeSupplier.get()}} < {{maxConcurrentCheckpointAttempts}} and the request is a savepoint, the savepoint will still wait for some time in step 3.
> I think we should trigger the savepoint immediately if {{pendingCheckpointsSizeSupplier.get()}} < {{maxConcurrentCheckpointAttempts}}.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)