[
https://issues.apache.org/jira/browse/CAMEL-21689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17922616#comment-17922616
]
Manjunath S Horapeti commented on CAMEL-21689:
----------------------------------------------
Hi Claus,
Yes, as per the naming of the option pollTimeoutMs it gives a meaning that a
timeout would happen when no response is received, but from the documentation
mentioned here -
[https://camel.apache.org/components/4.8.x/kafka-component.html#_batching_consumer]
which states "To avoid blocking for too long, waiting for the whole set of
records to fill the batch, it is possible to use the {{pollTimeoutMs}} option
to set a timeout for the polling. In this case, the batch may contain less
messages than set in the {{{}maxPollRecords{}}}."
This gives a sense that we either wait till pollTimeoutMs seconds or until
maxPollRecords is reached.
If the behaviour is as expected can we have documentation changed so that no
one else in future also have a similar confusion?
Also while using the aggregation strategy, we are getting
"ConcurrentModificationException" when we try to commit the messages and hence
went with the above approach of letting camel handle the batching. By the looks
of it, Aggregation runs on different thread and the committing is supposed to
be done on the kafka consumer thread(as per documentation)
[https://camel.apache.org/components/4.8.x/kafka-component.html#_manual_commits_with_the_kafka_consumer].
Could you please suggest on how to achieve - either process at X interval or Y
records which ever is occurs first.
> Apache Camel Kafka Batching Consumer behaviour discrepancy w.r.t pollTimeoutMs
> ------------------------------------------------------------------------------
>
> Key: CAMEL-21689
> URL: https://issues.apache.org/jira/browse/CAMEL-21689
> Project: Camel
> Issue Type: Bug
> Components: camel-kafka
> Affects Versions: 4.8.3
> Reporter: Manjunath S Horapeti
> Priority: Major
> Attachments: image-2025-01-31-01-18-05-681.png,
> image-2025-01-31-01-25-52-332.png, image-2025-01-31-01-26-53-858.png
>
>
> Hi Team,
>
> I'm reaching out regarding the understanding of behaviour of "pollTimeoutMs"
> and the discrepancy observed.
> !image-2025-01-31-01-18-05-681.png|width=627,height=156!
> As per the documentation above from the camel kafka component page
> (https://camel.apache.org/components/4.8.x/kafka-component.html#_batching_consumer)
> 4.8.x version, the pollTimeoutMs works in tandem with "maxPollRecords", to
> either poll "maxPollRecords" or block for a maximum of "pollTimeOutMs".
> But the behaviour observed was that the camel route kept waiting until
> "maxPollRecords" count was reached and then processed further.
>
> For example: our route is as follows
>
> {code:java}
> from("kafka:topic-name?brokers=brokers" +
> "&groupId=groupid&pollTimeoutMs=10000" +
> "&batching=true&maxPollRecords=50000")
> .bean(this, "methodName");{code}
>
> This route always waits until 50000 records are present in the topic and then
> proceeds further and ignoring pollTimeoutMs of 10000 (10 seconds). i.e. if
> the producer is producing messages at a rate of 50-100 msgs per second, then
> application waits for nearly 500-1000 seconds before proceeding further that
> is until 50000 record count is met.
>
> We believe that the below code mentioned in the
> "KafkaRecordBatchingProcessor" is never executed as there is always one or
> two messages in poll. and hence method - "hasExpiredRecords" with condition
> consumerRecords.isEmpty() is always false.
> We believe this is making the application wait until maxPollRecords (50000)
> is reached and then proceed further.
> !image-2025-01-31-01-25-52-332.png|width=640,height=257!
> Definition of "hasExpiredRecords()"
> !image-2025-01-31-01-26-53-858.png|width=653,height=53!
> Can you please help us by letting us know if the above behaviour is as
> expected and if so then can you let us know how to pull Y messages from the
> topic or write whatever messages are received within X seconds? (exact
> behaviour mentioned in the above Batching Consumer documentation).
>
> Thanks in advance.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)