dongwoo6kim opened a new pull request, #100:
URL: https://github.com/apache/flink-connector-kafka/pull/100

   ## Problem 
   When using the Flink Kafka connector in batch scenarios, consuming 
transactional messages can cause the job to hang indefinitely.
   The issue is easy to reproduce with the following steps (a reproduction sketch follows the list).
   1. Produce transactional messages and commit them.
   2. Configure `scan.bounded.mode` to `latest-offset` and run a consumer using 
the Flink Kafka connector.
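   
   A minimal reproduction sketch. The broker address, topic name, and `transactional.id` are placeholders I've chosen; the bounded `KafkaSource` is the DataStream equivalent of `scan.bounded.mode` = `latest-offset`:
   ```java
   import java.util.Properties;
   
   import org.apache.flink.api.common.serialization.SimpleStringSchema;
   import org.apache.flink.connector.kafka.source.KafkaSource;
   import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
   
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.ProducerConfig;
   import org.apache.kafka.clients.producer.ProducerRecord;
   import org.apache.kafka.common.serialization.StringSerializer;
   
   // 1. Produce one transactional record and commit. The commit writes an
   //    invisible control batch right after the record.
   Properties props = new Properties();
   props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
   props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "repro-tx");
   props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
   props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
   try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
       producer.initTransactions();
       producer.beginTransaction();
       producer.send(new ProducerRecord<>("repro-topic", "key", "value"));
       producer.commitTransaction();
   }
   
   // 2. Read the topic with a bounded KafkaSource. The stopping offset lands
   //    past the control batch, so the job never finishes.
   KafkaSource<String> source = KafkaSource.<String>builder()
           .setBootstrapServers("localhost:9092")
           .setTopics("repro-topic")
           .setStartingOffsets(OffsetsInitializer.earliest())
           .setBounded(OffsetsInitializer.latest())
           .setValueOnlyDeserializer(new SimpleStringSchema())
           .build();
   ```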
   
   ## Cause
   The previous stopping condition in `KafkaPartitionSplitReader` compared 
the offset of the last polled record with the `stoppingOffset`. This works 
for streaming use cases and for batch processing of non-transactional messages, 
but it is insufficient when transactional messages are involved.
   
   [Control messages](https://kafka.apache.org/documentation/#controlbatch), 
which are not visible to clients, can occupy the entire range between the last 
record's offset and the `stoppingOffset`, so the condition is never met and the 
reader blocks indefinitely.
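   
   For reference, a condensed paraphrase of the pre-fix check (not the exact connector code; `finishSplit` is a stand-in for the real split bookkeeping):
   ```java
   // Previous per-partition stopping check: only partitions that returned
   // records in this poll are inspected, keyed off the last visible record.
   List<ConsumerRecord<byte[], byte[]>> partitionRecords = consumerRecords.records(tp);
   if (!partitionRecords.isEmpty()) {
       ConsumerRecord<byte[], byte[]> lastRecord =
               partitionRecords.get(partitionRecords.size() - 1);
       // If the offsets between lastRecord.offset() and stoppingOffset hold
       // only control batches, this never fires and the reader polls forever.
       if (lastRecord.offset() >= stoppingOffset - 1) {
           finishSplit(tp, stoppingOffset); // stand-in helper
       }
   }
   ```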
   
   ## Workaround
   I've modified the stopping condition to use `consumer.position(tp)`, which 
effectively skips any control messages present in the current poll and points 
directly at the next record's offset. In sketch form (same stand-in helper as above):
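   ```java
   // Revised check: the consumer's position advances past control batches,
   // so it reaches stoppingOffset even when no further records are visible.
   long position = consumer.position(tp); // offset of the next record to fetch
   if (position >= stoppingOffset) {
       finishSplit(tp, stoppingOffset); // stand-in helper
   }
   ```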
   To handle edge cases, particularly when `properties.max.poll.records` is set 
to `1`, I've also adjusted the fetch method to always check all assigned 
partitions, even those that returned no records in a poll.
   
   #### Edge case example 
   Consider partition `0`, where offsets `13` and `14` are valid records and 
`15` is a control record. If `stoppingOffset` is set to `15` for partition `0` and 
`properties.max.poll.records` is configured to `1`, checking only the partitions 
that return records would miss offset `15`. By consistently checking all 
assigned partitions, the consumer's position jumps over the control record in the 
subsequent poll, allowing the reader to finish the split, as sketched below.
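   
   A sketch of the adjusted fetch flow (`getStoppingOffset` is a stand-in for the split-state lookup):
   ```java
   // After every poll, re-check all assigned partitions, not just the
   // partitions present in the poll result.
   ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(pollTimeout);
   for (TopicPartition tp : consumer.assignment()) {
       long stoppingOffset = getStoppingOffset(tp); // stand-in lookup
       if (consumer.position(tp) >= stoppingOffset) {
           finishSplit(tp, stoppingOffset); // stand-in helper
       }
   }
   ```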
   
   ## Discussion
   To address the metric issue in 
[FLINK-33484](https://issues.apache.org/jira/browse/FLINK-33484), I think we 
need a wrapper class around `ConsumerRecord`, for example 
`ConsumerRecordWithOffsetJump`:
   ```java
   public class ConsumerRecordWithOffsetJump<K, V> {
       private final ConsumerRecord<K, V> record;
       private final long offsetJump;

       public ConsumerRecordWithOffsetJump(ConsumerRecord<K, V> record, long offsetJump) {
           this.record = record;
           this.offsetJump = offsetJump;
       }
   }
   ```
   We may also need a new `KafkaPartitionSplitReader` that implements 
`SplitReader<ConsumerRecordWithOffsetJump<byte[], byte[]>, 
KafkaPartitionSplit>`.
   Then, when a record is emitted, the current offset should be set not to 
`record.offset() + 1` but to `record.offset() + record.jumpValue`, 
[here in `KafkaRecordEmitter`](https://github.com/apache/flink-connector-kafka/blob/369e7be46a70fd50d68746498aed82105741e7d6/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaRecordEmitter.java#L54).
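   A hedged sketch of what that emitter change might look like (the accessors `getRecord()` and `getOffsetJump()` are assumed names on the wrapper; the deserialization plumbing is elided):
   ```java
   @Override
   public void emitRecord(
           ConsumerRecordWithOffsetJump<byte[], byte[]> element,
           SourceOutput<T> output,
           KafkaPartitionSplitState splitState) throws Exception {
       // Deserialize and collect the wrapped record as today, then advance the
       // split state past any control batch that followed it.
       deserializationSchema.deserialize(element.getRecord(), sourceOutputWrapper);
       splitState.setCurrentOffset(element.getRecord().offset() + element.getOffsetJump());
   }
   ```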
  
     `jumpValue` is typically `1`, except for the last record of each poll, where 
it's calculated as `consumer.position() - lastRecord.offset()`.
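   In the split reader, wrapping the polled records could look like this (`wrapped` is a hypothetical output list):
   ```java
   // jumpValue is 1 for all but the last record of the poll; for the last one
   // it covers the gap up to the consumer's position, i.e. past control batches.
   List<ConsumerRecord<byte[], byte[]>> recs = consumerRecords.records(tp);
   long position = consumer.position(tp);
   for (int i = 0; i < recs.size(); i++) {
       ConsumerRecord<byte[], byte[]> r = recs.get(i);
       long jump = (i == recs.size() - 1) ? position - r.offset() : 1L;
       wrapped.add(new ConsumerRecordWithOffsetJump<>(r, jump));
   }
   ```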
   If this sounds good to everyone, I'm happy to work on this.

