[ 
https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168415#comment-17168415
 ] 

tao wang edited comment on FLINK-18748 at 7/31/20, 5:41 AM:
------------------------------------------------------------

[~klion26] Ok, I have read the doc, thanks.

I have some thoughts on this issue:

If pendingCheckpointsSizeSupplier.get() < maxConcurrentCheckpointAttempts, we'll 
skip step 2, and if nextTriggerDelayMillis > 0, onTooEarly will be called.

 

 
{code:java}
private Optional<CheckpointTriggerRequest> onTooEarly(long nextTriggerDelayMillis) {
   CheckpointTriggerRequest first = queuedRequests.first();

   // first.isForce() will be false if running in unaligned checkpoints mode
   if (first.isForce()) {
      return Optional.of(queuedRequests.pollFirst());
   } else if (first.isPeriodic) {
      // first.isPeriodic will also be false if the savepoint is triggered by a user
      queuedRequests.pollFirst().completeExceptionally(new CheckpointException(MINIMUM_TIME_BETWEEN_CHECKPOINTS));
      rescheduleTrigger.accept(nextTriggerDelayMillis);
      return Optional.empty();
   } else {
      // so a user-triggered savepoint ends up here and we get Optional.empty()
      return Optional.empty();
   }
}
{code}
 

In this case, the onTooEarly function will return Optional.empty() if a user 
triggers a savepoint too early while the job is running in unaligned checkpoints 
mode. The request is neither executed nor completed exceptionally.

I think this is the reason why some savepoints get stuck in the IN_PROGRESS state.
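
To make the stuck case concrete, here is a minimal, self-contained sketch of the branch structure above. It is not Flink code: `StuckSavepointSketch` and `Request` are hypothetical stand-ins, a plain `Deque` replaces the real `queuedRequests` collection, an `IllegalStateException` replaces the `CheckpointException`, and the rescheduling call is left out. It also assumes, as described above, that a user-triggered savepoint in unaligned mode has both the force flag and the periodic flag set to false.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class StuckSavepointSketch {

    /** Hypothetical stand-in for CheckpointTriggerRequest. */
    static final class Request {
        final boolean isForce;
        final boolean isPeriodic;
        // The caller waits on this future for the savepoint result.
        final CompletableFuture<String> result = new CompletableFuture<>();

        Request(boolean isForce, boolean isPeriodic) {
            this.isForce = isForce;
            this.isPeriodic = isPeriodic;
        }
    }

    /** Simplified copy of the current three-way branch in onTooEarly. */
    static Optional<Request> onTooEarly(Deque<Request> queuedRequests) {
        Request first = queuedRequests.peekFirst();
        if (first.isForce) {
            return Optional.of(queuedRequests.pollFirst());
        } else if (first.isPeriodic) {
            queuedRequests.pollFirst().result.completeExceptionally(
                    new IllegalStateException("minimum time between checkpoints"));
            return Optional.empty();
        } else {
            // Neither forced nor periodic: nothing is polled, nothing is completed.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        Deque<Request> queue = new ArrayDeque<>();
        Request savepoint = new Request(false, false); // assumed flags, see lead-in
        queue.add(savepoint);

        Optional<Request> chosen = onTooEarly(queue);
        System.out.println("chosen present: " + chosen.isPresent());
        System.out.println("still queued:   " + queue.contains(savepoint));
        System.out.println("future done:    " + savepoint.result.isDone());
    }
}
```

In this model the savepoint request stays in the queue with an incomplete future, so it makes no progress until something calls the decider again.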

 

Maybe I'll change onTooEarly to this:

 
{code:java}
private Optional<CheckpointTriggerRequest> onTooEarly(long nextTriggerDelayMillis) {
   CheckpointTriggerRequest first = queuedRequests.first();
   if (first.isPeriodic) {
      queuedRequests.pollFirst().completeExceptionally(new CheckpointException(MINIMUM_TIME_BETWEEN_CHECKPOINTS));
      rescheduleTrigger.accept(nextTriggerDelayMillis);
      return Optional.empty();
   }
   return Optional.of(queuedRequests.pollFirst());
}

{code}
By the way, I think calling nextTriggerDelayMillis does not block or cost any 
noticeable time, so there is no need to change any code in chooseRequestToExecute.
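
As a rough illustration of how the proposed two-way branch would behave, the sketch below models it outside of Flink. `ProposedOnTooEarlySketch` and `Request` are hypothetical names, a plain `Deque` stands in for `queuedRequests`, a `LongConsumer` stands in for `rescheduleTrigger`, and an `IllegalStateException` stands in for the `CheckpointException`. It is a sketch under those assumptions, not the actual patch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.LongConsumer;

public class ProposedOnTooEarlySketch {

    /** Hypothetical stand-in for CheckpointTriggerRequest. */
    static final class Request {
        final String name;
        final boolean isPeriodic;
        final CompletableFuture<String> result = new CompletableFuture<>();

        Request(String name, boolean isPeriodic) {
            this.name = name;
            this.isPeriodic = isPeriodic;
        }
    }

    /** Proposed logic: reject periodic requests, let everything else through. */
    static Optional<Request> onTooEarly(
            Deque<Request> queuedRequests,
            long nextTriggerDelayMillis,
            LongConsumer rescheduleTrigger) {
        Request first = queuedRequests.peekFirst();
        if (first.isPeriodic) {
            // Periodic checkpoints still respect the minimum pause.
            queuedRequests.pollFirst().result.completeExceptionally(
                    new IllegalStateException("minimum time between checkpoints"));
            rescheduleTrigger.accept(nextTriggerDelayMillis);
            return Optional.empty();
        }
        // Any non-periodic request (for example a user savepoint) runs right away.
        return Optional.of(queuedRequests.pollFirst());
    }

    public static void main(String[] args) {
        List<Long> rescheduledDelays = new ArrayList<>();
        LongConsumer reschedule = delay -> rescheduledDelays.add(delay);

        Deque<Request> queue = new ArrayDeque<>();
        Request periodic = new Request("periodic checkpoint", true);
        Request savepoint = new Request("user savepoint", false);
        queue.add(periodic);
        queue.add(savepoint);

        Optional<Request> firstChoice = onTooEarly(queue, 500L, reschedule);
        Optional<Request> secondChoice = onTooEarly(queue, 500L, reschedule);

        System.out.println("first call chose a request:  " + firstChoice.isPresent());
        System.out.println("second call chose: "
                + secondChoice.map(r -> r.name).orElse("nothing"));
        System.out.println("rescheduled delays: " + rescheduledDelays);
    }
}
```

One consequence of this shape is that every non-periodic request bypasses the minimum-pause check, whether or not it is forced.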

 

 



> Savepoint would be queued unexpected if pendingCheckpoints less than 
> maxConcurrentCheckpoints
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-18748
>                 URL: https://issues.apache.org/jira/browse/FLINK-18748
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.11.0, 1.11.1
>            Reporter: Congxian Qiu(klion26)
>            Priority: Major
>
> Inspired by a [user-zh 
> email|http://apache-flink.147419.n8.nabble.com/flink-1-11-rest-api-saveppoint-td5497.html]
> After FLINK-17342, when triggering a checkpoint/savepoint, we check 
> whether the request can be triggered in 
> {{CheckpointRequestDecider#chooseRequestToExecute}}. The logic is as follows:
> {code:java}
> Preconditions.checkState(Thread.holdsLock(lock));
> // 1. 
> if (isTriggering || queuedRequests.isEmpty()) {
>    return Optional.empty();
> }
> // 2. too many ongoing checkpoints/savepoints
> if (pendingCheckpointsSizeSupplier.get() >= maxConcurrentCheckpointAttempts) {
>    return Optional.of(queuedRequests.first())
>       .filter(CheckpointTriggerRequest::isForce)
>       .map(unused -> queuedRequests.pollFirst());
> }
> // 3. check the timestamp of the last completed checkpoint
> long nextTriggerDelayMillis = nextTriggerDelayMillis(lastCompletionMs);
> if (nextTriggerDelayMillis > 0) {
>    return onTooEarly(nextTriggerDelayMillis);
> }
> return Optional.of(queuedRequests.pollFirst());
> {code}
> But if {{pendingCheckpointsSizeSupplier.get()}} < 
> {{maxConcurrentCheckpointAttempts}} and the request is a savepoint, the 
> savepoint will still wait for some time in step 3. 
> I think we should trigger the savepoint immediately if 
> {{pendingCheckpointsSizeSupplier.get()}} < {{maxConcurrentCheckpointAttempts}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
