dongwoo.kim created FLINK-34470:
-----------------------------------
Summary: Transactional message + Table api kafka source with
'latest-offset' scan bound mode causes indefinitely hanging
Key: FLINK-34470
URL: https://issues.apache.org/jira/browse/FLINK-34470
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.17.1
Reporter: dongwoo.kim
h2. Summary
Hi, we have run into an issue with transactional messages and the Table API Kafka source.
If we set *'scan.bounded.mode'* to *'latest-offset'*, the Flink SQL request
hangs and eventually times out. We can always reproduce this behavior by
following the steps below.
h2. How to reproduce
1. Deploy a transactional producer and stop it after it has produced a certain
number of messages.
2. Set *'scan.bounded.mode'* to *'latest-offset'* and submit a simple query
such as count(*).
3. The Flink SQL job gets stuck and times out.
h2. Cause
A transactional producer always writes [control
records|https://kafka.apache.org/documentation/#controlbatch] at the end of a
transaction. These control records are not returned by
{*}consumer.poll(){*} (they are filtered internally). The
*KafkaPartitionSplitReader* code finishes a split only when the
*lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work
well with non-transactional messages or in a streaming environment, but in some
batch use cases the last entry before the stopping offset is a control record,
so the condition is never met and the job hangs.
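To make the offset arithmetic concrete, here is a minimal, self-contained sketch. It is not the connector code: the class and method names are hypothetical, and the partition layout (three data records at offsets 0 to 2 followed by one control record at offset 3) is an assumed example. It only models the two facts described above: control records are never handed to the caller, and the split finishes only when the last returned record reaches stoppingOffset - 1.

```java
// Hypothetical model, not the actual KafkaPartitionSplitReader code.
class ControlRecordHang {

    // Offset of the last record a poll would return.
    // The array index is the offset; true marks a control record, which is skipped.
    static long lastVisibleOffset(boolean[] isControl) {
        long last = -1;
        for (int offset = 0; offset < isControl.length; offset++) {
            if (!isControl[offset]) {
                last = offset;
            }
        }
        return last;
    }

    // The finish condition quoted in this issue.
    static boolean reachedStoppingOffset(long lastRecordOffset, long stoppingOffset) {
        return lastRecordOffset >= stoppingOffset - 1;
    }

    public static void main(String[] args) {
        // Assumed layout: data records at 0, 1, 2 and a control record at 3.
        boolean[] isControl = {false, false, false, true};
        // With 'latest-offset' the stopping offset is the end of the log, here 4.
        long stoppingOffset = isControl.length;

        long last = lastVisibleOffset(isControl);
        System.out.println("last visible offset = " + last);
        System.out.println("finish split? " + reachedStoppingOffset(last, stoppingOffset));
    }
}
```

In this layout the last returned record has offset 2, while the condition needs offset 3, which belongs to the control record and is never returned. The check therefore stays false no matter how many times the reader polls.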
h2. Proposed solution
Add a *consumer.position(tp) >= stoppingOffset* condition to the if statement.
With this, KafkaPartitionSplitReader can finish the split even when
the stopping offset corresponds to a control record's offset.
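A rough sketch of how the extended check could look, written as a standalone predicate so it can be run without Kafka. The class name, method name and plain long parameters are hypothetical stand-ins for *lastRecord.offset()*, *consumer.position(tp)* and the stopping offset, and combining the two conditions with a logical OR is our assumption about how the new condition would be added.

```java
// Hypothetical sketch of the proposed check, not a patch against the connector.
class SplitFinishCheck {

    // Finish when either the last returned record reached stoppingOffset - 1
    // (existing condition) or the consumer position has already moved to or past
    // the stopping offset (proposed condition).
    static boolean shouldFinishSplit(long lastRecordOffset,
                                     long consumerPosition,
                                     long stoppingOffset) {
        return lastRecordOffset >= stoppingOffset - 1
                || consumerPosition >= stoppingOffset;
    }

    public static void main(String[] args) {
        // Assumed example: last data record at offset 2, control record at offset 3,
        // stopping offset 4, consumer position already advanced to 4.
        System.out.println(shouldFinishSplit(2, 4, 4));
        // Still reading: position has not reached the stopping offset yet.
        System.out.println(shouldFinishSplit(1, 2, 4));
    }
}
```

The existing condition is kept as is, so behavior for non-transactional topics does not change. The extra position check only matters when the entries before the stopping offset are not returned by poll.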
--
This message was sent by Atlassian Jira
(v8.20.10#820010)