[ 
https://issues.apache.org/jira/browse/FLINK-38617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Baozhu Zhao updated FLINK-38617:
--------------------------------
    Description: 
Recently, we saw Kafka consumer do not commit offset when stop-with-savepoint.

1. When a checkpoint completed, CheckpointCoordinator call 
`KafkaSourceReader.notifyCheckpointComplete()`.
2. Then `splitFetcherManager.commitOffsets()` enqueue an OffsetsCommitTask to 
`SplitFetcher.taskQueue`.
3. Before the taskQueue is emptied, the loop of `SplitFetcher.runOnce()` is 
interrupted by `SplitFetcher.shutdown()`.
 
During the "stop-with-savepoint" period, using Arthas to view the call logs 
also confirmed this issue:
{panel:title=Arthas log}
[arthas@1]$ watch  
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader 
notifyCheckpointComplete -x 2 -n 200
Press Q or Ctrl+C to abort.
Affect(class count: 2 , method count: 1) cost in 518 ms, listenerId: 1
session (396ccb27-c9b4-438c-b137-6936c7756765) is closed because server is 
going to shutdown.
$ command terminated with exit code 137
{panel}



  was:
Recently, we saw Kafka consumer do not commit offset when stop-with-savepoint.

1. When a checkpoint completed, CheckpointCoordinator call `KafkaSourceReader.
notifyCheckpointComplete()`.
2. Then `splitFetcherManager.commitOffsets()` enqueue an OffsetsCommitTask to 
`SplitFetcher.taskQueue`.
3. Before the taskQueue is emptied, the loop of `SplitFetcher.runOnce()` is 
interrupted by `SplitFetcher.shutdown()`.
 
During the "stop-with-savepoint" period, using Arthas to view the call logs 
also confirmed this issue:
```
[arthas@1]$ watch  
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader 
notifyCheckpointComplete -x 2 -n 200
Press Q or Ctrl+C to abort.
Affect(class count: 2 , method count: 1) cost in 518 ms, listenerId: 1
session (396ccb27-c9b4-438c-b137-6936c7756765) is closed because server is 
going to shutdown.
$ command terminated with exit code 137
```


> Kafka consumer do not commit offset when stop-with-savepoint
> ------------------------------------------------------------
>
>                 Key: FLINK-38617
>                 URL: https://issues.apache.org/jira/browse/FLINK-38617
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.17.2, 1.20.3
>         Environment: flink 1.17
>            Reporter: Baozhu Zhao
>            Priority: Critical
>
> Recently, we saw Kafka consumer do not commit offset when stop-with-savepoint.
> 1. When a checkpoint completed, CheckpointCoordinator call 
> `KafkaSourceReader.notifyCheckpointComplete()`.
> 2. Then `splitFetcherManager.commitOffsets()` enqueue an OffsetsCommitTask to 
> `SplitFetcher.taskQueue`.
> 3. Before the taskQueue is emptied, the loop of `SplitFetcher.runOnce()` is 
> interrupted by `SplitFetcher.shutdown()`.
>  
> During the "stop-with-savepoint" period, using Arthas to view the call logs 
> also confirmed this issue:
> {panel:title=Arthas log}
> [arthas@1]$ watch  
> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader 
> notifyCheckpointComplete -x 2 -n 200
> Press Q or Ctrl+C to abort.
> Affect(class count: 2 , method count: 1) cost in 518 ms, listenerId: 1
> session (396ccb27-c9b4-438c-b137-6936c7756765) is closed because server is 
> going to shutdown.
> $ command terminated with exit code 137
> {panel}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to