[ https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martijn Visser updated FLINK-34470:
-----------------------------------
    Affects Version/s: kafka-3.1.0
                           (was: 1.17.1)

> Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34470
>                 URL: https://issues.apache.org/jira/browse/FLINK-34470
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: kafka-3.1.0
>            Reporter: dongwoo.kim
>            Priority: Major
>
> h2. Summary
> Hi, we have run into an issue with transactional messages and the Table API Kafka source.
> If we set *'scan.bounded.mode'* to *'latest-offset'*, the Flink SQL request hangs and eventually times out.
> We can reproduce this unexpected behavior consistently by following the steps below.
> This is also related to this [issue|https://issues.apache.org/jira/browse/FLINK-33484].
> h2. How to reproduce
> 1. Deploy a transactional producer and stop it after it has produced a certain number of messages.
> 2. Set *'scan.bounded.mode'* to *'latest-offset'* and submit a simple query, such as a count of the produced messages.
> 3. The Flink SQL job gets stuck and times out.
> h2. Cause
> A transactional producer always writes [control records|https://kafka.apache.org/documentation/#controlbatch] at the end of a transaction.
> These control records are not returned by {*}consumer.poll(){*} (they are filtered out internally).
> In the *KafkaPartitionSplitReader* code, a split is finished only when the condition *lastRecord.offset() >= stoppingOffset - 1* is met.
> When the offset just before the stopping offset belongs to a control record, no record returned by poll() can ever satisfy this condition.
> This might work well with non-transactional messages or in a streaming environment, but in some batch use cases it causes unexpected hanging.
> h2. Proposed solution
> {code:java}
> if (consumer.position(tp) >= stoppingOffset) {
>     recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset);
>     finishSplitAtRecord(
>             tp,
>             stoppingOffset,
>             lastRecord.offset(),
>             finishedPartitions,
>             recordsBySplits);
> }
> {code}
> Replacing the if condition with *consumer.position(tp) >= stoppingOffset* [here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137] can solve the problem.
> *consumer.position(tp)* returns the offset of the next record that will be fetched, so it advances past control records even though poll() never returns them.
> With this change, KafkaPartitionSplitReader can finish the split even when the stopping offset is set to a control record's offset.
> I would be happy to implement this fix if we can reach an agreement.
> Thanks

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
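
To make the offset arithmetic in the Cause and Proposed solution sections concrete, here is a minimal sketch that compares the two conditions on a toy in-memory log. It does not use the Kafka client or any Flink class. The class `BoundedSplitSketch` and its methods are hypothetical names introduced only for illustration, and the model assumes (as the issue describes) that the consumer position moves past a control record while the application never sees that record.

```java
/** Toy model of a single partition; hypothetical, not Kafka or Flink code. */
final class BoundedSplitSketch {

    /**
     * Builds a log where index == offset and the value says whether the entry
     * is a control record. The last entry models the transaction commit marker.
     */
    static boolean[] transactionalLog(int dataRecords) {
        boolean[] isControl = new boolean[dataRecords + 1];
        isControl[dataRecords] = true; // commit marker occupies one offset
        return isControl;
    }

    /**
     * Reads the whole log. Returns {lastVisibleOffset, position}:
     * lastVisibleOffset is the offset of the last non-control record (-1 if none),
     * position is the offset of the next entry to fetch (assumed to skip markers).
     */
    static long[] drain(boolean[] isControl) {
        long lastVisible = -1L;
        long position = 0L;
        for (int offset = 0; offset < isControl.length; offset++) {
            if (!isControl[offset]) {
                lastVisible = offset; // only data records reach the application
            }
            position = offset + 1L;   // position advances for every entry
        }
        return new long[] {lastVisible, position};
    }

    /** The condition quoted in the issue: lastRecord.offset() >= stoppingOffset - 1. */
    static boolean finishesByLastRecord(long lastVisibleOffset, long stoppingOffset) {
        return lastVisibleOffset >= stoppingOffset - 1;
    }

    /** The proposed condition: consumer.position(tp) >= stoppingOffset. */
    static boolean finishesByPosition(long position, long stoppingOffset) {
        return position >= stoppingOffset;
    }

    public static void main(String[] args) {
        boolean[] log = transactionalLog(5);  // data at offsets 0..4, marker at 5
        long stoppingOffset = log.length;     // 'latest-offset' modeled as the log end offset, 6
        long[] drained = drain(log);          // lastVisible = 4, position = 6

        System.out.println("last-record check finishes: "
                + finishesByLastRecord(drained[0], stoppingOffset)); // 4 >= 5 is false
        System.out.println("position check finishes: "
                + finishesByPosition(drained[1], stoppingOffset));   // 6 >= 6 is true
    }
}
```

In this model the last-record check can never succeed because offset 5 is never delivered to the application, while the position check succeeds as soon as the marker has been skipped.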