[https://issues.apache.org/jira/browse/FLINK-21248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17278797#comment-17278797]
Ceyhan Kasap commented on FLINK-21248:
--------------------------------------
Hi,
1) IMHO, committing offsets to the Kafka broker is an indispensable task for a
Kafka consumer, and Flink still does it.
When I apply the changes from [https://github.com/apache/flink/pull/14846],
checkpoint cycles begin to complete (in your case they should already be
completing without this change), and I can see the following stack trace at
the breakpoint:
{noformat}
Thread [Source: Custom Source -> (Map, Sink: Print to Std. Out) (7/8)#0] (Suspended (breakpoint at line 210 in KafkaFetcher))
	owns: Object (id=157)
	KafkaFetcher<T>.doCommitInternalOffsetsToKafka(Map<KafkaTopicPartition,Long>, KafkaCommitCallback) line: 210
	KafkaFetcher<T>(AbstractFetcher<T,KPH>).commitInternalOffsetsToKafka(Map<KafkaTopicPartition,Long>, KafkaCommitCallback) line: 285
	FlinkKafkaConsumer<T>(FlinkKafkaConsumerBase<T>).notifyCheckpointComplete(long) line: 1006
	StreamSource<OUT,SRC>(AbstractUdfStreamOperator<OUT,F>).notifyCheckpointComplete(long) line: 130
	StreamOperatorWrapper<OUT,OP>.notifyCheckpointComplete(long) line: 99
	SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(long, OperatorChain<?,?>, Supplier<Boolean>) line: 283
	SourceStreamTask<OUT,SRC,OP>(StreamTask<OUT,OP>).notifyCheckpointComplete(long) line: 990
	SourceStreamTask<OUT,SRC,OP>(StreamTask<OUT,OP>).lambda$notifyCheckpointCompleteAsync$11(long) line: 961
	1918490550.run() line: not available
	StreamTask<OUT,OP>.lambda$notifyCheckpointOperation$13(RunnableWithException, CompletableFuture) line: 977
	275623379.run() line: not available
	StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(ThrowingRunnable<E>) line: 92
	Mail.run() line: 78
	MailboxProcessor.processMail(TaskMailbox, boolean) line: 302
	MailboxProcessor.runMailboxLoop() line: 184
	SourceStreamTask<OUT,SRC,OP>(StreamTask<OUT,OP>).runMailboxLoop() line: 575
	SourceStreamTask<OUT,SRC,OP>(StreamTask<OUT,OP>).invoke() line: 539
	Task.doRun() line: 722
	Task.run() line: 547
	Thread.run() line: 745
{noformat}
This matches the documented behavior of
[AbstractFetcher#commitInternalOffsetsToKafka|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.html#commitInternalOffsetsToKafka-java.util.Map-org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback-].
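The sketch below illustrates the commit-on-completion behavior visible in the stack trace. It is a minimal, stdlib-only Java model that rests on one simplifying assumption: the source records its offsets per checkpoint id when a checkpoint is taken, and passes them to the broker only when `notifyCheckpointComplete` arrives for that id.

The class `OffsetCommitOnCheckpointSketch` and its fields are hypothetical. The method names echo the Flink callbacks in the trace, but the signatures are simplified and this is not Flink's code. The model shows why a checkpoint that never completes means no offsets are ever committed.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of a checkpointing Kafka source that defers
// offset commits until the checkpoint is confirmed complete. Method names mirror
// the Flink callbacks in the stack trace, but this is NOT Flink's implementation.
class OffsetCommitOnCheckpointSketch {

    // Offsets captured per checkpoint id, waiting for the completion notification.
    private final TreeMap<Long, Map<String, Long>> pendingOffsetsToCommit = new TreeMap<>();

    // Stand-in for the offsets the broker stores for the consumer group.
    private final Map<String, Long> committedToBroker = new HashMap<>();

    // Checkpoint taken: remember the offsets, but do not commit anything yet.
    void snapshotState(long checkpointId, Map<String, Long> currentOffsets) {
        pendingOffsetsToCommit.put(checkpointId, new HashMap<>(currentOffsets));
    }

    // Whole checkpoint completed: commit its offsets and drop it plus all older ones.
    void notifyCheckpointComplete(long checkpointId) {
        Map<String, Long> offsets = pendingOffsetsToCommit.get(checkpointId);
        if (offsets == null) {
            return; // unknown or already subsumed checkpoint: nothing to commit
        }
        pendingOffsetsToCommit.headMap(checkpointId, true).clear();
        committedToBroker.putAll(offsets);
    }

    Map<String, Long> committedOffsets() {
        return new HashMap<>(committedToBroker);
    }

    int pendingCount() {
        return pendingOffsetsToCommit.size();
    }

    public static void main(String[] args) {
        OffsetCommitOnCheckpointSketch source = new OffsetCommitOnCheckpointSketch();
        source.snapshotState(1L, Collections.singletonMap("topic-0", 42L));
        // Checkpoint 1 never completes, so nothing reaches the broker.
        System.out.println(source.committedOffsets()); // {}
        source.snapshotState(2L, Collections.singletonMap("topic-0", 57L));
        source.notifyCheckpointComplete(2L);
        System.out.println(source.committedOffsets()); // {topic-0=57}
        System.out.println(source.pendingCount());     // 0
    }
}
```

When checkpoint 2 completes, the still-pending checkpoint 1 is discarded rather than committed later, because its offsets are older than what has already been committed.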
Although we use a fixed-delay restart strategy with a bounded maximum number of
attempts, failures when interacting with external downstream systems can still
take the job down. In that case we resume from the latest committed consumer
group offsets rather than manually creating and managing savepoints.
2) Yes, I am killing the process as you say, and I know that the checkpoint will
not be found in the newly started process. But I expect the offsets to have been
committed. Actually, in my case it would not matter even if I used a persistent
state backend: since no checkpoint cycle ever completes, nothing would be
persisted. My main problem is that not a single checkpoint cycle completes.
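The consequence described in points 1 and 2 can be modeled the same way. The model assumes a restarted process that has no checkpoint to restore and therefore starts from the consumer group's committed offsets. When no offset is committed, it falls back to a caller-supplied default; that fallback policy is an assumption of the sketch.

The hypothetical class `GroupOffsetResumeSketch` counts how many records are read again after the process is killed. If no checkpoint ever completes, nothing is committed and the whole range is replayed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of where a freshly started consumer begins reading when it
// has no checkpoint to restore and relies on committed group offsets instead.
class GroupOffsetResumeSketch {

    // Illustrative stand-in for the offsets the broker stores for the group.
    private final Map<String, Long> committedGroupOffsets = new HashMap<>();

    // Record the next offset to read for a partition (as a commit would).
    void commit(String partition, long nextOffset) {
        committedGroupOffsets.put(partition, nextOffset);
    }

    // Start position: the committed offset if present, else the assumed fallback.
    long startPosition(String partition, long fallbackOffset) {
        return committedGroupOffsets.getOrDefault(partition, fallbackOffset);
    }

    // Records read a second time, given how far the killed process had got.
    long recordsReprocessed(String partition, long reachedOffset, long fallbackOffset) {
        return Math.max(0L, reachedOffset - startPosition(partition, fallbackOffset));
    }

    public static void main(String[] args) {
        GroupOffsetResumeSketch group = new GroupOffsetResumeSketch();
        // No checkpoint ever completed, so nothing was committed: replay from 0.
        System.out.println(group.recordsReprocessed("topic-0", 1000L, 0L)); // 1000
        // A completed checkpoint that committed offset 900 limits the replay to 100.
        group.commit("topic-0", 900L);
        System.out.println(group.recordsReprocessed("topic-0", 1000L, 0L)); // 100
    }
}
```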
3) Yes, I have cleaned up the repository a couple of times and tried on both my
work PC and my home PC, with no luck. Currently I do not have a 1.12 cluster set
up. Once the cluster is set up and I have finished upgrading our existing jobs
(written against Flink 1.5.5 and, unfortunately, with a lot of compatibility
issues), I will run them on the cluster and see whether the problem occurs there.
You may close this issue if you want. Lastly (repeating myself :)), I do not
have this problem locally with 1.5.5. It seems that major changes occurred in
the checkpoint mechanism with respect to asynchronous behavior, and the most
closely related change seems to be the one merged for FLINK-16177:
[https://github.com/apache/flink/commit/963974f99d18d6a9f36fa78b792dcc2bc9e53de5#diff-2c7004a2d412c3566de5ff6fb9e6d027742328c2acad60685e47ce4ba9df0810]
Please note that I am NOT blaming the commit.
Best regards
> Checkpoint problem in 1.12.0
> ----------------------------
>
> Key: FLINK-21248
> URL: https://issues.apache.org/jira/browse/FLINK-21248
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.12.0
> Reporter: Ceyhan Kasap
> Priority: Major
>
> Hi
> I am trying to upgrade from 1.5.5 to 1.12, and the checkpointing mechanism
> seems to be broken in our DataStream jobs that use the Kafka source connector.
> Since there is a significant version gap and there are many backwards-
> incompatible or deprecated changes in the Flink runtime between these
> versions, I had to modify our jobs, and I noticed that checkpoint offsets are
> not committed to Kafka for source connectors.
> To simplify the issue, I created simple reproducer projects:
> [https://github.com/simpleusr/flink_problem_1.5.5]
> [https://github.com/simpleusr/flink_problem_1.12.0]
> It seems that there are major changes in the checkpoint infrastructure.
> For 1.5.5, checkpoint cycles work as expected, as can be seen from the logs
> (please note that the sample project contains a small hack in
> org.apache.flink.runtime.minicluster.MiniCluster that prevents the cluster from
> stopping):
> *[2021-02-03 10:04:17,409] INFO Completed checkpoint 2 for job
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 11 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)*
> ....................
> *[2021-02-03 10:04:27,401] INFO Completed checkpoint 4 for job
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 5 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)*
> However, for 1.12.0 the checkpoint cycle is stuck at the initial checkpoint:
> *[2021-02-03 10:06:24,504] INFO Triggering checkpoint 1 (type=CHECKPOINT) @
> 1612339584496 for job ce255b141393a358db734db2d27ef0ea.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*
> As far as I can see, the checkpoint cycle is stuck in
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator, waiting for
> coordinatorCheckpointsComplete, although coordinatorsToCheckpoint is empty:
>
> {code}
> final CompletableFuture<?> coordinatorCheckpointsComplete =
>         pendingCheckpointCompletableFuture.thenComposeAsync(
>                 (pendingCheckpoint) ->
>                         OperatorCoordinatorCheckpoints
>                                 .triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
>                                         coordinatorsToCheckpoint,
>                                         pendingCheckpoint,
>                                         timer),
>                 timer);
> {code}
> Simply returning early from
> OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion
> when coordinatorsToCheckpoint is empty seems to resolve the problem:
> *[2021-02-03 10:07:53,387] INFO Triggering checkpoint 1 (type=CHECKPOINT) @
> 1612339673380 for job ffb4a06302f7e60e9325f32340d299b2.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*
> *[2021-02-03 10:07:53,607] INFO Completed checkpoint 1 for job
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 225 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)*
> *[2021-02-03 10:07:58,380] INFO Triggering checkpoint 2 (type=CHECKPOINT) @
> 1612339678380 for job ffb4a06302f7e60e9325f32340d299b2.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*
> *[2021-02-03 10:07:58,388] INFO Completed checkpoint 2 for job
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 7 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)*
> I have submitted a PR for this.
> Please let me know if I am missing something, or if there is another solution
> that does not require a code change.
> We need to perform the upgrade and modify our jobs as soon as possible (I
> hope no other breaking changes turn up), so any help will be appreciated.
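The early-return workaround proposed in the issue can be expressed in plain `java.util.concurrent` terms: when the list of coordinators is empty, return an already-completed future instead of building an aggregate. The sketch below shows that shape.

The names `CoordinatorCheckpointGuardSketch` and `triggerAllCoordinatorCheckpoints` are hypothetical. This is neither the real `OperatorCoordinatorCheckpoints` code nor the change proposed in the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch of an "return early when there is nothing to checkpoint"
// guard. Each Supplier stands in for triggering one coordinator's checkpoint and
// returns a future that completes when that coordinator acknowledges.
final class CoordinatorCheckpointGuardSketch {

    static CompletableFuture<Void> triggerAllCoordinatorCheckpoints(
            List<Supplier<CompletableFuture<Void>>> coordinators) {
        if (coordinators.isEmpty()) {
            // Nothing to wait for: hand back an already-completed future so that
            // dependent stages (e.g. thenComposeAsync callbacks) can proceed.
            return CompletableFuture.completedFuture(null);
        }
        List<CompletableFuture<Void>> acks = new ArrayList<>();
        for (Supplier<CompletableFuture<Void>> coordinator : coordinators) {
            acks.add(coordinator.get());
        }
        // Completes once every coordinator has acknowledged its checkpoint.
        return CompletableFuture.allOf(acks.toArray(new CompletableFuture[0]));
    }

    public static void main(String[] args) {
        CompletableFuture<Void> none = triggerAllCoordinatorCheckpoints(new ArrayList<>());
        System.out.println(none.isDone()); // true

        CompletableFuture<Void> pendingAck = new CompletableFuture<>();
        List<Supplier<CompletableFuture<Void>>> one = new ArrayList<>();
        one.add(() -> pendingAck);
        CompletableFuture<Void> all = triggerAllCoordinatorCheckpoints(one);
        System.out.println(all.isDone()); // false
        pendingAck.complete(null);
        System.out.println(all.isDone()); // true
    }
}
```

In this simplified model, `CompletableFuture.allOf` over zero futures would also complete immediately, so the guard mainly makes the empty case explicit. The sketch does not reproduce why the real future stayed incomplete in 1.12.0.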