[ 
https://issues.apache.org/jira/browse/FLINK-21248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17278193#comment-17278193
 ] 

Till Rohrmann commented on FLINK-21248:
---------------------------------------

In order to continue your job from where it left off you need to create a 
savepoint from which you resume the new job. Only then, the new job will know 
from where to start reading the Kafka log.

> Checkpoint problem in 1.12.0
> ----------------------------
>
>                 Key: FLINK-21248
>                 URL: https://issues.apache.org/jira/browse/FLINK-21248
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.12.0
>            Reporter: Ceyhan Kasap
>            Priority: Major
>
> Hi
> I am trying to upgrade from 1.5.5 to 1.12 and checkpointing mechanism seems 
> to be broken in our kafka connector sourced datastream jobs.
> Since there is a siginificant version gap and there are many backwards 
> uncompatible / deprecated changes in flink runtime between versions, I had to 
> modify our jobs and noticed that checkpoint offsets are not committed to 
> kafka for source connectors.
> To simplfiy the issues I created simple repoducer projects:
> [https://github.com/simpleusr/flink_problem_1.5.5]
> [https://github.com/simpleusr/flink_problem_1.12.0]
> It seems that there are majr changes in the checkpoint infrastructure.
> For 1.5.5 checkpoint cycles works as expected as can be seen from the logs 
> (please note that sample project contains a small hack in 
> org.apache.flink.runtime.minicluster.MiniCluster which prevents cluster from 
> stopping) :
> *[2021-02-03 10:04:17,409] INFO Completed checkpoint 2 for job 
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 11 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)*
> *[2021-02-03 10:04:17,409] INFO Completed checkpoint 2 for job 
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 11 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)*
> ....................
> *[2021-02-03 10:04:27,401] INFO Completed checkpoint 4 for job 
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 5 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)*
> *[2021-02-03 10:04:27,401] INFO Completed checkpoint 4 for job 
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 5 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)*
> However for 1.12.0 checkpoint cycles stuck at initial checkpoint:
> *[2021-02-03 10:06:24,504] INFO Triggering checkpoint 1 (type=CHECKPOINT) @ 
> 1612339584496 for job ce255b141393a358db734db2d27ef0ea. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*
> As far as I see, checkpoint cycle is stuck at waiting in 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator for 
> coordinatorCheckpointsComplete although coordinatorsToCheckpoint is empty...
>  
> {code}
> final CompletableFuture<?> coordinatorCheckpointsComplete =
>                 pendingCheckpointCompletableFuture.thenComposeAsync(
>                         (pendingCheckpoint) ->
>                                 OperatorCoordinatorCheckpoints
>                                         
> .triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
>                                                 coordinatorsToCheckpoint,
>                                                 pendingCheckpoint,
>                                                 timer),
>                         timer);
> {code}
> Simply returning from 
> OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion
>  when there is no coordinatorsToCheckpoint seems to resolve the problem:
> *[2021-02-03 10:07:53,387] INFO Triggering checkpoint 1 (type=CHECKPOINT) @ 
> 1612339673380 for job ffb4a06302f7e60e9325f32340d299b2. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*
> *[2021-02-03 10:07:53,387] INFO Triggering checkpoint 1 (type=CHECKPOINT) @ 
> 1612339673380 for job ffb4a06302f7e60e9325f32340d299b2. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*
> *[2021-02-03 10:07:53,607] INFO Completed checkpoint 1 for job 
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 225 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)*
> *[2021-02-03 10:07:53,607] INFO Completed checkpoint 1 for job 
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 225 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)*
> *[2021-02-03 10:07:58,380] INFO Triggering checkpoint 2 (type=CHECKPOINT) @ 
> 1612339678380 for job ffb4a06302f7e60e9325f32340d299b2. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*
> *[2021-02-03 10:07:58,380] INFO Triggering checkpoint 2 (type=CHECKPOINT) @ 
> 1612339678380 for job ffb4a06302f7e60e9325f32340d299b2. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*
> *[2021-02-03 10:07:58,388] INFO Completed checkpoint 2 for job 
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 7 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)*
> *[2021-02-03 10:07:58,388] INFO Completed checkpoint 2 for job 
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 7 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)*
> I have submitted this pr for this.
> Please help me if I am missing something or there is another solution without 
> code change.
> We need to perform the upgrade and modify our jobs as soon as possible (I 
> hope other breaking changes do not happen) so any help will be appreciated..



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to