[ 
https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-34470:
-----------------------------------
    Affects Version/s: kafka-3.1.0
                           (was: 1.17.1)

> Transactional message + Table api kafka source with 'latest-offset' scan 
> bound mode causes indefinitely hanging
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34470
>                 URL: https://issues.apache.org/jira/browse/FLINK-34470
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: kafka-3.1.0
>            Reporter: dongwoo.kim
>            Priority: Major
>
> h2. Summary  
> Hi we have faced issue with transactional message and table api kafka source. 
> If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql's 
> request timeouts after hanging. We can always reproduce this unexpected 
> behavior by following below steps.
> This is related to this 
> [issue|https://issues.apache.org/jira/browse/FLINK-33484] too.
> h2. How to reproduce
> 1. Deploy transactional producer and stop after producing certain amount of 
> messages
> 2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple 
> query such as getting count of the produced messages
> 3. Flink sql job gets stucked and timeouts.
> h2. Cause
> Transaction producer always produces [control 
> records|https://kafka.apache.org/documentation/#controlbatch] at the end of 
> the transaction. And these control messages are not polled by 
> {*}consumer.poll(){*}. (It is filtered internally). In 
> *KafkaPartitionSplitReader* code, split is finished only when 
> *lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work 
> well with non transactional messages or streaming environment but in some 
> batch use cases it causes unexpected hanging.
> h2. Proposed solution
> {code:java}
> if (consumer.position(tp) >= stoppingOffset) {
>                     recordsBySplits.setPartitionStoppingOffset(tp, 
> stoppingOffset);
>                     finishSplitAtRecord(
>                             tp,
>                             stoppingOffset,
>                             lastRecord.offset(),
>                             finishedPartitions,
>                             recordsBySplits);
>                 }
> {code}
> Replacing if condition to *consumer.position(tp) >= stoppingOffset* in 
> [here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137]
>  can solve the problem. 
> *consumer.position(tp)* gets next record's offset if it exist and the last 
> record's offset if the next record doesn't exist. 
> By this KafkaPartitionSplitReader is available to finish the split even when 
> the stopping offset is configured to control record's offset. 
> I would be happy to implement about this fix if we can reach on agreement. 
> Thanks



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to