[ 
https://issues.apache.org/jira/browse/FLINK-21248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17278835#comment-17278835
 ] 

Ceyhan Kasap commented on FLINK-21248:
--------------------------------------

Hi,

As far as I understand the statements  [here in bold 
|https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration]
 are confirming that after checkpoint completes offset is commited in the 
broker side...

_Checkpointing enabled: *i+f checkpointing is enabled, the Flink Kafka Consumer 
will commit the offsets stored in the checkpointed states when the checkpoints 
are completed. This ensures that the committed offsets in Kafka brokers is 
consistent with the offsets in the checkpointed states.+* Users can choose to 
disable or enable offset committing by calling the 
setCommitOffsetsOnCheckpoints(boolean) method on the consumer (by default, the 
behaviour is true). Note that in this scenario, the automatic periodic offset 
committing settings in Properties is completely ignored_

As long as there is at least one committed offset in kafka broker side, the new 
job process without any checkpoint /savepoint should poll starting from the 
latest offset.

auto.offset.reset should kick in when there is no previous offset committed.

[https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#auto.offset.reset]

To my knowledge this is standard kafka consumer behavior.

Besides, the behavior I am seeing is confirming the above .

I have attached the logs of sample runs.

[^problem_logs.zip]

Order of events / run explanation:

1) errorcase_run1.log -> start job with problem. send three messages (msg1, 
msg2, msg3) and see them consumed. But checkpoints are not completed so no 
offset commit. kill the job.
2) errorcase_run2.log -> start job with problem. Since no previous offset , 
three messages (msg1, msg2, msg3) are reconsumed. Checkpoints are still not 
completed. kill the job.
3) patchapplied_successcase_run1.log -> start job without problem (patch 
applied) . Since no previous offset , three messages (msg1, msg2, msg3) are 
reconsumed. but this time checkpoints are success and offsets are committed. 
kill the job.
4) patchapplied_successcase_run2.log -> start job without problem (patch 
applied) . This time there is no previous offset , three messages (msg1, msg2, 
msg3) are NOT reconsumed. add two new messages to topic (msg4 and msg5) and see 
them consumed. kill the job.

After long hours, I still can not understand why the future you mentioned not 
completed immediately...

Best regards

> Checkpoint problem in 1.12.0
> ----------------------------
>
>                 Key: FLINK-21248
>                 URL: https://issues.apache.org/jira/browse/FLINK-21248
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.12.0
>            Reporter: Ceyhan Kasap
>            Priority: Major
>
> Hi
> I am trying to upgrade from 1.5.5 to 1.12 and checkpointing mechanism seems 
> to be broken in our kafka connector sourced datastream jobs.
> Since there is a siginificant version gap and there are many backwards 
> uncompatible / deprecated changes in flink runtime between versions, I had to 
> modify our jobs and noticed that checkpoint offsets are not committed to 
> kafka for source connectors.
> To simplfiy the issues I created simple repoducer projects:
> [https://github.com/simpleusr/flink_problem_1.5.5]
> [https://github.com/simpleusr/flink_problem_1.12.0]
> It seems that there are majr changes in the checkpoint infrastructure.
> For 1.5.5 checkpoint cycles works as expected as can be seen from the logs 
> (please note that sample project contains a small hack in 
> org.apache.flink.runtime.minicluster.MiniCluster which prevents cluster from 
> stopping) :
> *[2021-02-03 10:04:17,409] INFO Completed checkpoint 2 for job 
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 11 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)*
> *[2021-02-03 10:04:17,409] INFO Completed checkpoint 2 for job 
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 11 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)*
> ....................
> *[2021-02-03 10:04:27,401] INFO Completed checkpoint 4 for job 
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 5 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)*
> *[2021-02-03 10:04:27,401] INFO Completed checkpoint 4 for job 
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 5 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)*
> However for 1.12.0 checkpoint cycles stuck at initial checkpoint:
> *[2021-02-03 10:06:24,504] INFO Triggering checkpoint 1 (type=CHECKPOINT) @ 
> 1612339584496 for job ce255b141393a358db734db2d27ef0ea. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*
> As far as I see, checkpoint cycle is stuck at waiting in 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator for 
> coordinatorCheckpointsComplete although coordinatorsToCheckpoint is empty...
>  
> {code}
> final CompletableFuture<?> coordinatorCheckpointsComplete =
>                 pendingCheckpointCompletableFuture.thenComposeAsync(
>                         (pendingCheckpoint) ->
>                                 OperatorCoordinatorCheckpoints
>                                         
> .triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
>                                                 coordinatorsToCheckpoint,
>                                                 pendingCheckpoint,
>                                                 timer),
>                         timer);
> {code}
> Simply returning from 
> OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion
>  when there is no coordinatorsToCheckpoint seems to resolve the problem:
> *[2021-02-03 10:07:53,387] INFO Triggering checkpoint 1 (type=CHECKPOINT) @ 
> 1612339673380 for job ffb4a06302f7e60e9325f32340d299b2. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*
> *[2021-02-03 10:07:53,387] INFO Triggering checkpoint 1 (type=CHECKPOINT) @ 
> 1612339673380 for job ffb4a06302f7e60e9325f32340d299b2. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*
> *[2021-02-03 10:07:53,607] INFO Completed checkpoint 1 for job 
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 225 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)*
> *[2021-02-03 10:07:53,607] INFO Completed checkpoint 1 for job 
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 225 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)*
> *[2021-02-03 10:07:58,380] INFO Triggering checkpoint 2 (type=CHECKPOINT) @ 
> 1612339678380 for job ffb4a06302f7e60e9325f32340d299b2. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*
> *[2021-02-03 10:07:58,380] INFO Triggering checkpoint 2 (type=CHECKPOINT) @ 
> 1612339678380 for job ffb4a06302f7e60e9325f32340d299b2. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*
> *[2021-02-03 10:07:58,388] INFO Completed checkpoint 2 for job 
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 7 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)*
> *[2021-02-03 10:07:58,388] INFO Completed checkpoint 2 for job 
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 7 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)*
> I have submitted this pr for this.
> Please help me if I am missing something or there is another solution without 
> code change.
> We need to perform the upgrade and modify our jobs as soon as possible (I 
> hope other breaking changes do not happen) so any help will be appreciated..



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to