[
https://issues.apache.org/jira/browse/CAMEL-21689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17922519#comment-17922519
]
Claus Ibsen commented on CAMEL-21689:
-------------------------------------
Ah okay so you want this to either batch when there is either first
* 50.000 records
* 10 sec elapsed then batch whatever you have so far
So this is a bit misleading with the name of the option pollTimeout as a
timeout is when there are no records for X time, and as you say there are some
records flowing in, so the timeout is not triggered.
What it sounds like is you want a new option batchInterval=10000 that triggers
every X interval. This is for example how the Aggregate EIP is designed.
> Apache Camel Kafka Batching Consumer behaviour discrepancy w.r.t pollTimeoutMs
> ------------------------------------------------------------------------------
>
> Key: CAMEL-21689
> URL: https://issues.apache.org/jira/browse/CAMEL-21689
> Project: Camel
> Issue Type: Bug
> Components: camel-kafka
> Affects Versions: 4.8.3
> Reporter: Manjunath S Horapeti
> Priority: Major
> Attachments: image-2025-01-31-01-18-05-681.png,
> image-2025-01-31-01-25-52-332.png, image-2025-01-31-01-26-53-858.png
>
>
> Hi Team,
>
> I'm reaching out regarding the understanding of behaviour of "pollTimeoutMs"
> and the discrepancy observed.
> !image-2025-01-31-01-18-05-681.png|width=627,height=156!
> As per the documentation above from the camel kafka component page
> (https://camel.apache.org/components/4.8.x/kafka-component.html#_batching_consumer)
> 4.8.x version, the pollTimeoutMs works in tandem with "maxPollRecords", to
> either poll "maxPollRecords" or block for a maximum of "pollTimeOutMs".
> But the behaviour observed was that the camel route kept waiting until
> "maxPollRecords" count was reached and then processed further.
>
> For example: our route is as follows
>
> {code:java}
> from("kafka:topic-name?brokers=brokers" +
> "&groupId=groupid&pollTimeoutMs=10000" +
> "&batching=true&maxPollRecords=50000")
> .bean(this, "methodName");{code}
>
> This route always waits until 50000 records are present in the topic and then
> proceeds further and ignoring pollTimeoutMs of 10000 (10 seconds). i.e. if
> the producer is producing messages at a rate of 50-100 msgs per second, then
> application waits for nearly 500-1000 seconds before proceeding further that
> is until 50000 record count is met.
>
> We believe that the below code mentioned in the
> "KafkaRecordBatchingProcessor" is never executed as there is always one or
> two messages in poll. and hence method - "hasExpiredRecords" with condition
> consumerRecords.isEmpty() is always false.
> We believe this is making the application wait until maxPollRecords (50000)
> is reached and then proceed further.
> !image-2025-01-31-01-25-52-332.png|width=640,height=257!
> Definition of "hasExpiredRecords()"
> !image-2025-01-31-01-26-53-858.png|width=653,height=53!
> Can you please help us by letting us know if the above behaviour is as
> expected and if so then can you let us know how to pull Y messages from the
> topic or write whatever messages are received within X seconds? (exact
> behaviour mentioned in the above Batching Consumer documentation).
>
> Thanks in advance.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)