This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new f3220a13cd61 CAMEL-23023: camel-kafka - Kafka consumers are started 
eager before C… (#21943)
f3220a13cd61 is described below

commit f3220a13cd61a9e8aa028e103052a522a6e5d632
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Mar 12 10:37:09 2026 +0100

    CAMEL-23023: camel-kafka - Kafka consumers are started eager before C… 
(#21943)
    
    * CAMEL-23023: camel-kafka - Kafka consumers are started eager before 
CamelContext is fully started
---
 .../camel/component/kafka/KafkaComponent.java      | 39 +++++++++++++++++++++-
 .../camel/component/kafka/KafkaConsumer.java       |  8 ++++-
 .../ROOT/pages/camel-4x-upgrade-guide-4_19.adoc    |  3 ++
 3 files changed, 48 insertions(+), 2 deletions(-)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index 347514dcd247..9a8413d368f4 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -17,9 +17,12 @@
 package org.apache.camel.component.kafka;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.ExtendedStartupListener;
 import org.apache.camel.SSLContextParametersAware;
 import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
 import org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory;
@@ -33,9 +36,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Component("kafka")
-public class KafkaComponent extends HealthCheckComponent implements 
SSLContextParametersAware {
+public class KafkaComponent extends HealthCheckComponent implements 
SSLContextParametersAware, ExtendedStartupListener {
+
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaComponent.class);
 
+    private final List<Runnable> pendingConsumers = new 
CopyOnWriteArrayList<>();
+
     @Metadata
     private KafkaConfiguration configuration = new KafkaConfiguration();
     @Metadata(label = "security", defaultValue = "false")
@@ -104,6 +110,10 @@ public class KafkaComponent extends HealthCheckComponent 
implements SSLContextPa
         return endpoint;
     }
 
+    void pendingConsumer(Runnable task) {
+        pendingConsumers.add(task);
+    }
+
     public KafkaConfiguration getConfiguration() {
         return configuration;
     }
@@ -244,6 +254,26 @@ public class KafkaComponent extends HealthCheckComponent 
implements SSLContextPa
         this.subscribeConsumerTopicMustExists = 
subscribeConsumerTopicMustExists;
     }
 
+    @Override
+    public void onCamelContextStarted(CamelContext context, boolean 
alreadyStarted) throws Exception {
+        if (alreadyStarted) {
+            startPendingConsumers();
+        }
+    }
+
+    @Override
+    public void onCamelContextFullyStarted(CamelContext context, boolean 
alreadyStarted) throws Exception {
+        startPendingConsumers();
+    }
+
+    private void startPendingConsumers() {
+        if (!pendingConsumers.isEmpty()) {
+            LOG.info("Starting {} pending Kafka consumers as CamelContext is 
fully started", pendingConsumers.size());
+            pendingConsumers.forEach(Runnable::run);
+            pendingConsumers.clear();
+        }
+    }
+
     @Override
     protected void doInit() throws Exception {
         super.doInit();
@@ -266,4 +296,11 @@ public class KafkaComponent extends HealthCheckComponent 
implements SSLContextPa
         PropertyBindingSupport.bindProperties(getCamelContext(), map, 
configuration.getAdditionalProperties());
         configuration.setAdditionalProperties(map);
     }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        super.doShutdown();
+        pendingConsumers.clear();
+    }
+
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 52cff9dc06b8..98f18d942c60 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -170,7 +170,13 @@ public class KafkaConsumer extends DefaultConsumer
             KafkaFetchRecords task = new KafkaFetchRecords(
                     this, bridge, topic, pattern, Integer.toString(i), 
getProps(), consumerListener);
 
-            executor.submit(task);
+            if (!endpoint.getCamelContext().isStarted()) {
+                // if camel has not been fully started yet then delay starting 
this consumer to avoid
+                // process incoming message before camel is fully started
+                endpoint.getComponent().pendingConsumer(() -> 
executor.submit(task));
+            } else {
+                executor.submit(task);
+            }
 
             tasks.add(task);
         }
diff --git 
a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_19.adoc 
b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_19.adoc
index 9dce4c3d960f..4b4a6346bf66 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_19.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_19.adoc
@@ -286,6 +286,9 @@ The Kafka client library has been upgraded from 3.9.1 to 
4.2.0. This is a major
 
 * If you had explicitly configured the `partitioner` option to use 
`org.apache.kafka.clients.producer.internals.DefaultPartitioner` or 
`org.apache.kafka.clients.producer.UniformStickyPartitioner`, you must remove 
that configuration as these classes have been removed in Kafka 4.0. The 
built-in default partitioner (used when no partitioner is set) continues to 
work.
 
+Camel will now defer starting Kafka consumers during startup of `CamelContext` 
to after the context is fully started, to ensure
+that any incoming message from Kafka brokers are only received by Camel when 
everything is fully started.
+
 === camel-google-pubsub-lite
 
 The `camel-google-pubsub-lite` component has been removed. The component was 
deprecated in Camel 4.10 following Google Cloud Platform's deprecation of the 
underlying Pub/Sub Lite service.

Reply via email to