[ 
https://issues.apache.org/jira/browse/FLINK-21248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17278797#comment-17278797
 ] 

Ceyhan Kasap commented on FLINK-21248:
--------------------------------------

Hi,

1) IMHO committing offsets to the Kafka broker is an indispensable task for a 
Kafka consumer, and Flink still does it...

When I apply the content of

[https://github.com/apache/flink/pull/14846]

checkpoint cycles begin to complete (in your case they should already be 
completing without this change)

and I can see the following stack trace at the breakpoint:
{noformat}
Thread [Source: Custom Source -> (Map, Sink: Print to Std. Out) (7/8)#0] (Suspended (breakpoint at line 210 in KafkaFetcher))
  owns: Object (id=157)
  KafkaFetcher<T>.doCommitInternalOffsetsToKafka(Map<KafkaTopicPartition,Long>, KafkaCommitCallback) line: 210
  KafkaFetcher<T>(AbstractFetcher<T,KPH>).commitInternalOffsetsToKafka(Map<KafkaTopicPartition,Long>, KafkaCommitCallback) line: 285
  FlinkKafkaConsumer<T>(FlinkKafkaConsumerBase<T>).notifyCheckpointComplete(long) line: 1006
  StreamSource<OUT,SRC>(AbstractUdfStreamOperator<OUT,F>).notifyCheckpointComplete(long) line: 130
  StreamOperatorWrapper<OUT,OP>.notifyCheckpointComplete(long) line: 99
  SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(long, OperatorChain<?,?>, Supplier<Boolean>) line: 283
  SourceStreamTask<OUT,SRC,OP>(StreamTask<OUT,OP>).notifyCheckpointComplete(long) line: 990
  SourceStreamTask<OUT,SRC,OP>(StreamTask<OUT,OP>).lambda$notifyCheckpointCompleteAsync$11(long) line: 961
  1918490550.run() line: not available
  StreamTask<OUT,OP>.lambda$notifyCheckpointOperation$13(RunnableWithException, CompletableFuture) line: 977
  275623379.run() line: not available
  StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(ThrowingRunnable<E>) line: 92
  Mail.run() line: 78
  MailboxProcessor.processMail(TaskMailbox, boolean) line: 302
  MailboxProcessor.runMailboxLoop() line: 184
  SourceStreamTask<OUT,SRC,OP>(StreamTask<OUT,OP>).runMailboxLoop() line: 575
  SourceStreamTask<OUT,SRC,OP>(StreamTask<OUT,OP>).invoke() line: 539
  Task.doRun() line: 722
  Task.run() line: 547
  Thread.run() line: 745
{noformat}
which is indeed as documented below:

 
[offsetcommit|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.html#commitInternalOffsetsToKafka-java.util.Map-org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback-]

Although we apply a fixedDelayRestart strategy with a bounded max count, 
interaction with the external downstream operators can take the job down. In 
that case we continue from the latest committed consumer group offset rather 
than manually creating and managing savepoints.

2) Yes, I am killing the process as you say, and I know that the checkpoint 
will not be found in the newly started process. But I expect offsets to be 
committed. Actually, in my case it would not matter even if I used a 
persistent state backend: since no checkpoint cycles are completed, nothing 
would be persisted... My main problem is somehow not getting a single 
checkpoint cycle to complete...
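
To make point 2 concrete, here is a minimal toy model in plain Java of the behavior visible in the stack trace above: offsets are remembered when a checkpoint is triggered and are only committed when the completion notification arrives. The classes Broker and Source are hypothetical stand-ins written for this illustration; they are not Flink or Kafka code, and the real connector is considerably more involved.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (not Flink code): offsets reach the broker only after a checkpoint completes. */
public class OffsetCommitModel {

    /** Hypothetical stand-in for the broker's committed consumer group offsets. */
    static final class Broker {
        final Map<Integer, Long> committed = new HashMap<>();
    }

    /** Hypothetical stand-in for a source that commits offsets on checkpoint completion. */
    static final class Source {
        private final Broker broker;
        // Latest offset read per partition.
        private final Map<Integer, Long> current = new HashMap<>();
        // Offsets captured per checkpoint id, waiting for the completion notification.
        private final Map<Long, Map<Integer, Long>> pending = new HashMap<>();

        Source(Broker broker) {
            this.broker = broker;
        }

        void consume(int partition, long offset) {
            current.put(partition, offset);
        }

        /** Checkpoint triggered: remember the offsets, but commit nothing yet. */
        void snapshotState(long checkpointId) {
            pending.put(checkpointId, new HashMap<>(current));
        }

        /** Checkpoint completed: only now are the remembered offsets committed. */
        void notifyCheckpointComplete(long checkpointId) {
            Map<Integer, Long> offsets = pending.remove(checkpointId);
            if (offsets != null) {
                broker.committed.putAll(offsets);
            }
        }
    }

    public static void main(String[] args) {
        Broker broker = new Broker();
        Source source = new Source(broker);
        source.consume(0, 42L);
        source.snapshotState(1L);
        // Checkpoint 1 was triggered but has not completed: nothing is committed.
        System.out.println("before completion: " + broker.committed);
        source.notifyCheckpointComplete(1L);
        System.out.println("after completion: " + broker.committed);
    }
}
```

In this model, a job whose first checkpoint never completes never calls notifyCheckpointComplete, so the committed map stays empty regardless of which state backend is configured.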

3) Yes, I have cleaned up the repository a couple of times. I tried from both 
my work PC and my home PC, but no luck... Currently I do not have a 1.12 
cluster set up. After the cluster is set up and I finish upgrading our 
existing jobs (written against Flink 1.5.5 and unfortunately having a lot of 
compatibility issues), I will try them on the cluster and see whether this 
problem occurs there. You may close this issue if you want... Lastly 
(repeating myself :)), I do not have this problem locally for 1.5.5, and it 
seems major changes occurred in the checkpoint mechanism in terms of 
asynchronous behavior. The closest related part seems to have been merged for

FLINK-16177

 

[https://github.com/apache/flink/commit/963974f99d18d6a9f36fa78b792dcc2bc9e53de5#diff-2c7004a2d412c3566de5ff6fb9e6d027742328c2acad60685e47ce4ba9df0810]

Please note that I am NOT blaming the commit.

Best regards

> Checkpoint problem in 1.12.0
> ----------------------------
>
>                 Key: FLINK-21248
>                 URL: https://issues.apache.org/jira/browse/FLINK-21248
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.12.0
>            Reporter: Ceyhan Kasap
>            Priority: Major
>
> Hi
> I am trying to upgrade from 1.5.5 to 1.12 and the checkpointing mechanism 
> seems to be broken in our Kafka connector sourced datastream jobs.
> Since there is a significant version gap and there are many backwards 
> incompatible / deprecated changes in the Flink runtime between versions, I 
> had to modify our jobs and noticed that checkpoint offsets are not committed 
> to Kafka for source connectors.
> To simplify the issue I created simple reproducer projects:
> [https://github.com/simpleusr/flink_problem_1.5.5]
> [https://github.com/simpleusr/flink_problem_1.12.0]
> It seems that there are major changes in the checkpoint infrastructure.
> For 1.5.5 checkpoint cycles work as expected, as can be seen from the logs 
> (please note that the sample project contains a small hack in 
> org.apache.flink.runtime.minicluster.MiniCluster which prevents the cluster 
> from stopping):
> *[2021-02-03 10:04:17,409] INFO Completed checkpoint 2 for job 
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 11 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)*
> *[2021-02-03 10:04:17,409] INFO Completed checkpoint 2 for job 
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 11 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)*
> ....................
> *[2021-02-03 10:04:27,401] INFO Completed checkpoint 4 for job 
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 5 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)*
> *[2021-02-03 10:04:27,401] INFO Completed checkpoint 4 for job 
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 5 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)*
> However, for 1.12.0 checkpoint cycles are stuck at the initial checkpoint:
> *[2021-02-03 10:06:24,504] INFO Triggering checkpoint 1 (type=CHECKPOINT) @ 
> 1612339584496 for job ce255b141393a358db734db2d27ef0ea. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*
> As far as I can see, the checkpoint cycle is stuck waiting in 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator for 
> coordinatorCheckpointsComplete, although coordinatorsToCheckpoint is empty...
>  
> {code}
> final CompletableFuture<?> coordinatorCheckpointsComplete =
>         pendingCheckpointCompletableFuture.thenComposeAsync(
>                 (pendingCheckpoint) ->
>                         OperatorCoordinatorCheckpoints
>                                 .triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
>                                         coordinatorsToCheckpoint,
>                                         pendingCheckpoint,
>                                         timer),
>                 timer);
> {code}
> Simply returning from 
> OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion
>  when there is no coordinatorsToCheckpoint seems to resolve the problem:
> *[2021-02-03 10:07:53,387] INFO Triggering checkpoint 1 (type=CHECKPOINT) @ 
> 1612339673380 for job ffb4a06302f7e60e9325f32340d299b2. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*
> *[2021-02-03 10:07:53,387] INFO Triggering checkpoint 1 (type=CHECKPOINT) @ 
> 1612339673380 for job ffb4a06302f7e60e9325f32340d299b2. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*
> *[2021-02-03 10:07:53,607] INFO Completed checkpoint 1 for job 
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 225 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)*
> *[2021-02-03 10:07:53,607] INFO Completed checkpoint 1 for job 
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 225 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)*
> *[2021-02-03 10:07:58,380] INFO Triggering checkpoint 2 (type=CHECKPOINT) @ 
> 1612339678380 for job ffb4a06302f7e60e9325f32340d299b2. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*
> *[2021-02-03 10:07:58,380] INFO Triggering checkpoint 2 (type=CHECKPOINT) @ 
> 1612339678380 for job ffb4a06302f7e60e9325f32340d299b2. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*
> *[2021-02-03 10:07:58,388] INFO Completed checkpoint 2 for job 
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 7 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)*
> *[2021-02-03 10:07:58,388] INFO Completed checkpoint 2 for job 
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 7 ms). 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)*
> I have submitted this pr for this.
> Please help me if I am missing something or there is another solution without 
> code change.
> We need to perform the upgrade and modify our jobs as soon as possible (I 
> hope other breaking changes do not happen) so any help will be appreciated..
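
The early return described in the quoted issue text (returning immediately when coordinatorsToCheckpoint is empty) can be sketched in isolation with plain JDK futures. This is only an illustration of the shape of that change under stated assumptions: the Coordinator interface and the triggerAll method are hypothetical names invented here, not Flink's API, and the sketch does not reproduce or explain the hang itself, since CompletableFuture.allOf with no arguments also completes immediately in the JDK.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

/** Illustrative sketch (not Flink code): short-circuit when nothing needs checkpointing. */
public class EmptyCoordinatorShortCircuit {

    /** Hypothetical stand-in for an operator coordinator; not Flink's interface. */
    interface Coordinator {
        CompletableFuture<Void> checkpoint(long checkpointId);
    }

    /** Hypothetical helper: completes once every coordinator has checkpointed. */
    static CompletableFuture<Void> triggerAll(
            Collection<Coordinator> coordinators, long checkpointId) {
        if (coordinators.isEmpty()) {
            // Early return: hand back an already completed future, no scheduling involved.
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<?>[] futures = coordinators.stream()
                .map(c -> c.checkpoint(checkpointId))
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(futures);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> result = triggerAll(Collections.emptyList(), 1L);
        System.out.println("done immediately: " + result.isDone());
    }
}
```

Returning a completed future keeps the caller's composition (for example a thenComposeAsync stage) unchanged while making the empty case independent of any executor or timer.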



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
