[ 
https://issues.apache.org/jira/browse/FLINK-17477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-17477:
-----------------------------------
    Description: 
We should be calling {{InputGate#resumeConsumption()}} as soon as possible (to 
avoid any unnecessary delay/latency when the task is idling). Currently I think 
it’s mostly fine - the important bit is that on the happy path, we always call 
{{resumeConsumption}} before trying to complete the checkpoint, so that the netty 
threads start resuming the network traffic while the task thread is doing 
the synchronous part of the checkpoint and starting the asynchronous part. But I 
think in two places we first abort the checkpoint and only then resume 
consumption (in {{CheckpointBarrierAligner}}):
{code}
// let the task know we are not completing this
notifyAbort(currentCheckpointId,
        new CheckpointException(
                "Barrier id: " + barrierId,
                CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED));
// abort the current checkpoint
releaseBlocksAndResetBarriers();
{code}
{code}
// let the task know we skip a checkpoint
notifyAbort(currentCheckpointId,
        new CheckpointException(
                CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
// no chance to complete this checkpoint
releaseBlocksAndResetBarriers();
{code}
It’s not a big deal, as those are rare conditions, but it would be better to 
be consistent everywhere: first release blocks and resume consumption, before 
anything else happens. 
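
A minimal sketch of the proposed ordering, using hypothetical stand-ins rather than the real {{CheckpointBarrierAligner}}: the class name {{AbortOrderSketch}}, the string-based reason and the recorded event list are assumptions made only to show the call order. It assumes, as the description above suggests, that {{releaseBlocksAndResetBarriers()}} is what resumes consumption. It does not check whether the real method resets any state that {{notifyAbort}} still needs.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the aligner; it only records the order of calls. */
class AbortOrderSketch {
    private final List<String> events = new ArrayList<>();

    // Stand-in for releaseBlocksAndResetBarriers(), assumed to resume consumption.
    private void releaseBlocksAndResetBarriers() {
        events.add("releaseBlocksAndResetBarriers");
    }

    // Stand-in for notifyAbort(checkpointId, cause); the cause is a plain string here.
    private void notifyAbort(long checkpointId, String reason) {
        events.add("notifyAbort(" + checkpointId + ", " + reason + ")");
    }

    // Proposed order: unblock the inputs first, then tell the task about the abort.
    List<String> abortSubsumedCheckpoint(long currentCheckpointId) {
        releaseBlocksAndResetBarriers();
        notifyAbort(currentCheckpointId, "CHECKPOINT_DECLINED_SUBSUMED");
        return events;
    }

    public static void main(String[] args) {
        // Prints the recorded call order for an arbitrary checkpoint id.
        System.out.println(new AbortOrderSketch().abortSubsumedCheckpoint(5L));
    }
}
```

The same swap would apply to the end-of-stream case; only the failure reason differs.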

  was:
We should be calling {{InputGate#resumeConsumption()}} as soon as possible (to 
avoid any unnecessary delay/latency when task is idling). Currently I think 
it’s mostly fine - the important bit is that on the happy path, we always 
{{resumeConsumption}} before trying to complete the checkpoint, so that netty 
threads will start resuming the network traffic while the task thread is doing 
the synchronous part of the checkpoint and starting asynchronous part. But I 
think in two places we are first aborting checkpoint and only then resuming 
consumption (in CheckpointBarrierAligner):
{{code}}
                                // let the task know we are not completing this
                                notifyAbort(currentCheckpointId,
                                        new CheckpointException(
                                                "Barrier id: " + barrierId,
                                                
CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED));
                                // abort the current checkpoint
                                releaseBlocksAndResetBarriers();
{{code}}
{{code}}
                        // let the task know we skip a checkpoint
                        notifyAbort(currentCheckpointId,
                                new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
                        // no chance to complete this checkpoint
                        releaseBlocksAndResetBarriers();
{{code}}
It’s not a big deal, as those are a rare conditions, but it would be better to 
be consistent everywhere: first release blocks and resume consumption, before 
anything else happens. 


> resumeConsumption call should happen as quickly as possible to minimise 
> latency
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-17477
>                 URL: https://issues.apache.org/jira/browse/FLINK-17477
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing, Runtime / Network
>            Reporter: Piotr Nowojski
>            Priority: Minor
>             Fix For: 1.11.0
>
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
