[ 
https://issues.apache.org/jira/browse/FLINK-35419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Paul updated FLINK-35419:
--------------------------------
    Description: 
When running the Kafka connector in bounded mode, the stop condition can be 
defined as the latest offset at the time the job starts. Unfortunately, Kafka's 
latest offset calculation also includes special marker records, such as 
transaction markers, in the overall count.
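As a rough illustration (a toy model, not Kafka's actual log implementation): committing a transaction appends a control (EndTxn) record to the partition, and that control record occupies an offset, so the log end offset reported to clients sits past the last record a consumer can ever receive:

```python
# Toy model of a partition log, assuming offsets are assigned sequentially
# and a committed transaction ends with one control (EndTxn) record.
log = [
    {"offset": 0, "type": "data"},
    {"offset": 1, "type": "data"},
    {"offset": 2, "type": "data"},
    {"offset": 3, "type": "data"},
    {"offset": 4, "type": "control"},  # EndTxn marker appended by the broker
]

# What Kafka reports as the "latest" (log end) offset: one past the last
# entry, control records included.
log_end_offset = log[-1]["offset"] + 1

# What a consumer can actually observe: control records are filtered out.
last_visible_offset = max(e["offset"] for e in log if e["type"] == "data")

print(log_end_offset)       # 5
print(last_visible_offset)  # 3
```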

 

When Flink waits for a job to finish, it compares the offsets of the records 
read up to that point with the original latest offset [1]. Since the consumer 
will never see the special marker records, the latest offset is never reached, 
and the job gets stuck.
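A minimal sketch of why that comparison never succeeds (a hypothetical helper, not the connector's actual code at [1]): if the stopping offset captured at startup is the log end offset, but the last record the consumer is ever handed sits before a trailing EndTxn marker, the finish check stays false forever:

```python
def split_finished(last_record_offset, stopping_offset):
    # The split counts as done once the consumer has read up to the
    # stopping offset captured when the job started.
    return last_record_offset + 1 >= stopping_offset

# Assumed numbers: data records at offsets 0..3, an EndTxn marker at
# offset 4, so the log end offset (the bounded stop condition) is 5.
stopping_offset = 5
last_record_offset = 3  # the marker at offset 4 is never delivered

print(split_finished(last_record_offset, stopping_offset))  # False: job hangs
```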

 

To reproduce the issue, you can write to a Kafka topic and make sure that the 
latest entry in the log is a transaction end marker. Afterwards, you can start 
a Flink job configured with `scan.bounded.latest-offset` pointing to that topic.

 

[1]https://github.com/confluentinc/flink/blob/59c5446c4aac0d332a21b456f4a3f82576104b80/flink-connectors/confluent-connector-kafka/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L128


> scan.bounded.latest-offset makes queries never finish if the latest message 
> is an EndTxn Kafka marker
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-35419
>                 URL: https://issues.apache.org/jira/browse/FLINK-35419
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.8.0, 1.16.0, 1.17.0, 1.19.0
>            Reporter: Fabian Paul
>            Priority: Major
>
> When running the Kafka connector in bounded mode, the stop condition can be 
> defined as the latest offset at the time the job starts. Unfortunately, 
> Kafka's latest offset calculation also includes special marker records, such 
> as transaction markers, in the overall count.
>  
> When Flink waits for a job to finish, it compares the offsets of the records 
> read up to that point with the original latest offset [1]. Since the consumer 
> will never see the special marker records, the latest offset is never 
> reached, and the job gets stuck.
>  
> To reproduce the issue, you can write to a Kafka topic and make sure that 
> the latest entry in the log is a transaction end marker. Afterwards, you can 
> start a Flink job configured with `scan.bounded.latest-offset` pointing to 
> that topic.
>  
> [1]https://github.com/confluentinc/flink/blob/59c5446c4aac0d332a21b456f4a3f82576104b80/flink-connectors/confluent-connector-kafka/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L128



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
