[ 
https://issues.apache.org/jira/browse/FLINK-21248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17278566#comment-17278566
 ] 

Ceyhan Kasap commented on FLINK-21248:
--------------------------------------

Hi [~trohrmann] ,

As far as I know, savepoints are manually operated. Unfortunately we are 
currently not able to use savepoints since operator ids are missing in the jobs 
but that is another story (we also actually  did not require it either until 
now).

However, as I mentioned our sources are kafka, so flink kafka consumer achieves 
the fault tolerance behavior we require as explained below:

[https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-fault-tolerance]

"With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume 
records from a topic and periodically checkpoint all its Kafka offsets, 
together with the state of other operations. In case of a job failure, Flink 
will restore the streaming program to the state of the latest checkpoint and 
re-consume the records from Kafka, starting from the offsets that were stored 
in the checkpoint."

So it is very crucial for us to checkpoint operate normally. Being able to 
consume from the latest committed offset is what we are relying on for fault 
tolerance in case of errors. With no checkpoint cycles, no offsets are stored 
in kafka.

I am still wondering why I am seeing this behavior although you do not...

Regards

> Checkpoint problem in 1.12.0
> ----------------------------
>
>                 Key: FLINK-21248
>                 URL: https://issues.apache.org/jira/browse/FLINK-21248
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.12.0
>            Reporter: Ceyhan Kasap
>            Priority: Major
>
> Hi
> I am trying to upgrade from 1.5.5 to 1.12 and checkpointing mechanism seems 
> to be broken in our kafka connector sourced datastream jobs.
> Since there is a siginificant version gap and there are many backwards 
> uncompatible / deprecated changes in flink runtime between versions, I had to 
> modify our jobs and noticed that checkpoint offsets are not committed to 
> kafka for source connectors.
> To simplfiy the issues I created simple repoducer projects:
> [https://github.com/simpleusr/flink_problem_1.5.5]
> [https://github.com/simpleusr/flink_problem_1.12.0]
> It seems that there are majr changes in the checkpoint infrastructure.
> For 1.5.5 checkpoint cycles works as expected as can be seen from the logs 
> (please note that sample project contains a small hack in 
> org.apache.flink.runtime.minicluster.MiniCluster which prevents cluster from 
> stopping) :
> *[2021-02-03 10:04:17,409] INFO Completed checkpoint 2 for job 
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 11 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)*
> *[2021-02-03 10:04:17,409] INFO Completed checkpoint 2 for job 
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 11 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)*
> ....................
> *[2021-02-03 10:04:27,401] INFO Completed checkpoint 4 for job 
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 5 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)*
> *[2021-02-03 10:04:27,401] INFO Completed checkpoint 4 for job 
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 5 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)*
> However for 1.12.0 checkpoint cycles stuck at initial checkpoint:
> *[2021-02-03 10:06:24,504] INFO Triggering checkpoint 1 (type=CHECKPOINT) @ 
> 1612339584496 for job ce255b141393a358db734db2d27ef0ea. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*
> As far as I see, checkpoint cycle is stuck at waiting in 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator for 
> coordinatorCheckpointsComplete although coordinatorsToCheckpoint is empty...
>  
> {code}
> final CompletableFuture<?> coordinatorCheckpointsComplete =
>                 pendingCheckpointCompletableFuture.thenComposeAsync(
>                         (pendingCheckpoint) ->
>                                 OperatorCoordinatorCheckpoints
>                                         
> .triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
>                                                 coordinatorsToCheckpoint,
>                                                 pendingCheckpoint,
>                                                 timer),
>                         timer);
> {code}
> Simply returning from 
> OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion
>  when there is no coordinatorsToCheckpoint seems to resolve the problem:
> *[2021-02-03 10:07:53,387] INFO Triggering checkpoint 1 (type=CHECKPOINT) @ 
> 1612339673380 for job ffb4a06302f7e60e9325f32340d299b2. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*
> *[2021-02-03 10:07:53,387] INFO Triggering checkpoint 1 (type=CHECKPOINT) @ 
> 1612339673380 for job ffb4a06302f7e60e9325f32340d299b2. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*
> *[2021-02-03 10:07:53,607] INFO Completed checkpoint 1 for job 
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 225 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)*
> *[2021-02-03 10:07:53,607] INFO Completed checkpoint 1 for job 
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 225 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)*
> *[2021-02-03 10:07:58,380] INFO Triggering checkpoint 2 (type=CHECKPOINT) @ 
> 1612339678380 for job ffb4a06302f7e60e9325f32340d299b2. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*
> *[2021-02-03 10:07:58,380] INFO Triggering checkpoint 2 (type=CHECKPOINT) @ 
> 1612339678380 for job ffb4a06302f7e60e9325f32340d299b2. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*
> *[2021-02-03 10:07:58,388] INFO Completed checkpoint 2 for job 
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 7 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)*
> *[2021-02-03 10:07:58,388] INFO Completed checkpoint 2 for job 
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 7 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)*
> I have submitted this pr for this.
> Please help me if I am missing something or there is another solution without 
> code change.
> We need to perform the upgrade and modify our jobs as soon as possible (I 
> hope other breaking changes do not happen) so any help will be appreciated..



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to