This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git


The following commit(s) were added to refs/heads/main by this push:
     new ba032458d0 Fix Kafka component Quarkus dev services discovery in dev & 
test mode
ba032458d0 is described below

commit ba032458d0f5f17c4b8f09aebfc72058aa6f6752
Author: James Netherton <[email protected]>
AuthorDate: Mon Jul 21 13:16:32 2025 +0100

    Fix Kafka component Quarkus dev services discovery in dev & test mode
    
    Fixes #7517
---
 .../component/kafka/deployment/KafkaProcessor.java | 22 ++--------
 .../deployment/KafkaDevServicesEnabledTest.java    |  2 -
 .../component/kafka/KafkaComponentObserver.java    | 48 ++++++++++++++++++++++
 3 files changed, 52 insertions(+), 20 deletions(-)

diff --git 
a/extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java
 
b/extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java
index 4ddf9345d1..030a2fced1 100644
--- 
a/extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java
+++ 
b/extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java
@@ -17,7 +17,6 @@
 package org.apache.camel.quarkus.component.kafka.deployment;
 
 import java.util.Collection;
-import java.util.Optional;
 import java.util.stream.Stream;
 
 import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
@@ -27,23 +26,18 @@ import io.quarkus.deployment.IsNormal;
 import io.quarkus.deployment.annotations.BuildProducer;
 import io.quarkus.deployment.annotations.BuildStep;
 import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
-import 
io.quarkus.deployment.builditem.DevServicesLauncherConfigResultBuildItem;
 import io.quarkus.deployment.builditem.FeatureBuildItem;
-import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem;
 import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
 import io.quarkus.deployment.dev.devservices.DevServicesConfig;
 import io.quarkus.kafka.client.deployment.KafkaBuildTimeConfig;
 import org.apache.camel.quarkus.component.kafka.KafkaClientFactoryProducer;
-import org.eclipse.microprofile.config.Config;
-import org.eclipse.microprofile.config.ConfigProvider;
+import org.apache.camel.quarkus.component.kafka.KafkaComponentObserver;
 import org.jboss.jandex.ClassInfo;
 import org.jboss.jandex.DotName;
 import org.jboss.jandex.IndexView;
 
 class KafkaProcessor {
     private static final String FEATURE = "camel-kafka";
-    private static final String CAMEL_KAFKA_BROKERS = 
"camel.component.kafka.brokers";
-    private static final String KAFKA_BOOTSTRAP_SERVERS = 
"kafka.bootstrap.servers";
     private static final DotName[] KAFKA_CLIENTS_TYPES = {
             DotName.createSimple("org.apache.kafka.clients.producer.Producer"),
             DotName.createSimple("org.apache.kafka.clients.consumer.Consumer")
@@ -65,18 +59,10 @@ class KafkaProcessor {
 
     @BuildStep(onlyIfNot = IsNormal.class, onlyIf = 
DevServicesConfig.Enabled.class)
     public void configureKafkaComponentForDevServices(
-            DevServicesLauncherConfigResultBuildItem devServiceResult,
             KafkaBuildTimeConfig kafkaBuildTimeConfig,
-            BuildProducer<RunTimeConfigurationDefaultBuildItem> runTimeConfig) 
{
-
-        Config config = ConfigProvider.getConfig();
-        Optional<String> brokers = 
config.getOptionalValue(CAMEL_KAFKA_BROKERS, String.class);
-
-        if (brokers.isEmpty() && 
kafkaBuildTimeConfig.devservices().enabled().orElse(true)) {
-            String kafkaBootstrapServers = 
devServiceResult.getConfig().get(KAFKA_BOOTSTRAP_SERVERS);
-            if (kafkaBootstrapServers != null) {
-                runTimeConfig.produce(new 
RunTimeConfigurationDefaultBuildItem(CAMEL_KAFKA_BROKERS, 
kafkaBootstrapServers));
-            }
+            BuildProducer<AdditionalBeanBuildItem> additionalBean) {
+        if (kafkaBuildTimeConfig.devservices().enabled().orElse(true)) {
+            
additionalBean.produce(AdditionalBeanBuildItem.unremovableOf(KafkaComponentObserver.class));
         }
     }
 
diff --git 
a/extensions/kafka/deployment/src/test/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaDevServicesEnabledTest.java
 
b/extensions/kafka/deployment/src/test/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaDevServicesEnabledTest.java
index 712eef083e..979e5a0ea6 100644
--- 
a/extensions/kafka/deployment/src/test/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaDevServicesEnabledTest.java
+++ 
b/extensions/kafka/deployment/src/test/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaDevServicesEnabledTest.java
@@ -33,7 +33,6 @@ import org.jboss.shrinkwrap.api.ShrinkWrap;
 import org.jboss.shrinkwrap.api.asset.Asset;
 import org.jboss.shrinkwrap.api.asset.StringAsset;
 import org.jboss.shrinkwrap.api.spec.JavaArchive;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
@@ -59,7 +58,6 @@ public class KafkaDevServicesEnabledTest {
     @ConfigProperty(name = "kafka.bootstrap.servers")
     String bootstrapServers;
 
-    @Disabled("https://github.com/apache/camel-quarkus/issues/7517";)
     @Test
     public void camelKafkaComponentBrokersConfigurationIsDevServicesBroker() {
         KafkaComponent component = context.getComponent("kafka", 
KafkaComponent.class);
diff --git 
a/extensions/kafka/runtime/src/main/java/org/apache/camel/quarkus/component/kafka/KafkaComponentObserver.java
 
b/extensions/kafka/runtime/src/main/java/org/apache/camel/quarkus/component/kafka/KafkaComponentObserver.java
new file mode 100644
index 0000000000..afd92c4184
--- /dev/null
+++ 
b/extensions/kafka/runtime/src/main/java/org/apache/camel/quarkus/component/kafka/KafkaComponentObserver.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.kafka;
+
+import java.util.Optional;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.event.Observes;
+import org.apache.camel.component.kafka.KafkaComponent;
+import org.apache.camel.component.kafka.KafkaConfiguration;
+import org.apache.camel.quarkus.core.events.ComponentAddEvent;
+import org.apache.camel.util.ObjectHelper;
+import org.eclipse.microprofile.config.Config;
+import org.eclipse.microprofile.config.ConfigProvider;
+
+@ApplicationScoped
+public class KafkaComponentObserver {
+    private static final String CAMEL_KAFKA_BROKERS = 
"camel.component.kafka.brokers";
+    private static final String KAFKA_BOOTSTRAP_SERVERS = 
"kafka.bootstrap.servers";
+
+    void initKafkaBrokerConfiguration(@Observes ComponentAddEvent event) {
+        if (event.getComponent() instanceof KafkaComponent kafkaComponent) {
+            Config config = ConfigProvider.getConfig();
+            KafkaConfiguration configuration = 
kafkaComponent.getConfiguration();
+
+            Optional<String> camelKafkaBrokers = 
config.getOptionalValue(CAMEL_KAFKA_BROKERS, String.class);
+            Optional<String> kafkaBootstrapServers = 
config.getOptionalValue(KAFKA_BOOTSTRAP_SERVERS, String.class);
+            if (camelKafkaBrokers.isEmpty() && 
ObjectHelper.isEmpty(configuration.getBrokers())
+                    && kafkaBootstrapServers.isPresent()) {
+                configuration.setBrokers(kafkaBootstrapServers.get());
+            }
+        }
+    }
+}

Reply via email to