This is an automated email from the ASF dual-hosted git repository.
jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/main by this push:
new ba032458d0 Fix Kafka component Quarkus dev services discovery in dev &
test mode
ba032458d0 is described below
commit ba032458d0f5f17c4b8f09aebfc72058aa6f6752
Author: James Netherton <[email protected]>
AuthorDate: Mon Jul 21 13:16:32 2025 +0100
Fix Kafka component Quarkus dev services discovery in dev & test mode
Fixes #7517
---
.../component/kafka/deployment/KafkaProcessor.java | 22 ++--------
.../deployment/KafkaDevServicesEnabledTest.java | 2 -
.../component/kafka/KafkaComponentObserver.java | 48 ++++++++++++++++++++++
3 files changed, 52 insertions(+), 20 deletions(-)
diff --git
a/extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java
b/extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java
index 4ddf9345d1..030a2fced1 100644
---
a/extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java
+++
b/extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java
@@ -17,7 +17,6 @@
package org.apache.camel.quarkus.component.kafka.deployment;
import java.util.Collection;
-import java.util.Optional;
import java.util.stream.Stream;
import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
@@ -27,23 +26,18 @@ import io.quarkus.deployment.IsNormal;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
-import
io.quarkus.deployment.builditem.DevServicesLauncherConfigResultBuildItem;
import io.quarkus.deployment.builditem.FeatureBuildItem;
-import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.deployment.dev.devservices.DevServicesConfig;
import io.quarkus.kafka.client.deployment.KafkaBuildTimeConfig;
import org.apache.camel.quarkus.component.kafka.KafkaClientFactoryProducer;
-import org.eclipse.microprofile.config.Config;
-import org.eclipse.microprofile.config.ConfigProvider;
+import org.apache.camel.quarkus.component.kafka.KafkaComponentObserver;
import org.jboss.jandex.ClassInfo;
import org.jboss.jandex.DotName;
import org.jboss.jandex.IndexView;
class KafkaProcessor {
private static final String FEATURE = "camel-kafka";
- private static final String CAMEL_KAFKA_BROKERS =
"camel.component.kafka.brokers";
- private static final String KAFKA_BOOTSTRAP_SERVERS =
"kafka.bootstrap.servers";
private static final DotName[] KAFKA_CLIENTS_TYPES = {
DotName.createSimple("org.apache.kafka.clients.producer.Producer"),
DotName.createSimple("org.apache.kafka.clients.consumer.Consumer")
@@ -65,18 +59,10 @@ class KafkaProcessor {
@BuildStep(onlyIfNot = IsNormal.class, onlyIf =
DevServicesConfig.Enabled.class)
public void configureKafkaComponentForDevServices(
- DevServicesLauncherConfigResultBuildItem devServiceResult,
KafkaBuildTimeConfig kafkaBuildTimeConfig,
- BuildProducer<RunTimeConfigurationDefaultBuildItem> runTimeConfig)
{
-
- Config config = ConfigProvider.getConfig();
- Optional<String> brokers =
config.getOptionalValue(CAMEL_KAFKA_BROKERS, String.class);
-
- if (brokers.isEmpty() &&
kafkaBuildTimeConfig.devservices().enabled().orElse(true)) {
- String kafkaBootstrapServers =
devServiceResult.getConfig().get(KAFKA_BOOTSTRAP_SERVERS);
- if (kafkaBootstrapServers != null) {
- runTimeConfig.produce(new
RunTimeConfigurationDefaultBuildItem(CAMEL_KAFKA_BROKERS,
kafkaBootstrapServers));
- }
+ BuildProducer<AdditionalBeanBuildItem> additionalBean) {
+ if (kafkaBuildTimeConfig.devservices().enabled().orElse(true)) {
+
additionalBean.produce(AdditionalBeanBuildItem.unremovableOf(KafkaComponentObserver.class));
}
}
diff --git
a/extensions/kafka/deployment/src/test/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaDevServicesEnabledTest.java
b/extensions/kafka/deployment/src/test/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaDevServicesEnabledTest.java
index 712eef083e..979e5a0ea6 100644
---
a/extensions/kafka/deployment/src/test/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaDevServicesEnabledTest.java
+++
b/extensions/kafka/deployment/src/test/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaDevServicesEnabledTest.java
@@ -33,7 +33,6 @@ import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.asset.Asset;
import org.jboss.shrinkwrap.api.asset.StringAsset;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -59,7 +58,6 @@ public class KafkaDevServicesEnabledTest {
@ConfigProperty(name = "kafka.bootstrap.servers")
String bootstrapServers;
- @Disabled("https://github.com/apache/camel-quarkus/issues/7517")
@Test
public void camelKafkaComponentBrokersConfigurationIsDevServicesBroker() {
KafkaComponent component = context.getComponent("kafka",
KafkaComponent.class);
diff --git
a/extensions/kafka/runtime/src/main/java/org/apache/camel/quarkus/component/kafka/KafkaComponentObserver.java
b/extensions/kafka/runtime/src/main/java/org/apache/camel/quarkus/component/kafka/KafkaComponentObserver.java
new file mode 100644
index 0000000000..afd92c4184
--- /dev/null
+++
b/extensions/kafka/runtime/src/main/java/org/apache/camel/quarkus/component/kafka/KafkaComponentObserver.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.kafka;
+
+import java.util.Optional;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.event.Observes;
+import org.apache.camel.component.kafka.KafkaComponent;
+import org.apache.camel.component.kafka.KafkaConfiguration;
+import org.apache.camel.quarkus.core.events.ComponentAddEvent;
+import org.apache.camel.util.ObjectHelper;
+import org.eclipse.microprofile.config.Config;
+import org.eclipse.microprofile.config.ConfigProvider;
+
+@ApplicationScoped
+public class KafkaComponentObserver {
+ private static final String CAMEL_KAFKA_BROKERS =
"camel.component.kafka.brokers";
+ private static final String KAFKA_BOOTSTRAP_SERVERS =
"kafka.bootstrap.servers";
+
+ void initKafkaBrokerConfiguration(@Observes ComponentAddEvent event) {
+ if (event.getComponent() instanceof KafkaComponent kafkaComponent) {
+ Config config = ConfigProvider.getConfig();
+ KafkaConfiguration configuration =
kafkaComponent.getConfiguration();
+
+ Optional<String> camelKafkaBrokers =
config.getOptionalValue(CAMEL_KAFKA_BROKERS, String.class);
+ Optional<String> kafkaBootstrapServers =
config.getOptionalValue(KAFKA_BOOTSTRAP_SERVERS, String.class);
+ if (camelKafkaBrokers.isEmpty() &&
ObjectHelper.isEmpty(configuration.getBrokers())
+ && kafkaBootstrapServers.isPresent()) {
+ configuration.setBrokers(kafkaBootstrapServers.get());
+ }
+ }
+ }
+}