This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.18.x by this push:
new c61a902359c [CAMEL-18527] Camel Kafka Component: batch producer with
individual headers (#8408)
c61a902359c is described below
commit c61a902359c25735b694d8444857fa405ef7aae0
Author: R. Wiedmann <[email protected]>
AuthorDate: Thu Sep 22 12:19:11 2022 +0200
[CAMEL-18527] Camel Kafka Component: batch producer with individual headers
(#8408)
CAMEL-18527: batch producer with individual headers
Co-authored-by: Ralf Wiedmann <[email protected]>
---
.../component/kafka/KafkaComponentConfigurer.java | 6 ++
.../component/kafka/KafkaEndpointConfigurer.java | 6 ++
.../component/kafka/KafkaEndpointUriFactory.java | 3 +-
.../org/apache/camel/component/kafka/kafka.json | 2 +
.../camel/component/kafka/KafkaConfiguration.java | 17 ++++
.../camel/component/kafka/KafkaProducer.java | 9 +-
.../producer/support/KeyValueHolderIterator.java | 13 ++-
.../support/PropagatedHeadersProvider.java | 69 +++++++++++++++
.../camel/component/kafka/KafkaProducerTest.java | 97 ++++++++++++++++++++--
9 files changed, 203 insertions(+), 19 deletions(-)
diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
index 3be52693578..17a8e1fa68c 100644
--- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
+++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
@@ -40,6 +40,8 @@ public class KafkaComponentConfigurer extends
PropertyConfigurerSupport implemen
case "autoOffsetReset":
getOrCreateConfiguration(target).setAutoOffsetReset(property(camelContext,
java.lang.String.class, value)); return true;
case "autowiredenabled":
case "autowiredEnabled":
target.setAutowiredEnabled(property(camelContext, boolean.class, value));
return true;
+ case "batchwithindividualheaders":
+ case "batchWithIndividualHeaders":
getOrCreateConfiguration(target).setBatchWithIndividualHeaders(property(camelContext,
boolean.class, value)); return true;
case "breakonfirsterror":
case "breakOnFirstError":
getOrCreateConfiguration(target).setBreakOnFirstError(property(camelContext,
boolean.class, value)); return true;
case "bridgeerrorhandler":
@@ -260,6 +262,8 @@ public class KafkaComponentConfigurer extends
PropertyConfigurerSupport implemen
case "autoOffsetReset": return java.lang.String.class;
case "autowiredenabled":
case "autowiredEnabled": return boolean.class;
+ case "batchwithindividualheaders":
+ case "batchWithIndividualHeaders": return boolean.class;
case "breakonfirsterror":
case "breakOnFirstError": return boolean.class;
case "bridgeerrorhandler":
@@ -476,6 +480,8 @@ public class KafkaComponentConfigurer extends
PropertyConfigurerSupport implemen
case "autoOffsetReset": return
getOrCreateConfiguration(target).getAutoOffsetReset();
case "autowiredenabled":
case "autowiredEnabled": return target.isAutowiredEnabled();
+ case "batchwithindividualheaders":
+ case "batchWithIndividualHeaders": return
getOrCreateConfiguration(target).isBatchWithIndividualHeaders();
case "breakonfirsterror":
case "breakOnFirstError": return
getOrCreateConfiguration(target).isBreakOnFirstError();
case "bridgeerrorhandler":
diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
index bba6d9d9c94..981619ad325 100644
--- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
+++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
@@ -31,6 +31,8 @@ public class KafkaEndpointConfigurer extends
PropertyConfigurerSupport implement
case "autoCommitIntervalMs":
target.getConfiguration().setAutoCommitIntervalMs(property(camelContext,
java.lang.Integer.class, value)); return true;
case "autooffsetreset":
case "autoOffsetReset":
target.getConfiguration().setAutoOffsetReset(property(camelContext,
java.lang.String.class, value)); return true;
+ case "batchwithindividualheaders":
+ case "batchWithIndividualHeaders":
target.getConfiguration().setBatchWithIndividualHeaders(property(camelContext,
boolean.class, value)); return true;
case "breakonfirsterror":
case "breakOnFirstError":
target.getConfiguration().setBreakOnFirstError(property(camelContext,
boolean.class, value)); return true;
case "bridgeerrorhandler":
@@ -235,6 +237,8 @@ public class KafkaEndpointConfigurer extends
PropertyConfigurerSupport implement
case "autoCommitIntervalMs": return java.lang.Integer.class;
case "autooffsetreset":
case "autoOffsetReset": return java.lang.String.class;
+ case "batchwithindividualheaders":
+ case "batchWithIndividualHeaders": return boolean.class;
case "breakonfirsterror":
case "breakOnFirstError": return boolean.class;
case "bridgeerrorhandler":
@@ -440,6 +444,8 @@ public class KafkaEndpointConfigurer extends
PropertyConfigurerSupport implement
case "autoCommitIntervalMs": return
target.getConfiguration().getAutoCommitIntervalMs();
case "autooffsetreset":
case "autoOffsetReset": return
target.getConfiguration().getAutoOffsetReset();
+ case "batchwithindividualheaders":
+ case "batchWithIndividualHeaders": return
target.getConfiguration().isBatchWithIndividualHeaders();
case "breakonfirsterror":
case "breakOnFirstError": return
target.getConfiguration().isBreakOnFirstError();
case "bridgeerrorhandler":
diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
index 8e6b17ec99a..6a70d0dc1da 100644
--- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
+++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
@@ -21,12 +21,13 @@ public class KafkaEndpointUriFactory extends
org.apache.camel.support.component.
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Set<String> MULTI_VALUE_PREFIXES;
static {
- Set<String> props = new HashSet<>(102);
+ Set<String> props = new HashSet<>(103);
props.add("additionalProperties");
props.add("allowManualCommit");
props.add("autoCommitEnable");
props.add("autoCommitIntervalMs");
props.add("autoOffsetReset");
+ props.add("batchWithIndividualHeaders");
props.add("breakOnFirstError");
props.add("bridgeErrorHandler");
props.add("brokers");
diff --git a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index 804546c4a9b..9fcbbcb3d68 100644
--- a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++ b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -66,6 +66,7 @@
"pollExceptionStrategy": { "kind": "property", "displayName": "Poll
Exception Strategy", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.component.kafka.PollExceptionStrategy", "deprecated": false,
"autowired": true, "secret": false, "description": "To use a custom strategy
with the consumer to control how to handle exceptions thrown from the Kafka
broker while pooling messages." },
"subscribeConsumerBackoffInterval": { "kind": "property", "displayName":
"Subscribe Consumer Backoff Interval", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "integer", "javaType": "long",
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000,
"description": "The delay in millis seconds to wait before trying again to
subscribe to the kafka broker." },
"subscribeConsumerBackoffMaxAttempts": { "kind": "property",
"displayName": "Subscribe Consumer Backoff Max Attempts", "group": "consumer
(advanced)", "label": "consumer,advanced", "required": false, "type":
"integer", "javaType": "int", "deprecated": false, "autowired": false,
"secret": false, "description": "Maximum number the kafka consumer will attempt
to subscribe to the kafka broker, before eventually giving up and failing.
Error during subscribing the consumer to the kafka top [...]
+ "batchWithIndividualHeaders": { "kind": "property", "displayName": "Batch
With Individual Headers", "group": "producer", "label": "producer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "description": "If this feature is
enabled and a single element of a batch is an Exchang [...]
"bufferMemorySize": { "kind": "property", "displayName": "Buffer Memory
Size", "group": "producer", "label": "producer", "required": false, "type":
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "33554432", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The total bytes of memory the producer can use
to buffer records waiting to be [...]
"compressionCodec": { "kind": "property", "displayName": "Compression
Codec", "group": "producer", "label": "producer", "required": false, "type":
"string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy",
"lz4" ], "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "none", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "This parameter allows you to specify the [...]
"connectionMaxIdleMs": { "kind": "property", "displayName": "Connection
Max Idle Ms", "group": "producer", "label": "producer", "required": false,
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "540000",
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "description": "Close idle connections
after the number of milliseconds specified by this [...]
@@ -187,6 +188,7 @@
"exchangePattern": { "kind": "parameter", "displayName": "Exchange
Pattern", "group": "consumer (advanced)", "label": "consumer,advanced",
"required": false, "type": "object", "javaType":
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut",
"InOptionalOut" ], "deprecated": false, "autowired": false, "secret": false,
"description": "Sets the exchange pattern when the consumer creates an
exchange." },
"isolationLevel": { "kind": "parameter", "displayName": "Isolation Level",
"group": "consumer (advanced)", "label": "consumer,advanced", "required":
false, "type": "string", "javaType": "java.lang.String", "enum": [
"read_uncommitted", "read_committed" ], "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "read_uncommitted",
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "description": "Contro [...]
"kafkaManualCommitFactory": { "kind": "parameter", "displayName": "Kafka
Manual Commit Factory", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory",
"deprecated": false, "autowired": false, "secret": false, "description":
"Factory to use for creating KafkaManualCommit instances. This allows to plugin
a custom factory to create custom KafkaManualCommit insta [...]
+ "batchWithIndividualHeaders": { "kind": "parameter", "displayName": "Batch
With Individual Headers", "group": "producer", "label": "producer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "description": "If this feature is
enabled and a single element of a batch is an Exchan [...]
"bufferMemorySize": { "kind": "parameter", "displayName": "Buffer Memory
Size", "group": "producer", "label": "producer", "required": false, "type":
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "33554432", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The total bytes of memory the producer can use
to buffer records waiting to b [...]
"compressionCodec": { "kind": "parameter", "displayName": "Compression
Codec", "group": "producer", "label": "producer", "required": false, "type":
"string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy",
"lz4" ], "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "none", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "This parameter allows you to specify th [...]
"connectionMaxIdleMs": { "kind": "parameter", "displayName": "Connection
Max Idle Ms", "group": "producer", "label": "producer", "required": false,
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "540000",
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "description": "Close idle connections
after the number of milliseconds specified by thi [...]
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
old mode 100644
new mode 100755
index 7554e28d0bf..caf7b859087
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -184,6 +184,9 @@ public class KafkaConfiguration implements Cloneable,
HeaderFilterStrategyAware
// retries
@UriParam(label = "producer", defaultValue = "0")
private Integer retries = 0;
+ // use individual headers if exchange.body contains Iterable or similar of Message or Exchange
+ @UriParam(label = "producer", defaultValue = "false")
+ private boolean batchWithIndividualHeaders;
// batch.size
@UriParam(label = "producer", defaultValue = "16384")
private Integer producerBatchSize = 16384;
@@ -1344,6 +1347,20 @@ public class KafkaConfiguration implements Cloneable,
HeaderFilterStrategyAware
this.producerBatchSize = producerBatchSize;
}
+ public boolean isBatchWithIndividualHeaders() {
+ return batchWithIndividualHeaders;
+ }
+
+ /**
+ * If this feature is enabled and a single element of a batch is an Exchange or Message, the producer will generate
+ * individual kafka header values for it by using the batch Message to determine the values. Normal behaviour
+ * consists in always using the same header values (which are determined by the parent Exchange which contains the
+ * Iterable or Iterator).
+ */
+ public void setBatchWithIndividualHeaders(boolean
batchWithIndividualHeaders) {
+ this.batchWithIndividualHeaders = batchWithIndividualHeaders;
+ }
+
public Integer getConnectionMaxIdleMs() {
return connectionMaxIdleMs;
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
old mode 100644
new mode 100755
index bcdf40e406e..acc37e506ba
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -35,6 +35,7 @@ import
org.apache.camel.component.kafka.producer.support.KafkaProducerCallBack;
import
org.apache.camel.component.kafka.producer.support.KafkaProducerMetadataCallBack;
import
org.apache.camel.component.kafka.producer.support.KeyValueHolderIterator;
import org.apache.camel.component.kafka.producer.support.ProducerUtil;
+import
org.apache.camel.component.kafka.producer.support.PropagatedHeadersProvider;
import org.apache.camel.component.kafka.serde.KafkaHeaderSerializer;
import org.apache.camel.health.HealthCheckHelper;
import org.apache.camel.spi.HeaderFilterStrategy;
@@ -236,14 +237,14 @@ public class KafkaProducer extends DefaultAsyncProducer {
Exchange exchange, Message message) {
String topic = evaluateTopic(message);
- // extracting headers which need to be propagated
- List<Header> propagatedHeaders = getPropagatedHeaders(exchange,
message);
+ PropagatedHeadersProvider propagatedHeadersProvider
+ = new PropagatedHeadersProvider(this, configuration, exchange,
message);
Object body = message.getBody();
Iterator<Object> iterator = getObjectIterator(body);
- return new KeyValueHolderIterator(iterator, exchange, configuration,
topic, propagatedHeaders);
+ return new KeyValueHolderIterator(iterator, exchange, configuration,
topic, propagatedHeadersProvider);
}
protected ProducerRecord<Object, Object> createRecord(Exchange exchange,
Message message) {
@@ -329,7 +330,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
return iterator;
}
- private List<Header> getPropagatedHeaders(Exchange exchange, Message
message) {
+ public List<Header> getPropagatedHeaders(Exchange exchange, Message
message) {
Map<String, Object> messageHeaders = message.getHeaders();
List<Header> propagatedHeaders = new
ArrayList<>(messageHeaders.size());
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KeyValueHolderIterator.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KeyValueHolderIterator.java
old mode 100644
new mode 100755
index a207a97d6db..ac82d09711e
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KeyValueHolderIterator.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KeyValueHolderIterator.java
@@ -17,7 +17,6 @@
package org.apache.camel.component.kafka.producer.support;
import java.util.Iterator;
-import java.util.List;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
@@ -25,7 +24,6 @@ import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.util.KeyValueHolder;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.header.Header;
import static
org.apache.camel.component.kafka.producer.support.ProducerUtil.tryConvertToSerializedType;
@@ -34,15 +32,15 @@ public class KeyValueHolderIterator implements
Iterator<KeyValueHolder<Object, P
private final Exchange exchange;
private final KafkaConfiguration kafkaConfiguration;
private final String msgTopic;
- private final List<Header> propagatedHeaders;
+ private final PropagatedHeadersProvider propagatedHeadersProvider;
public KeyValueHolderIterator(Iterator<Object> msgList, Exchange exchange,
KafkaConfiguration kafkaConfiguration,
- String msgTopic, List<Header>
propagatedHeaders) {
+ String msgTopic, PropagatedHeadersProvider
propagatedHeadersProvider) {
this.msgList = msgList;
this.exchange = exchange;
this.kafkaConfiguration = kafkaConfiguration;
this.msgTopic = msgTopic;
- this.propagatedHeaders = propagatedHeaders;
+ this.propagatedHeadersProvider = propagatedHeadersProvider;
}
@Override
@@ -73,13 +71,14 @@ public class KeyValueHolderIterator implements
Iterator<KeyValueHolder<Object, P
return new KeyValueHolder<>(
body,
new ProducerRecord<>(
- innerTopic, innerPartitionKey, innerTimestamp,
innerKey, value, propagatedHeaders));
+ innerTopic, innerPartitionKey, innerTimestamp,
innerKey, value,
+ propagatedHeadersProvider.getHeaders(ex,
innerMessage)));
}
return new KeyValueHolder<>(
body,
new ProducerRecord<>(
- msgTopic, null, null, null, body, propagatedHeaders));
+ msgTopic, null, null, null, body,
propagatedHeadersProvider.getDefaultHeaders()));
}
private Message getInnerMessage(Object body) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/PropagatedHeadersProvider.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/PropagatedHeadersProvider.java
new file mode 100755
index 00000000000..5398729b4de
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/PropagatedHeadersProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.producer.support;
+
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.component.kafka.KafkaConfiguration;
+import org.apache.camel.component.kafka.KafkaProducer;
+import org.apache.kafka.common.header.Header;
+
+/**
+ * Used to provide individual kafka header values if feature "batchWithIndividualHeaders" is enabled.
+ */
+public class PropagatedHeadersProvider {
+
+ private final KafkaProducer kafkaProducer;
+ private final Exchange parentExchange;
+ private final Message parentMessage;
+
+ // only set if batchWithIndividualHeaders is disabled (which is the default behaviour)
+ private final List<Header> propagatedHeaders;
+
+ public PropagatedHeadersProvider(KafkaProducer kafkaProducer,
KafkaConfiguration configuration, Exchange parentExchange,
+ Message parentMessage) {
+ this.kafkaProducer = kafkaProducer;
+ this.parentExchange = parentExchange;
+ this.parentMessage = parentMessage;
+
+ // extracting headers which need to be propagated: instant eval for common headers, lazy eval for individual headers
+ propagatedHeaders = configuration.isBatchWithIndividualHeaders() ?
null : getDefaultHeaders();
+ }
+
+ /**
+ * Returns header values which are determined by parent exchange.
+ */
+ public final List<Header> getDefaultHeaders() {
+ return kafkaProducer.getPropagatedHeaders(parentExchange,
parentMessage);
+ }
+
+ /**
+ * Create kafka header values by given Message.
+ */
+ public List<Header> getHeaders(Exchange childExchange, Message
childMessage) {
+ if (propagatedHeaders != null) {
+ // default behaviour: use headers determined by parent Exchange
+ return propagatedHeaders;
+ } else {
+ // parentExchange and childExchange may be identical, but parentMessage and childMessage are not.
+ return kafkaProducer.getPropagatedHeaders(childExchange,
childMessage);
+ }
+ }
+
+}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
old mode 100644
new mode 100755
index 86cc411317a..b8b36616b6b
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.kafka;
+import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Arrays;
@@ -60,6 +61,8 @@ import static org.mockito.ArgumentMatchers.isA;
public class KafkaProducerTest {
+ private static final String SOME_INDIVIDUAL_HEADER =
"someIndividualHeader";
+
private KafkaProducer producer;
private KafkaEndpoint endpoint;
private KafkaEndpoint fromEndpoint;
@@ -380,7 +383,7 @@ public class KafkaProducerTest {
producer.process(exchange);
// assert results
- verifySendMessages(Arrays.asList("overridenTopic1", "overridenTopic2",
"overridenTopic3"));
+ verifySendMessages(Arrays.asList("overridenTopic1", "overridenTopic2",
"overridenTopic3"), null);
assertRecordMetadataExists(3);
assertRecordMetadataExistsForEachAggregatedExchange();
}
@@ -408,7 +411,69 @@ public class KafkaProducerTest {
producer.process(exchange);
// assert results
- verifySendMessages(Arrays.asList("overridenTopic1", "overridenTopic2",
"overridenTopic3"));
+ verifySendMessages(Arrays.asList("overridenTopic1", "overridenTopic2",
"overridenTopic3"),
+ Arrays.asList("", "", ""));
+ assertRecordMetadataExists(3);
+ assertRecordMetadataExistsForEachAggregatedMessage();
+ }
+
+ @Test
+ public void processSendsMessageWithListOfExchangesWithIndividualHeaders()
throws Exception {
+ endpoint.getConfiguration().setBatchWithIndividualHeaders(true);
+ endpoint.getConfiguration().setTopic("someTopic");
+ Mockito.when(exchange.getIn()).thenReturn(in);
+ Mockito.when(exchange.getMessage()).thenReturn(in);
+
+ // we set the initial topic
+ in.setHeader(KafkaConstants.OVERRIDE_TOPIC, "anotherTopic");
+ in.setHeader(KafkaConstants.KEY, "someKey");
+ in.setHeader(SOME_INDIVIDUAL_HEADER, "default");
+
+ // we add our exchanges in order to aggregate
+ final List<Exchange> nestedExchanges
+ =
createListOfExchangesWithTopics(Arrays.asList("overridenTopic1",
"overridenTopic2", "overridenTopic3"));
+
+ // aggregate
+ final Exchange finalAggregatedExchange =
aggregateExchanges(nestedExchanges, new GroupedExchangeAggregationStrategy());
+
+ in.setBody(finalAggregatedExchange.getIn().getBody());
+ in.setHeaders(finalAggregatedExchange.getIn().getHeaders());
+
+ producer.process(exchange);
+
+ // assert results
+ verifySendMessages(Arrays.asList("overridenTopic1", "overridenTopic2",
"overridenTopic3"),
+ Arrays.asList("value-1", "value-2", "value-3"));
+ assertRecordMetadataExists(3);
+ assertRecordMetadataExistsForEachAggregatedExchange();
+ }
+
+ @Test
+ public void processSendsMessageWithListOfMessagesWithIndividualHeaders()
throws Exception {
+ endpoint.getConfiguration().setBatchWithIndividualHeaders(true);
+ endpoint.getConfiguration().setTopic("someTopic");
+ Mockito.when(exchange.getIn()).thenReturn(in);
+ Mockito.when(exchange.getMessage()).thenReturn(in);
+
+ // we set the initial topic
+ in.setHeader(KafkaConstants.OVERRIDE_TOPIC, "anotherTopic");
+ in.setHeader(KafkaConstants.KEY, "someKey");
+
+ // we add our exchanges in order to aggregate
+ final List<Exchange> nestedExchanges
+ =
createListOfExchangesWithTopics(Arrays.asList("overridenTopic1",
"overridenTopic2", "overridenTopic3"));
+
+ // aggregate messages
+ final Exchange finalAggregatedExchange =
aggregateExchanges(nestedExchanges, new GroupedMessageAggregationStrategy());
+
+ in.setBody(finalAggregatedExchange.getIn().getBody());
+ in.setHeaders(finalAggregatedExchange.getIn().getHeaders());
+
+ producer.process(exchange);
+
+ // assert results
+ verifySendMessages(Arrays.asList("overridenTopic1", "overridenTopic2",
"overridenTopic3"),
+ Arrays.asList("value-1", "value-2", "value-3"));
assertRecordMetadataExists(3);
assertRecordMetadataExistsForEachAggregatedMessage();
}
@@ -446,13 +511,26 @@ public class KafkaProducerTest {
}
@SuppressWarnings({ "unchecked", "rawtypes" })
- protected void verifySendMessages(final List<String> expectedTopics) {
+ protected void verifySendMessages(final List<String> expectedTopics, final
List<String> expectedIndividualHeaderValues) {
final ArgumentCaptor<ProducerRecord> captor =
ArgumentCaptor.forClass(ProducerRecord.class);
Mockito.verify(producer.getKafkaProducer(),
Mockito.atLeast(expectedTopics.size())).send(captor.capture());
+ final List<ProducerRecord> records = captor.getAllValues();
final List<String> actualTopics
- =
captor.getAllValues().stream().map(ProducerRecord::topic).collect(Collectors.toList());
+ =
records.stream().map(ProducerRecord::topic).collect(Collectors.toList());
assertEquals(expectedTopics, actualTopics);
+
+ if (expectedIndividualHeaderValues == null) {
+ return;
+ }
+
+ final List<String> actualIndividualHeaderValues = records.stream()
+ .map(ProducerRecord::headers)
+ .map(headers -> headers.lastHeader(SOME_INDIVIDUAL_HEADER))
+ .map(header -> header == null ? "" : new
String(header.value(), StandardCharsets.UTF_8))
+ .collect(Collectors.toList());
+
+ assertEquals(expectedIndividualHeaderValues,
actualIndividualHeaderValues);
}
private void assertRecordMetadataTimestampExists() {
@@ -512,11 +590,16 @@ public class KafkaProducerTest {
private List<Exchange> createListOfExchangesWithTopics(final List<String>
topics) {
final List<Exchange> resultLists = new LinkedList<>();
- topics.forEach(topic -> {
+ int index = 1;
+ for (String topic : topics) {
final Exchange innerExchange = new DefaultExchange(camelContext);
- innerExchange.getIn().setHeader(KafkaConstants.OVERRIDE_TOPIC,
topic);
+ innerExchange.setExchangeId("exchange-" + index);
+ final Message msg = innerExchange.getIn();
+ msg.setMessageId("message-" + index);
+ msg.setHeader(KafkaConstants.OVERRIDE_TOPIC, topic);
+ msg.setHeader(SOME_INDIVIDUAL_HEADER, "value-" + index++);
resultLists.add(innerExchange);
- });
+ }
return resultLists;
}