[ 
https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiayi Liao updated FLINK-19596:
-------------------------------
    Description: 
{{completedCheckpointStore.recover()}} in 
{{restoreLatestCheckpointedStateInternal}} could be a bottleneck on failover 
because the {{CompletedCheckpointStore}} needs to load HDFS files to 
instantiate the {{CompletedCheckpoint}} instances.

The impact is significant in our case below:

* Jobs with high parallelism (no shuffle) which transfer data from Kafka to 
other filesystems.
* If a machine goes down, several containers and tens of tasks are affected, 
which means {{completedCheckpointStore.recover()}} is called tens of times, 
since the tasks are not in the same failover region.

I also noticed a "TODO" in the source code:


{code:java}
// Recover the checkpoints, TODO this could be done only when there is a new
// leader, not on each recovery
completedCheckpointStore.recover();
{code}


  was:
{{completedCheckpointStore.recover()}} in 
{{restoreLatestCheckpointedStateInternal}} could be a bottleneck on failover 
because the {{CompletedCheckpointStore}} needs to load HDFS files to 
instantiate the {{CompletedCheckpoint}} instances.

The impact is significant in our case below:

* Jobs with high parallelism (no shuffle) which transfer data from Kafka to 
other filesystems.
* If a machine goes down, several containers and tens of tasks are affected, 
which means {{completedCheckpointStore.recover()}} is called tens of times, 
since the tasks are not in the same failover region.

I also noticed a "TODO" in the source code:


{code:java}

                        // Recover the checkpoints, TODO this could be done
                        // only when there is a new leader, not on each recovery
                        completedCheckpointStore.recover();
{code}

```
TODO this could be done only when there is a new leader, not on each recovery
```


> Do not recover CompletedCheckpointStore on each failover
> --------------------------------------------------------
>
>                 Key: FLINK-19596
>                 URL: https://issues.apache.org/jira/browse/FLINK-19596
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.11.2
>            Reporter: Jiayi Liao
>            Priority: Major
>
> {{completedCheckpointStore.recover()}} in 
> {{restoreLatestCheckpointedStateInternal}} could be a bottleneck on failover 
> because the {{CompletedCheckpointStore}} needs to load HDFS files to 
> instantiate the {{CompletedCheckpoint}} instances.
> The impact is significant in our case below:
> * Jobs with high parallelism (no shuffle) which transfer data from Kafka to 
> other filesystems.
> * If a machine goes down, several containers and tens of tasks are affected, 
> which means {{completedCheckpointStore.recover()}} is called tens of times, 
> since the tasks are not in the same failover region.
> I also noticed a "TODO" in the source code:
> {code:java}
> // Recover the checkpoints, TODO this could be done only when there is a new
> // leader, not on each recovery
> completedCheckpointStore.recover();
> {code}
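
As a rough illustration of the idea in the TODO, the sketch below guards the expensive recovery so that it runs once per leadership grant instead of on every failover. It is a simplified model and not Flink's actual code: CheckpointStore, CountingStore, Coordinator, onLeadershipGranted() and simulate() are hypothetical names, and the store only counts calls instead of loading checkpoint files from HDFS.

```java
import java.util.concurrent.atomic.AtomicInteger;

class RecoverOncePerLeadership {

    /** Hypothetical stand-in for the store; models only the expensive recover() call. */
    interface CheckpointStore {
        void recover();
    }

    /** Counts recover() calls instead of reading checkpoint files. */
    static final class CountingStore implements CheckpointStore {
        final AtomicInteger recoverCalls = new AtomicInteger();

        @Override
        public void recover() {
            recoverCalls.incrementAndGet();
        }
    }

    /** Hypothetical coordinator that recovers the store once per leadership grant. */
    static final class Coordinator {
        private final CheckpointStore store;
        private boolean recoveredForCurrentLeader;

        Coordinator(CheckpointStore store) {
            this.store = store;
        }

        /** Assumed hook, invoked whenever this process becomes the leader. */
        synchronized void onLeadershipGranted() {
            recoveredForCurrentLeader = false;
        }

        /** Called on every failover; recover() runs only on the first call per leader. */
        synchronized void restoreLatestCheckpointedState() {
            if (!recoveredForCurrentLeader) {
                store.recover();
                recoveredForCurrentLeader = true;
            }
            // Task state would be restored from the in-memory checkpoints here.
        }
    }

    /** Simulates failovers under successive leaders and returns the recover() count. */
    static int simulate(int leaderChanges, int failoversPerLeader) {
        CountingStore store = new CountingStore();
        Coordinator coordinator = new Coordinator(store);
        for (int leader = 0; leader < leaderChanges; leader++) {
            coordinator.onLeadershipGranted();
            for (int failover = 0; failover < failoversPerLeader; failover++) {
                coordinator.restoreLatestCheckpointedState();
            }
        }
        return store.recoverCalls.get();
    }

    public static void main(String[] args) {
        System.out.println(simulate(1, 30)); // prints 1
        System.out.println(simulate(2, 30)); // prints 2
    }
}
```

With one leader and 30 failovers the store is recovered once rather than 30 times. A real change would also have to make sure the in-memory view of the store stays consistent with the external storage between leadership changes, which this sketch does not model.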



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
