This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch camel-3.18.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 7b87a18fe0ac8df2aadfc9b17e3d090e83e50dae Author: geekr <[email protected]> AuthorDate: Sat Sep 10 12:56:34 2022 -0400 [CAMEL-18327] added method isKafkaConsumerRunnableAndNotStopped() to continue polling eve if the consumer is suspendend --- .../java/org/apache/camel/component/kafka/KafkaFetchRecords.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java index 266bf7a8f11..8cab5bcd76a 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java @@ -307,7 +307,7 @@ public class KafkaFetchRecords implements Runnable { kafkaConsumer, threadId, commitManager, consumerListener); Duration pollDuration = Duration.ofMillis(pollTimeoutMs); - while (isKafkaConsumerRunnable() && isConnected() && pollExceptionStrategy.canContinue()) { + while (isKafkaConsumerRunnableAndNotStopped() && isConnected() && pollExceptionStrategy.canContinue()) { ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration); if (consumerListener != null) { if (!consumerListener.afterConsume(consumer)) { @@ -420,6 +420,10 @@ public class KafkaFetchRecords implements Runnable { && !kafkaConsumer.isSuspendingOrSuspended(); } + private boolean isKafkaConsumerRunnableAndNotStopped() { + return kafkaConsumer.isRunAllowed() && !kafkaConsumer.isStoppingOrStopped(); + } + private boolean isReconnect() { return reconnect; }
