[ https://issues.apache.org/jira/browse/FLINK-38617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Baozhu Zhao updated FLINK-38617:
--------------------------------
Description:
Recently, we observed that the Kafka consumer does not commit offsets when the job is stopped via stop-with-savepoint:
1. When a checkpoint completes, the CheckpointCoordinator calls `KafkaSourceReader.notifyCheckpointComplete()`.
2. `splitFetcherManager.commitOffsets()` then enqueues an OffsetsCommitTask into `SplitFetcher.taskQueue`.
3. Before the task queue is drained, `SplitFetcher.shutdown()` interrupts the `SplitFetcher.runOnce()` loop, so the queued commit task is never executed.
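The sequence above can be modeled with a small, self-contained Java sketch. `ToyFetcher`, `enqueueTask`, `run`, and `pendingTasks` are hypothetical stand-ins, not Flink's `SplitFetcher` API. The sketch only assumes, as this report describes, that the fetch loop stops once shutdown is requested without draining tasks still sitting in its queue.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class FetcherShutdownSketch {

    /** Hypothetical, simplified fetcher model; NOT Flink's SplitFetcher. */
    static final class ToyFetcher {
        private final Queue<Runnable> taskQueue = new ArrayDeque<>();
        private volatile boolean closed = false;

        void enqueueTask(Runnable task) {
            taskQueue.add(task);
        }

        /** Requests shutdown; pending tasks are left in the queue. */
        void shutdown() {
            closed = true;
        }

        /** Executes at most one pending task. */
        void runOnce() {
            Runnable task = taskQueue.poll();
            if (task != null) {
                task.run();
            }
        }

        /**
         * Assumed loop shape: exits as soon as shutdown() is observed, without
         * draining the queue. (A real fetcher would block while idle; this toy
         * version is only meant to be called after shutdown(), as in main.)
         */
        void run() {
            while (!closed) {
                runOnce();
            }
        }

        int pendingTasks() {
            return taskQueue.size();
        }
    }

    public static void main(String[] args) {
        ToyFetcher fetcher = new ToyFetcher();
        boolean[] committed = {false};

        // Step 2: a checkpoint completes and a (hypothetical) offset-commit task is enqueued.
        fetcher.enqueueTask(() -> committed[0] = true);

        // Step 3: shutdown arrives before the loop reaches the queued task.
        fetcher.shutdown();
        fetcher.run();

        System.out.println("committed=" + committed[0] + ", pending=" + fetcher.pendingTasks());
    }
}
```

Running `main` prints `committed=false, pending=1`: the commit task is still queued when the loop exits, which matches the symptom of offsets never reaching Kafka.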
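Watching `KafkaPartitionSplitReader.notifyCheckpointComplete` with Arthas during stop-with-savepoint also confirmed the issue: no invocation was recorded before the session was closed and the process exited.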
{panel:title=Arthas log}
[arthas@1]$ watch org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader notifyCheckpointComplete -x 2 -n 200
Press Q or Ctrl+C to abort.
Affect(class count: 2 , method count: 1) cost in 518 ms, listenerId: 1
session (396ccb27-c9b4-438c-b137-6936c7756765) is closed because server is going to shutdown.
$ command terminated with exit code 137
{panel}
> Kafka consumer do not commit offset when stop-with-savepoint
> ------------------------------------------------------------
>
> Key: FLINK-38617
> URL: https://issues.apache.org/jira/browse/FLINK-38617
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Common
> Affects Versions: 1.17.2, 1.20.3
> Environment: Flink 1.17
> Reporter: Baozhu Zhao
> Priority: Critical
>