This is an automated email from the ASF dual-hosted git repository.

apupier pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 77554dfd5d0f Avoid potential NPE when the kafka consumer failed to 
start
77554dfd5d0f is described below

commit 77554dfd5d0f992b0c5528f3efd7903964e6b086
Author: AurĂ©lien Pupier <[email protected]>
AuthorDate: Thu Mar 5 14:17:51 2026 +0100

    Avoid potential NPE when the kafka consumer failed to start
    
    to avoid this kind of NPE:
    ```
    java.lang.NullPointerException: Cannot invoke
    "org.apache.kafka.clients.consumer.Consumer.wakeup()" because
    "this.consumer" is null
            at 
org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy.stop(SingleNodeKafkaResumeStrategy.java:412)
            at 
org.apache.camel.processor.resume.ResumableProcessor.doStop(ResumableProcessor.java:75)
            at 
org.apache.camel.support.service.BaseService.stop(BaseService.java:167)
            at 
org.apache.camel.support.service.ServiceHelper.stopService(ServiceHelper.java:201)
            at 
org.apache.camel.support.service.ServiceHelper.stopService(ServiceHelper.java:185)
            at 
org.apache.camel.support.service.ServiceHelper.stopService(ServiceHelper.java:170)
            at 
org.apache.camel.impl.engine.DefaultChannel.doStop(DefaultChannel.java:138)
            at 
org.apache.camel.support.service.BaseService.stop(BaseService.java:167)
            at 
org.apache.camel.support.service.ServiceHelper.stopService(ServiceHelper.java:201)
            at 
org.apache.camel.support.service.ServiceHelper.stopService(ServiceHelper.java:185)
            at 
org.apache.camel.support.service.ServiceHelper.stopService(ServiceHelper.java:216)
            at 
org.apache.camel.support.service.ServiceHelper.stopService(ServiceHelper.java:187)
            at 
org.apache.camel.support.service.ServiceHelper.stopService(ServiceHelper.java:170)
            at org.apache.camel.processor.Pipeline.doStop(Pipeline.java:209)
            at 
org.apache.camel.support.service.BaseService.stop(BaseService.java:167)
            at 
org.apache.camel.support.service.ServiceHelper.stopService(ServiceHelper.java:201)
            at 
org.apache.camel.support.service.ServiceHelper.stopService(ServiceHelper.java:185)
            at 
org.apache.camel.support.processor.DelegateAsyncProcessor.doStop(DelegateAsyncProcessor.java:107)
            at 
org.apache.camel.support.service.BaseService.stop(BaseService.java:167)
            at 
org.apache.camel.support.service.ServiceHelper.stopService(ServiceHelper.java:201)
            at 
org.apache.camel.support.service.ServiceHelper.stopService(ServiceHelper.java:185)
            at 
org.apache.camel.support.service.ServiceHelper.stopService(ServiceHelper.java:170)
            at 
org.apache.camel.support.DefaultConsumer.doStop(DefaultConsumer.java:236)
            at 
org.apache.camel.component.kafka.KafkaConsumer.doStop(KafkaConsumer.java:225)
            at 
org.apache.camel.support.service.BaseService.stop(BaseService.java:167)
            at 
org.apache.camel.support.service.ServiceHelper.stopService(ServiceHelper.java:201)
            at 
org.apache.camel.impl.engine.DefaultShutdownStrategy.shutdownNow(DefaultShutdownStrategy.java:434)
            at 
org.apache.camel.impl.engine.DefaultShutdownStrategy$ShutdownTask.run(DefaultShutdownStrategy.java:743)
            at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:545)
            at 
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:328)
            at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1090)
            at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:614)
            at java.base/java.lang.Thread.run(Thread.java:1474)
    ```
    
    Signed-off-by: AurĂ©lien Pupier <[email protected]>
---
 .../processor/resume/kafka/SingleNodeKafkaResumeStrategy.java      | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 2290a8c1b820..7d689761a028 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -409,7 +409,12 @@ public class SingleNodeKafkaResumeStrategy implements 
KafkaResumeStrategy, Camel
 
         try {
             LOG.info("Closing the Kafka consumer");
-            consumer.wakeup();
+            if (consumer != null) {
+                consumer.wakeup();
+            } else {
+                // This may happen if the start up has failed in some other 
part
+                LOG.trace("There's no Kafka consumer available to stop");
+            }
 
             if (executorService != null) {
                 executorService.shutdown();

Reply via email to