This is an automated email from the ASF dual-hosted git repository.
ppalaga pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/main by this push:
new 7547d0e Improve KafkaClientFactory integration
7547d0e is described below
commit 7547d0eecb929cda640ad73523cdb4b2e71340b4
Author: James Netherton <[email protected]>
AuthorDate: Mon Jun 7 13:38:40 2021 +0100
Improve KafkaClientFactory integration
Fixes #2486
---
.../quarkus/component/kafka/deployment/KafkaProcessor.java | 10 +++++-----
.../camel/quarkus/component/kafka/CamelKafkaRecorder.java | 10 +++-------
2 files changed, 8 insertions(+), 12 deletions(-)
diff --git a/extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java b/extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java
index 1143bc4..3bf8c9f 100644
--- a/extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java
+++ b/extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java
@@ -25,7 +25,7 @@ import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ServiceProviderBuildItem;
-import org.apache.camel.component.kafka.KafkaComponent;
+import org.apache.camel.component.kafka.KafkaClientFactory;
import org.apache.camel.quarkus.component.kafka.CamelKafkaRecorder;
import org.apache.camel.quarkus.core.deployment.spi.CamelRuntimeBeanBuildItem;
import org.apache.kafka.common.security.scram.internals.ScramSaslClient.ScramSaslClientFactory;
@@ -40,14 +40,14 @@ class KafkaProcessor {
@BuildStep
@Record(ExecutionTime.RUNTIME_INIT)
- CamelRuntimeBeanBuildItem createCamelKafkaComponent(
+ CamelRuntimeBeanBuildItem createKafkaClientFactory(
CamelKafkaRecorder recorder,
// We want Quarkus to configure the ServiceBindingConverter bits before this step
List<ServiceProviderBuildItem> serviceProviders) {
return new CamelRuntimeBeanBuildItem(
- "kafka",
- KafkaComponent.class.getName(),
- recorder.createKafkaComponent());
+ "quarkusKafkaClientFactory",
+ KafkaClientFactory.class.getName(),
+ recorder.createKafkaClientFactory());
}
@BuildStep
diff --git a/extensions/kafka/runtime/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaRecorder.java b/extensions/kafka/runtime/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaRecorder.java
index abc7bed..df50206 100644
--- a/extensions/kafka/runtime/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaRecorder.java
+++ b/extensions/kafka/runtime/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaRecorder.java
@@ -23,14 +23,13 @@ import io.quarkus.arc.Arc;
import io.quarkus.arc.InstanceHandle;
import io.quarkus.runtime.RuntimeValue;
import io.quarkus.runtime.annotations.Recorder;
-import org.apache.camel.component.kafka.KafkaComponent;
+import org.apache.camel.component.kafka.KafkaClientFactory;
@Recorder
public class CamelKafkaRecorder {
@SuppressWarnings("unchecked")
- public RuntimeValue<KafkaComponent> createKafkaComponent() {
- final KafkaComponent component = new KafkaComponent();
+ public RuntimeValue<KafkaClientFactory> createKafkaClientFactory() {
final InstanceHandle<Object> instance = Arc.container().instance("default-kafka-broker");
Map<String, Object> kafkaConfig;
@@ -40,10 +39,7 @@ public class CamelKafkaRecorder {
kafkaConfig = Collections.emptyMap();
}
- // TODO: Return new RuntimeValue<>(quarkusKafkaClientFactory) as the KafkaClientFactory option should be autowired
- // https://issues.apache.org/jira/browse/CAMEL-16500
QuarkusKafkaClientFactory quarkusKafkaClientFactory = new QuarkusKafkaClientFactory(kafkaConfig);
- component.setKafkaClientFactory(quarkusKafkaClientFactory);
- return new RuntimeValue<>(component);
+ return new RuntimeValue<>(quarkusKafkaClientFactory);
}
}