[ https://issues.apache.org/jira/browse/CAMEL-16973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Otavio Rodolfo Piske updated CAMEL-16973:
-----------------------------------------
Description:
The fetch record class calls the Kafka client's wakeup method [1]. Calling this
method causes blocking code to throw a WakeupException, either immediately if a
call is currently blocked or on the next call that would block. Ideally, this
exception should be handled cleanly, as it indicates a preemptive reaction to a
call that would block [2] and not the usual case of an error condition.
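To make the wakeup behaviour concrete, here is a minimal sketch that depends only on the JDK. StubConsumer and its nested WakeupException are hypothetical stand-ins that imitate the behaviour described above (a pending wakeup aborts the next blocking call). They are not the Kafka client classes, and runLoop is not the code in KafkaFetchRecords.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch only: every type here is a hypothetical stand-in.
class WakeupSemanticsSketch {

    /** Stand-in for org.apache.kafka.common.errors.WakeupException. */
    static class WakeupException extends RuntimeException { }

    /** Stand-in consumer: a pending wakeup aborts the next blocking call. */
    static class StubConsumer {
        private final AtomicBoolean wakeupPending = new AtomicBoolean(false);
        private int batch = 0;

        void wakeup() {
            wakeupPending.set(true);
        }

        List<String> poll() {
            // Assumption of this stub: the flag is cleared when the exception
            // is raised, so a later blocking call is not aborted again.
            if (wakeupPending.getAndSet(false)) {
                throw new WakeupException();
            }
            batch++;
            return List.of("record-" + batch);
        }
    }

    /** Polls until woken up and returns a log of what happened. */
    static List<String> runLoop(StubConsumer consumer, int wakeupAfterRecords) {
        List<String> log = new ArrayList<>();
        try {
            while (true) {
                for (String record : consumer.poll()) {
                    log.add("processed " + record);
                }
                if (log.size() == wakeupAfterRecords) {
                    // Simulates stop/shutdown being requested elsewhere.
                    consumer.wakeup();
                }
            }
        } catch (WakeupException e) {
            // Expected stop signal, not a failure.
            log.add("stopped cleanly");
        } catch (RuntimeException e) {
            // Genuine errors take a separate path.
            log.add("error: " + e);
        }
        return log;
    }

    public static void main(String[] args) {
        // prints [processed record-1, processed record-2, stopped cleanly]
        System.out.println(runLoop(new StubConsumer(), 2));
    }
}
```

The point of the sketch is the ordering of the catch clauses: the wakeup is caught on its own before the generic handler, so a requested stop is never reported as an error.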
There are some circumstances that may cause the code to skip commits or lose
messages. For example:
1. If the code is using manual commit and shutdown is called while the
exchange is being processed [3], the WakeupException would be handled as a
normal exception [4].
2. The same could happen, in similar circumstances, in the commit section of the
code later on.
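The skipped-commit case can be sketched as follows, again using only the JDK. StubConsumer, the nested WakeupException, and both commit helpers are hypothetical names introduced for illustration. The retry-on-wakeup approach is one possible way to avoid the skipped commit, assuming the wakeup flag is cleared once the exception has been thrown. It is not presented as the fix adopted for this issue.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: every type here is a hypothetical stand-in.
class ManualCommitShutdownSketch {

    /** Stand-in for org.apache.kafka.common.errors.WakeupException. */
    static class WakeupException extends RuntimeException { }

    /** Stand-in consumer whose commit is aborted by a pending wakeup. */
    static class StubConsumer {
        private boolean wakeupPending;
        final List<Long> committed = new ArrayList<>();

        void wakeup() {
            wakeupPending = true;
        }

        void commitSync(long offset) {
            if (wakeupPending) {
                wakeupPending = false; // assumption: cleared once thrown
                throw new WakeupException();
            }
            committed.add(offset);
        }
    }

    /** Treats every exception alike, so the interrupted commit is dropped. */
    static void commitNaively(StubConsumer consumer, long offset) {
        try {
            consumer.commitSync(offset);
        } catch (Exception e) {
            // Generic error path: the offset is never committed.
        }
    }

    /** Treats the wakeup as a stop signal and retries the commit once. */
    static void commitShutdownAware(StubConsumer consumer, long offset) {
        try {
            consumer.commitSync(offset);
        } catch (WakeupException e) {
            // Shutting down: finish the commit before the loop exits.
            consumer.commitSync(offset);
        }
    }

    /** Requests a stop during processing, then commits offset 42. */
    static List<Long> simulate(boolean shutdownAware) {
        StubConsumer consumer = new StubConsumer();
        long offset = 42L;
        consumer.wakeup(); // stop requested while the exchange is processed
        if (shutdownAware) {
            commitShutdownAware(consumer, offset);
        } else {
            commitNaively(consumer, offset);
        }
        return consumer.committed;
    }

    public static void main(String[] args) {
        System.out.println(simulate(false)); // prints []
        System.out.println(simulate(true));  // prints [42]
    }
}
```

In the naive variant the commit is silently lost because the wakeup lands on the commit call, which is the next blocking call after the stop request.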
As such, if the stop or shutdown procedure is started while the code is in
1. [https://github.com/apache/camel/blob/main/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java#L478-L488]
2. [https://kafka.apache.org/21/javadoc/?org/apache/kafka/common/errors/WakeupException.html]
3. [https://github.com/apache/camel/blob/main/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java#L275-L279]
4. [https://github.com/apache/camel/blob/main/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java#L350]
> camel-kafka: commits may be skipped on stop or shutdown
> -------------------------------------------------------
>
> Key: CAMEL-16973
> URL: https://issues.apache.org/jira/browse/CAMEL-16973
> Project: Camel
> Issue Type: Bug
> Components: camel-kafka
> Reporter: Otavio Rodolfo Piske
> Assignee: Otavio Rodolfo Piske
> Priority: Major