This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new c0def69 (chores) camel-kafka: code cleanups (#6074)
c0def69 is described below
commit c0def695648fe93623d393950f8c06f8c526630f
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Fri Sep 10 17:11:01 2021 +0200
(chores) camel-kafka: code cleanups (#6074)
- replaced reference to deprecated configuration constant
- cleanup generating Kafka properties from endpoint configuration
- replaced reference to deprecated configuration constant
- removed usage of deprecated hasOut/getOut methods
- do not assert that a primitive value is non-null (assert on the record metadata object instead)
---
.../camel/component/kafka/KafkaConfiguration.java | 7 +++--
.../camel/component/kafka/KafkaConsumer.java | 35 ++++++++++------------
.../camel/component/kafka/KafkaFetchRecords.java | 7 +++--
.../camel/component/kafka/KafkaProducer.java | 18 ++---------
.../camel/component/kafka/KafkaProducerTest.java | 33 ++++++++++----------
5 files changed, 47 insertions(+), 53 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 74430fc..c70f7cb 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -50,6 +50,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
@UriParams
public class KafkaConfiguration implements Cloneable,
HeaderFilterStrategyAware {
@@ -404,7 +405,8 @@ public class KafkaConfiguration implements Cloneable,
HeaderFilterStrategyAware
addPropertyIfNotNull(props,
SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN,
getKerberosBeforeReloginMinTime());
addPropertyIfNotNull(props,
SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, getKerberosRenewJitter());
addPropertyIfNotNull(props,
SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR,
getKerberosRenewWindowFactor());
- addListPropertyIfNotNull(props,
SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES,
getKerberosPrincipalToLocalRules());
+ addListPropertyIfNotNull(props,
BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG,
+ getKerberosPrincipalToLocalRules());
addPropertyIfNotNull(props, SaslConfigs.SASL_MECHANISM,
getSaslMechanism());
addPropertyIfNotNull(props, SaslConfigs.SASL_JAAS_CONFIG,
getSaslJaasConfig());
@@ -471,7 +473,8 @@ public class KafkaConfiguration implements Cloneable,
HeaderFilterStrategyAware
addPropertyIfNotNull(props,
SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN,
getKerberosBeforeReloginMinTime());
addPropertyIfNotNull(props,
SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, getKerberosRenewJitter());
addPropertyIfNotNull(props,
SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR,
getKerberosRenewWindowFactor());
- addListPropertyIfNotNull(props,
SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES,
getKerberosPrincipalToLocalRules());
+ addListPropertyIfNotNull(props,
BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG,
+ getKerberosPrincipalToLocalRules());
addPropertyIfNotNull(props, SaslConfigs.SASL_MECHANISM,
getSaslMechanism());
addPropertyIfNotNull(props, SaslConfigs.SASL_JAAS_CONFIG,
getSaslJaasConfig());
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index ddfebe2..f80ac69 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -29,6 +29,7 @@ import
org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.util.ObjectHelper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,29 +65,25 @@ public class KafkaConsumer extends DefaultConsumer {
return (KafkaEndpoint) super.getEndpoint();
}
+ private String randomUUID() {
+ return UUID.randomUUID().toString();
+ }
+
Properties getProps() {
- Properties props =
endpoint.getConfiguration().createConsumerProperties();
+ KafkaConfiguration configuration = endpoint.getConfiguration();
+
+ Properties props = configuration.createConsumerProperties();
endpoint.updateClassProperties(props);
- String brokers =
endpoint.getKafkaClientFactory().getBrokers(endpoint.getConfiguration());
- if (brokers != null) {
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
- }
+
ObjectHelper.ifNotEmpty(endpoint.getKafkaClientFactory().getBrokers(configuration),
+ v -> props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, v));
+
+ String groupId =
ObjectHelper.supplyIfEmpty(configuration.getGroupId(), this::randomUUID);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+
+ ObjectHelper.ifNotEmpty(configuration.getGroupInstanceId(),
+ v -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, v));
- if (endpoint.getConfiguration().getGroupId() != null) {
- String groupId = endpoint.getConfiguration().getGroupId();
- props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
- LOG.debug("Kafka consumer groupId is {}", groupId);
- } else {
- String randomGroupId = UUID.randomUUID().toString();
- props.put(ConsumerConfig.GROUP_ID_CONFIG, randomGroupId);
- LOG.debug("Kafka consumer groupId is {} (generated)",
randomGroupId);
- }
- if (endpoint.getConfiguration().getGroupInstanceId() != null) {
- String gid = endpoint.getConfiguration().getGroupInstanceId();
- LOG.debug("Kafka consumer groupInstanceId is {}", gid);
- props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, gid);
- }
return props;
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index dedd0b8..ee6921c 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -108,8 +108,7 @@ class KafkaFetchRecords implements Runnable,
ConsumerRebalanceListener {
first = false;
- if (!kafkaConsumer.isRunAllowed() ||
kafkaConsumer.isStoppingOrStopped()
- || kafkaConsumer.isSuspendingOrSuspended()) {
+ if (isCloseable()) {
LOG.debug("Closing consumer {}", threadId);
IOHelper.close(consumer);
return;
@@ -122,6 +121,10 @@ class KafkaFetchRecords implements Runnable, ConsumerRebalanceListener {
LOG.info("Terminating KafkaConsumer thread: {} receiving from topic: {}", threadId, topicName);
}
+ private boolean isCloseable() {
+ return !kafkaConsumer.isRunAllowed() ||
kafkaConsumer.isStoppingOrStopped() || kafkaConsumer.isSuspendingOrSuspended();
+ }
+
void preInit() {
doInit();
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 43d6541..8d908c6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -307,11 +307,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
List<RecordMetadata> recordMetadatas = new ArrayList<>();
if (endpoint.getConfiguration().isRecordMetadata()) {
- if (exchange.hasOut()) {
- exchange.getOut().setHeader(KafkaConstants.KAFKA_RECORDMETA,
recordMetadatas);
- } else {
- exchange.getIn().setHeader(KafkaConstants.KAFKA_RECORDMETA,
recordMetadatas);
- }
+ exchange.getMessage().setHeader(KafkaConstants.KAFKA_RECORDMETA,
recordMetadatas);
}
while (c.hasNext()) {
@@ -331,11 +327,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
innerExchange = (Exchange) f.getKey();
if (innerExchange != null) {
if (endpoint.getConfiguration().isRecordMetadata()) {
- if (innerExchange.hasOut()) {
-
innerExchange.getOut().setHeader(KafkaConstants.KAFKA_RECORDMETA, metadata);
- } else {
-
innerExchange.getIn().setHeader(KafkaConstants.KAFKA_RECORDMETA, metadata);
- }
+
innerExchange.getMessage().setHeader(KafkaConstants.KAFKA_RECORDMETA, metadata);
}
}
}
@@ -432,11 +424,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
if (endpoint.getConfiguration().isRecordMetadata()) {
if (body instanceof Exchange) {
Exchange ex = (Exchange) body;
- if (ex.hasOut()) {
- ex.getOut().setHeader(KafkaConstants.KAFKA_RECORDMETA,
recordMetadatas);
- } else {
- ex.getIn().setHeader(KafkaConstants.KAFKA_RECORDMETA,
recordMetadatas);
- }
+ ex.getMessage().setHeader(KafkaConstants.KAFKA_RECORDMETA,
recordMetadatas);
}
if (body instanceof Message) {
Message msg = (Message) body;
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 59fcb84..aac2425 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -69,7 +69,6 @@ public class KafkaProducerTest {
private Exchange exchange = Mockito.mock(Exchange.class);
private ExtendedCamelContext camelContext =
Mockito.mock(ExtendedCamelContext.class);
private Message in = new DefaultMessage(camelContext);
- private Message out = new DefaultMessage(camelContext);
private AsyncCallback callback = Mockito.mock(AsyncCallback.class);
@SuppressWarnings({ "unchecked" })
@@ -118,7 +117,7 @@ public class KafkaProducerTest {
public void processSendsMessage() throws Exception {
endpoint.getConfiguration().setTopic("sometopic");
Mockito.when(exchange.getIn()).thenReturn(in);
- Mockito.when(exchange.getMessage()).thenReturn(out);
+ Mockito.when(exchange.getMessage()).thenReturn(in);
in.setHeader(KafkaConstants.PARTITION_KEY, 4);
@@ -145,7 +144,7 @@ public class KafkaProducerTest {
public void processAsyncSendsMessage() throws Exception {
endpoint.getConfiguration().setTopic("sometopic");
Mockito.when(exchange.getIn()).thenReturn(in);
- Mockito.when(exchange.getMessage()).thenReturn(out);
+ Mockito.when(exchange.getMessage()).thenReturn(in);
in.setHeader(KafkaConstants.PARTITION_KEY, 4);
@@ -162,7 +161,7 @@ public class KafkaProducerTest {
public void processAsyncSendsMessageWithException() throws Exception {
endpoint.getConfiguration().setTopic("sometopic");
Mockito.when(exchange.getIn()).thenReturn(in);
- Mockito.when(exchange.getMessage()).thenReturn(out);
+ Mockito.when(exchange.getMessage()).thenReturn(in);
// setup the exception here
org.apache.kafka.clients.producer.KafkaProducer kp =
producer.getKafkaProducer();
@@ -186,7 +185,7 @@ public class KafkaProducerTest {
endpoint.getConfiguration().setTopic(null);
Mockito.when(exchange.getIn()).thenReturn(in);
in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
- Mockito.when(exchange.getMessage()).thenReturn(out);
+ Mockito.when(exchange.getMessage()).thenReturn(in);
producer.process(exchange);
@@ -198,7 +197,7 @@ public class KafkaProducerTest {
public void processSendsMessageWithTopicHeaderAndEndPoint() throws
Exception {
endpoint.getConfiguration().setTopic("sometopic");
Mockito.when(exchange.getIn()).thenReturn(in);
- Mockito.when(exchange.getMessage()).thenReturn(out);
+ Mockito.when(exchange.getMessage()).thenReturn(in);
in.setHeader(KafkaConstants.PARTITION_KEY, 4);
in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
@@ -217,7 +216,7 @@ public class KafkaProducerTest {
public void processSendsMessageWithOverrideTopicHeaderAndEndPoint() throws
Exception {
endpoint.getConfiguration().setTopic("sometopic");
Mockito.when(exchange.getIn()).thenReturn(in);
- Mockito.when(exchange.getMessage()).thenReturn(out);
+ Mockito.when(exchange.getMessage()).thenReturn(in);
in.setHeader(KafkaConstants.PARTITION_KEY, 4);
in.setHeader(KafkaConstants.OVERRIDE_TOPIC, "anotherTopic");
@@ -238,6 +237,7 @@ public class KafkaProducerTest {
public void processRequiresTopicInEndpointOrInHeader() throws Exception {
endpoint.getConfiguration().setTopic(null);
Mockito.when(exchange.getIn()).thenReturn(in);
+ Mockito.when(exchange.getMessage()).thenReturn(in);
in.setHeader(KafkaConstants.PARTITION_KEY, 4);
in.setHeader(KafkaConstants.KEY, "someKey");
@@ -251,6 +251,7 @@ public class KafkaProducerTest {
public void processRequiresTopicInConfiguration() throws Exception {
endpoint.getConfiguration().setTopic("configTopic");
Mockito.when(exchange.getIn()).thenReturn(in);
+ Mockito.when(exchange.getMessage()).thenReturn(in);
in.setHeader(KafkaConstants.PARTITION_KEY, 4);
in.setHeader(KafkaConstants.KEY, "someKey");
@@ -264,7 +265,7 @@ public class KafkaProducerTest {
public void processDoesNotRequirePartitionHeader() throws Exception {
endpoint.getConfiguration().setTopic("sometopic");
Mockito.when(exchange.getIn()).thenReturn(in);
- Mockito.when(exchange.getMessage()).thenReturn(out);
+ Mockito.when(exchange.getMessage()).thenReturn(in);
producer.process(exchange);
@@ -275,7 +276,7 @@ public class KafkaProducerTest {
public void processSendsMessageWithPartitionKeyHeader() throws Exception {
endpoint.getConfiguration().setTopic("someTopic");
Mockito.when(exchange.getIn()).thenReturn(in);
- Mockito.when(exchange.getMessage()).thenReturn(out);
+ Mockito.when(exchange.getMessage()).thenReturn(in);
in.setHeader(KafkaConstants.PARTITION_KEY, 4);
in.setHeader(KafkaConstants.KEY, "someKey");
@@ -289,7 +290,7 @@ public class KafkaProducerTest {
public void processSendsMessageWithPartitionKeyHeaderOnly() throws
Exception {
endpoint.getConfiguration().setTopic("someTopic");
Mockito.when(exchange.getIn()).thenReturn(in);
- Mockito.when(exchange.getMessage()).thenReturn(out);
+ Mockito.when(exchange.getMessage()).thenReturn(in);
in.setHeader(KafkaConstants.PARTITION_KEY, 4);
producer.process(exchange);
@@ -302,7 +303,7 @@ public class KafkaProducerTest {
public void processSendsMessageWithMessageKeyHeader() throws Exception {
endpoint.getConfiguration().setTopic("someTopic");
Mockito.when(exchange.getIn()).thenReturn(in);
- Mockito.when(exchange.getMessage()).thenReturn(out);
+ Mockito.when(exchange.getMessage()).thenReturn(in);
in.setHeader(KafkaConstants.KEY, "someKey");
producer.process(exchange);
@@ -315,7 +316,7 @@ public class KafkaProducerTest {
public void processSendsMessageWithMessageTimestampHeader() throws
Exception {
endpoint.getConfiguration().setTopic("someTopic");
Mockito.when(exchange.getIn()).thenReturn(in);
- Mockito.when(exchange.getMessage()).thenReturn(out);
+ Mockito.when(exchange.getMessage()).thenReturn(in);
in.setHeader(KafkaConstants.KEY, "someKey");
in.setHeader(KafkaConstants.OVERRIDE_TIMESTAMP,
LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
@@ -330,7 +331,7 @@ public class KafkaProducerTest {
public void processSendMessageWithTopicHeader() throws Exception {
endpoint.getConfiguration().setTopic("someTopic");
Mockito.when(exchange.getIn()).thenReturn(in);
- Mockito.when(exchange.getMessage()).thenReturn(out);
+ Mockito.when(exchange.getMessage()).thenReturn(in);
in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
in.setHeader(KafkaConstants.KEY, "someKey");
in.setHeader(KafkaConstants.PARTITION_KEY, 4);
@@ -345,7 +346,7 @@ public class KafkaProducerTest {
public void processSendsMessageWithMessageTopicName() throws Exception {
endpoint.getConfiguration().setTopic("someTopic");
Mockito.when(exchange.getIn()).thenReturn(in);
- Mockito.when(exchange.getMessage()).thenReturn(out);
+ Mockito.when(exchange.getMessage()).thenReturn(in);
producer.process(exchange);
@@ -357,6 +358,7 @@ public class KafkaProducerTest {
public void
processSendsMessageWithListOfExchangesWithOverrideTopicHeaderOnEveryExchange()
throws Exception {
endpoint.getConfiguration().setTopic("someTopic");
Mockito.when(exchange.getIn()).thenReturn(in);
+ Mockito.when(exchange.getMessage()).thenReturn(in);
// we set the initial topic
in.setHeader(KafkaConstants.OVERRIDE_TOPIC, "anotherTopic");
@@ -384,6 +386,7 @@ public class KafkaProducerTest {
public void
processSendsMessageWithListOfMessagesWithOverrideTopicHeaderOnEveryExchange()
throws Exception {
endpoint.getConfiguration().setTopic("someTopic");
Mockito.when(exchange.getIn()).thenReturn(in);
+ Mockito.when(exchange.getMessage()).thenReturn(in);
// we set the initial topic
in.setHeader(KafkaConstants.OVERRIDE_TOPIC, "anotherTopic");
@@ -453,7 +456,7 @@ public class KafkaProducerTest {
List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>)
in.getHeader(KafkaConstants.KAFKA_RECORDMETA);
assertNotNull(recordMetaData1);
assertEquals(1, recordMetaData1.size(), "Expected one recordMetaData");
- assertNotNull(recordMetaData1.get(0).timestamp());
+ assertNotNull(recordMetaData1.get(0));
}
private void assertRecordMetadataExists() {