[
https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848510#comment-17848510
]
Muhammet Orazov commented on FLINK-34470:
-----------------------------------------
Hey [~dongwoo.kim] , sorry for the late reply. Somehow I missed the message.
I have added a minor comment, but overall I'd add an integration test for this
case if possible and let a committer check the PR as well
> Transactional message + Table api kafka source with 'latest-offset' scan
> bound mode causes indefinitely hanging
> ---------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-34470
> URL: https://issues.apache.org/jira/browse/FLINK-34470
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: kafka-3.1.0
> Reporter: dongwoo.kim
> Priority: Major
> Labels: pull-request-available
>
> h2. Summary
> Hi, we have faced an issue with transactional messages and the Table API Kafka source.
> If we configure *'scan.bounded.mode'* to *'latest-offset'*, the Flink SQL
> job hangs and eventually times out. We can always reproduce this unexpected
> behavior by following the steps below.
> This is related to this
> [issue|https://issues.apache.org/jira/browse/FLINK-33484] too.
> h2. How to reproduce
> 1. Deploy a transactional producer and stop it after producing a certain number of
> messages
> 2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit a simple
> query, such as counting the produced messages
> 3. The Flink SQL job gets stuck and times out.
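> The steps above can be sketched with a minimal Flink SQL setup. This is only
> an illustrative sketch; the table name, schema, topic, and broker address are
> placeholders, not taken from an actual environment:
> {code:sql}
> CREATE TABLE kafka_source (
>     id BIGINT,
>     payload STRING
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'transactional-topic',
>     'properties.bootstrap.servers' = 'localhost:9092',
>     'properties.isolation.level' = 'read_committed',
>     'scan.startup.mode' = 'earliest-offset',
>     'scan.bounded.mode' = 'latest-offset',
>     'format' = 'json'
> );
>
> -- Expected: the row count. Actual: the job hangs until the request times out.
> SELECT COUNT(*) FROM kafka_source;
> {code}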
> h2. Cause
> A transactional producer always writes a [control
> record|https://kafka.apache.org/documentation/#controlbatch] at the end of
> each transaction. These control records are never returned by
> {*}consumer.poll(){*} (they are filtered out internally). In the
> *KafkaPartitionSplitReader* code, a split is finished only when the
> *lastRecord.offset() >= stoppingOffset - 1* condition is met. This works
> well for non-transactional messages and for streaming jobs, but in some
> batch use cases it causes the job to hang indefinitely.
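> To make the failure concrete, here is a standalone sketch (plain Java, not
> connector code; the offsets are invented for illustration) of the two stopping
> checks applied to a partition whose final batch is a transaction control batch:
> {code:java}
> // Offsets 0..8 are data records; offset 9 is a transaction control record
> // that poll() never returns. The stopping offset captured for the
> // 'latest-offset' bound is the log end offset, 10.
> public class StoppingOffsetSketch {
>
>     /** Current check: compares the last polled record with stoppingOffset - 1. */
>     static boolean oldConditionFinishes(long lastPolledOffset, long stoppingOffset) {
>         return lastPolledOffset >= stoppingOffset - 1;
>     }
>
>     /** Proposed check: compares the consumer position (next fetch offset). */
>     static boolean newConditionFinishes(long consumerPosition, long stoppingOffset) {
>         return consumerPosition >= stoppingOffset;
>     }
>
>     public static void main(String[] args) {
>         long stoppingOffset = 10;   // log end offset
>         long lastPolledOffset = 8;  // offset 9 is a filtered control record
>         long consumerPosition = 10; // position advances past the control batch
>
>         // 8 >= 9 is false: the split never finishes, so the job hangs.
>         System.out.println("old check finishes split: "
>                 + oldConditionFinishes(lastPolledOffset, stoppingOffset));
>         // 10 >= 10 is true: the split finishes cleanly.
>         System.out.println("new check finishes split: "
>                 + newConditionFinishes(consumerPosition, stoppingOffset));
>     }
> }
> {code}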
> h2. Proposed solution
> {code:java}
> if (consumer.position(tp) >= stoppingOffset) {
>     recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset);
>     finishSplitAtRecord(
>             tp,
>             stoppingOffset,
>             lastRecord.offset(),
>             finishedPartitions,
>             recordsBySplits);
> }
> {code}
> Replacing the if condition
> [here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137]
> with *consumer.position(tp) >= stoppingOffset* solves the problem.
> *consumer.position(tp)* returns the offset of the next record to be fetched,
> which is one past the last record when no further record exists.
> With this change, *KafkaPartitionSplitReader* can finish the split even when
> the stopping offset points to a control record's offset.
> I would be happy to implement this fix if we can reach an agreement.
> Thanks
--
This message was sent by Atlassian Jira
(v8.20.10#820010)