dongwoo6kim opened a new pull request, #100: URL: https://github.com/apache/flink-connector-kafka/pull/100
## Problem

When using the Flink Kafka connector in batch scenarios, consuming transactional messages can cause the job to hang indefinitely. The issue is easy to reproduce with the following steps (a minimal reproduction sketch is attached at the end of this description):

1. Produce transactional messages and commit them.
2. Configure `scan.bounded.mode` to `latest-offset` and run a consumer using the Flink Kafka connector.

## Cause

The previous stopping condition in `KafkaPartitionSplitReader` compared the offset of the last polled record with the `stoppingOffset`. This works for streaming use cases and for batch processing of non-transactional messages, but it is insufficient when transactional messages are involved. [Control messages](https://kafka.apache.org/documentation/#controlbatch), which are not visible to clients, can occupy the entire range between the last record's offset and the `stoppingOffset`, so the last visible record never satisfies the stopping condition and the reader blocks indefinitely.

## Workaround

I've modified the stopping condition to use `consumer.position(tp)`, which points at the next offset to fetch and therefore already skips any control messages covered by the current poll (see the stopping-check sketch at the end of this description). To handle edge cases, particularly when `properties.max.poll.records` is set to `1`, I've also adjusted the fetch method to always check all assigned partitions, even those that returned no records in a poll.

#### Edge case example

Consider partition `0`, where offsets `13` and `14` are valid records and `15` is a control record. If `stoppingOffset` is set to `15` for partition `0` and `properties.max.poll.records` is configured to `1`, checking only the partitions that returned records would miss offset `15`, because the poll that covers the control record returns nothing. By consistently checking all assigned partitions, the consumer's position jumps over the control record in the subsequent poll, allowing the reader to finish the split.

## Discussion

To address the metric issue in [FLINK-33484](https://issues.apache.org/jira/browse/FLINK-33484), I think we need a wrapper class around `ConsumerRecord`, for example `ConsumerRecordWithOffsetJump`:

```java
public class ConsumerRecordWithOffsetJump<K, V> {
    final ConsumerRecord<K, V> record;
    final long offsetJump; // distance from this record's offset to the next offset to fetch
    public ConsumerRecordWithOffsetJump(ConsumerRecord<K, V> record, long offsetJump) {
        this.record = record;
        this.offsetJump = offsetJump;
    }
}
```

We may also need a new `KafkaPartitionSplitReader` that implements `SplitReader<ConsumerRecordWithOffsetJump<byte[], byte[]>, KafkaPartitionSplit>`. Then, when a record is emitted, the current offset should be set not to `record.offset() + 1` but to `record.offset() + record.offsetJump` in [KafkaRecordEmitter](https://github.com/apache/flink-connector-kafka/blob/369e7be46a70fd50d68746498aed82105741e7d6/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaRecordEmitter.java#L54). `offsetJump` is typically `1`, except for the last record of each poll, where it is calculated as `consumer.position() - lastRecord.offset()` (see the emitter sketch at the end). If this sounds good to everyone, I'm happy to work on this.
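#### Appendix: reproduction sketch

For step 1 of the reproduction, a minimal transactional producer might look like this (broker address, topic name, and transactional id are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "repro-tx");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("repro-topic", "k", "v"));
            // Committing writes a control batch after the last data record;
            // it is invisible to consumers but occupies an offset.
            producer.commitTransaction();
        }
    }
}
```

After these messages are committed, running a bounded Flink Kafka source with `scan.bounded.mode` set to `latest-offset` against this topic hangs as described above.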
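#### Appendix: stopping-check sketch

This illustrates the position-based check described under Workaround; it is not the actual patch, and `stoppingOffsets` and the `finishSplit` callback are assumed names:

```java
import java.util.Map;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

final class StoppingCheckSketch {
    // Check every assigned partition, not only those that returned records,
    // so an empty poll (e.g. max.poll.records=1 hitting a control batch)
    // can still finish the split.
    static void checkAllAssignedPartitions(
            KafkaConsumer<byte[], byte[]> consumer,
            Map<TopicPartition, Long> stoppingOffsets,
            Consumer<TopicPartition> finishSplit) {
        for (TopicPartition tp : consumer.assignment()) {
            long stop = stoppingOffsets.getOrDefault(tp, Long.MAX_VALUE);
            // position(tp) is the next offset to fetch; after a poll it has
            // already jumped over control batches, so it can reach the
            // stopping offset even when the last visible record cannot.
            if (consumer.position(tp) >= stop) {
                finishSplit.accept(tp);
            }
        }
    }
}
```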

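#### Appendix: emitter sketch

A hedged sketch of the emitter change from the Discussion, assuming the `ConsumerRecordWithOffsetJump` wrapper above (the deserialization plumbing is elided):

```java
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitState;

// Illustrative only: a wrapper-aware emitRecord that advances the split state
// by the record's offset jump instead of a fixed +1.
final class JumpAwareEmitterSketch<T> {
    void emitRecord(
            ConsumerRecordWithOffsetJump<byte[], byte[]> element,
            SourceOutput<T> output,
            KafkaPartitionSplitState splitState) {
        // ... deserialize element.record and collect results into `output` as today ...
        // offsetJump is typically 1; for the last record of a poll it equals
        // consumer.position(tp) - lastRecord.offset(), so restarting from the
        // stored offset skips the trailing control batch instead of stalling on it.
        splitState.setCurrentOffset(element.record.offset() + element.offsetJump);
    }
}
```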