This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 3b82829 Camel-AWS2-MSK: Fixed CS
3b82829 is described below
commit 3b828296964e1c3bff66981ad38481edceab6b18
Author: Andrea Cosentino <[email protected]>
AuthorDate: Thu Mar 12 11:03:04 2020 +0100
Camel-AWS2-MSK: Fixed CS
---
.../camel/component/aws2/msk/MSK2Component.java | 26 +++++++++++++++++-----
1 file changed, 20 insertions(+), 6 deletions(-)
diff --git a/components/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/MSK2Component.java b/components/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/MSK2Component.java
index ca92a4f..8a8dbba 100644
--- a/components/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/MSK2Component.java
+++ b/components/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/MSK2Component.java
@@ -24,6 +24,10 @@ import org.apache.camel.Endpoint;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.annotations.Component;
import org.apache.camel.support.DefaultComponent;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import software.amazon.awssdk.services.kafka.KafkaClient;
/**
@@ -31,7 +35,9 @@ import software.amazon.awssdk.services.kafka.KafkaClient;
*/
@Component("aws2-msk")
public class MSK2Component extends DefaultComponent {
-
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MSK2Component.class);
+
@Metadata
private MSK2Configuration configuration = new MSK2Configuration();
@@ -50,7 +56,7 @@ public class MSK2Component extends DefaultComponent {
MSK2Configuration configuration = this.configuration != null ?
this.configuration.copy() : new MSK2Configuration();
MSK2Endpoint endpoint = new MSK2Endpoint(uri, this, configuration);
setProperties(endpoint, parameters);
- checkAndSetRegistryClient(configuration);
+ checkAndSetRegistryClient(configuration, endpoint);
if (configuration.getMskClient() == null &&
(configuration.getAccessKey() == null || configuration.getSecretKey() == null))
{
throw new IllegalArgumentException("Amazon msk client or accessKey
and secretKey must be specified");
}
@@ -68,10 +74,18 @@ public class MSK2Component extends DefaultComponent {
this.configuration = configuration;
}
- private void checkAndSetRegistryClient(MSK2Configuration configuration) {
- Set<KafkaClient> clients =
getCamelContext().getRegistry().findByType(KafkaClient.class);
- if (clients.size() == 1) {
- configuration.setMskClient(clients.stream().findFirst().get());
+ private void checkAndSetRegistryClient(MSK2Configuration configuration,
MSK2Endpoint endpoint) {
+ if (ObjectHelper.isEmpty(endpoint.getConfiguration().getMskClient())) {
+ LOG.debug("Looking for an KafkaClient instance in the registry");
+ Set<KafkaClient> clients =
getCamelContext().getRegistry().findByType(KafkaClient.class);
+ if (clients.size() == 1) {
+ LOG.debug("Found exactly one KafkaClient instance in the
registry");
+ configuration.setMskClient(clients.stream().findFirst().get());
+ } else {
+ LOG.debug("No KafkaClient instance in the registry");
+ }
+ } else {
+ LOG.debug("KafkaClient instance is already set at endpoint level:
skipping the check in the registry");
}
}
}