This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new b320626ed34 [CAMEL-18527] Camel Kafka Component: batch producer with 
individual headers (#8408)
b320626ed34 is described below

commit b320626ed340409a7c483a71b294810588bf3b9a
Author: R. Wiedmann <[email protected]>
AuthorDate: Thu Sep 22 12:19:11 2022 +0200

    [CAMEL-18527] Camel Kafka Component: batch producer with individual headers 
(#8408)
    
    CAMEL-18527: batch producer with individual headers
    Co-authored-by: Ralf Wiedmann <[email protected]>
---
 .../component/kafka/KafkaComponentConfigurer.java  |  6 ++
 .../component/kafka/KafkaEndpointConfigurer.java   |  6 ++
 .../component/kafka/KafkaEndpointUriFactory.java   |  3 +-
 .../org/apache/camel/component/kafka/kafka.json    |  2 +
 .../camel/component/kafka/KafkaConfiguration.java  | 17 ++++
 .../camel/component/kafka/KafkaProducer.java       |  9 +-
 .../producer/support/KeyValueHolderIterator.java   | 13 ++-
 .../support/PropagatedHeadersProvider.java         | 69 +++++++++++++++
 .../camel/component/kafka/KafkaProducerTest.java   | 97 ++++++++++++++++++++--
 9 files changed, 203 insertions(+), 19 deletions(-)

diff --git 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
index 3be52693578..17a8e1fa68c 100644
--- 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
+++ 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
@@ -40,6 +40,8 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "autoOffsetReset": 
getOrCreateConfiguration(target).setAutoOffsetReset(property(camelContext, 
java.lang.String.class, value)); return true;
         case "autowiredenabled":
         case "autowiredEnabled": 
target.setAutowiredEnabled(property(camelContext, boolean.class, value)); 
return true;
+        case "batchwithindividualheaders":
+        case "batchWithIndividualHeaders": 
getOrCreateConfiguration(target).setBatchWithIndividualHeaders(property(camelContext,
 boolean.class, value)); return true;
         case "breakonfirsterror":
         case "breakOnFirstError": 
getOrCreateConfiguration(target).setBreakOnFirstError(property(camelContext, 
boolean.class, value)); return true;
         case "bridgeerrorhandler":
@@ -260,6 +262,8 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "autoOffsetReset": return java.lang.String.class;
         case "autowiredenabled":
         case "autowiredEnabled": return boolean.class;
+        case "batchwithindividualheaders":
+        case "batchWithIndividualHeaders": return boolean.class;
         case "breakonfirsterror":
         case "breakOnFirstError": return boolean.class;
         case "bridgeerrorhandler":
@@ -476,6 +480,8 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "autoOffsetReset": return 
getOrCreateConfiguration(target).getAutoOffsetReset();
         case "autowiredenabled":
         case "autowiredEnabled": return target.isAutowiredEnabled();
+        case "batchwithindividualheaders":
+        case "batchWithIndividualHeaders": return 
getOrCreateConfiguration(target).isBatchWithIndividualHeaders();
         case "breakonfirsterror":
         case "breakOnFirstError": return 
getOrCreateConfiguration(target).isBreakOnFirstError();
         case "bridgeerrorhandler":
diff --git 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
index bba6d9d9c94..981619ad325 100644
--- 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
+++ 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
@@ -31,6 +31,8 @@ public class KafkaEndpointConfigurer extends 
PropertyConfigurerSupport implement
         case "autoCommitIntervalMs": 
target.getConfiguration().setAutoCommitIntervalMs(property(camelContext, 
java.lang.Integer.class, value)); return true;
         case "autooffsetreset":
         case "autoOffsetReset": 
target.getConfiguration().setAutoOffsetReset(property(camelContext, 
java.lang.String.class, value)); return true;
+        case "batchwithindividualheaders":
+        case "batchWithIndividualHeaders": 
target.getConfiguration().setBatchWithIndividualHeaders(property(camelContext, 
boolean.class, value)); return true;
         case "breakonfirsterror":
         case "breakOnFirstError": 
target.getConfiguration().setBreakOnFirstError(property(camelContext, 
boolean.class, value)); return true;
         case "bridgeerrorhandler":
@@ -235,6 +237,8 @@ public class KafkaEndpointConfigurer extends 
PropertyConfigurerSupport implement
         case "autoCommitIntervalMs": return java.lang.Integer.class;
         case "autooffsetreset":
         case "autoOffsetReset": return java.lang.String.class;
+        case "batchwithindividualheaders":
+        case "batchWithIndividualHeaders": return boolean.class;
         case "breakonfirsterror":
         case "breakOnFirstError": return boolean.class;
         case "bridgeerrorhandler":
@@ -440,6 +444,8 @@ public class KafkaEndpointConfigurer extends 
PropertyConfigurerSupport implement
         case "autoCommitIntervalMs": return 
target.getConfiguration().getAutoCommitIntervalMs();
         case "autooffsetreset":
         case "autoOffsetReset": return 
target.getConfiguration().getAutoOffsetReset();
+        case "batchwithindividualheaders":
+        case "batchWithIndividualHeaders": return 
target.getConfiguration().isBatchWithIndividualHeaders();
         case "breakonfirsterror":
         case "breakOnFirstError": return 
target.getConfiguration().isBreakOnFirstError();
         case "bridgeerrorhandler":
diff --git 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
index 8e6b17ec99a..6a70d0dc1da 100644
--- 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
+++ 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
@@ -21,12 +21,13 @@ public class KafkaEndpointUriFactory extends 
org.apache.camel.support.component.
     private static final Set<String> SECRET_PROPERTY_NAMES;
     private static final Set<String> MULTI_VALUE_PREFIXES;
     static {
-        Set<String> props = new HashSet<>(102);
+        Set<String> props = new HashSet<>(103);
         props.add("additionalProperties");
         props.add("allowManualCommit");
         props.add("autoCommitEnable");
         props.add("autoCommitIntervalMs");
         props.add("autoOffsetReset");
+        props.add("batchWithIndividualHeaders");
         props.add("breakOnFirstError");
         props.add("bridgeErrorHandler");
         props.add("brokers");
diff --git 
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
 
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index 42505e38301..6df00d04047 100644
--- 
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++ 
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -66,6 +66,7 @@
     "pollExceptionStrategy": { "kind": "property", "displayName": "Poll 
Exception Strategy", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.component.kafka.PollExceptionStrategy", "deprecated": false, 
"autowired": true, "secret": false, "description": "To use a custom strategy 
with the consumer to control how to handle exceptions thrown from the Kafka 
broker while pooling messages." },
     "subscribeConsumerBackoffInterval": { "kind": "property", "displayName": 
"Subscribe Consumer Backoff Interval", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "integer", "javaType": "long", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, 
"description": "The delay in millis seconds to wait before trying again to 
subscribe to the kafka broker." },
     "subscribeConsumerBackoffMaxAttempts": { "kind": "property", 
"displayName": "Subscribe Consumer Backoff Max Attempts", "group": "consumer 
(advanced)", "label": "consumer,advanced", "required": false, "type": 
"integer", "javaType": "int", "deprecated": false, "autowired": false, 
"secret": false, "description": "Maximum number the kafka consumer will attempt 
to subscribe to the kafka broker, before eventually giving up and failing. 
Error during subscribing the consumer to the kafka top [...]
+    "batchWithIndividualHeaders": { "kind": "property", "displayName": "Batch 
With Individual Headers", "group": "producer", "label": "producer", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "If this feature is 
enabled and a single element of a batch is an Exchang [...]
     "bufferMemorySize": { "kind": "property", "displayName": "Buffer Memory 
Size", "group": "producer", "label": "producer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "33554432", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The total bytes of memory the producer can use 
to buffer records waiting to be [...]
     "compressionCodec": { "kind": "property", "displayName": "Compression 
Codec", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy", 
"lz4" ], "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "none", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This parameter allows you to specify the [...]
     "connectionMaxIdleMs": { "kind": "property", "displayName": "Connection 
Max Idle Ms", "group": "producer", "label": "producer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "540000", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "Close idle connections 
after the number of milliseconds specified by this [...]
@@ -187,6 +188,7 @@
     "exchangePattern": { "kind": "parameter", "displayName": "Exchange 
Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", 
"required": false, "type": "object", "javaType": 
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", 
"InOptionalOut" ], "deprecated": false, "autowired": false, "secret": false, 
"description": "Sets the exchange pattern when the consumer creates an 
exchange." },
     "isolationLevel": { "kind": "parameter", "displayName": "Isolation Level", 
"group": "consumer (advanced)", "label": "consumer,advanced", "required": 
false, "type": "string", "javaType": "java.lang.String", "enum": [ 
"read_uncommitted", "read_committed" ], "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "read_uncommitted", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "Contro [...]
     "kafkaManualCommitFactory": { "kind": "parameter", "displayName": "Kafka 
Manual Commit Factory", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory", 
"deprecated": false, "autowired": false, "secret": false, "description": 
"Factory to use for creating KafkaManualCommit instances. This allows to plugin 
a custom factory to create custom KafkaManualCommit insta [...]
+    "batchWithIndividualHeaders": { "kind": "parameter", "displayName": "Batch 
With Individual Headers", "group": "producer", "label": "producer", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "If this feature is 
enabled and a single element of a batch is an Exchan [...]
     "bufferMemorySize": { "kind": "parameter", "displayName": "Buffer Memory 
Size", "group": "producer", "label": "producer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "33554432", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The total bytes of memory the producer can use 
to buffer records waiting to b [...]
     "compressionCodec": { "kind": "parameter", "displayName": "Compression 
Codec", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy", 
"lz4" ], "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "none", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This parameter allows you to specify th [...]
     "connectionMaxIdleMs": { "kind": "parameter", "displayName": "Connection 
Max Idle Ms", "group": "producer", "label": "producer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "540000", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "Close idle connections 
after the number of milliseconds specified by thi [...]
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
old mode 100644
new mode 100755
index 5216febafa4..467d3e12480
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -184,6 +184,9 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
     // retries
     @UriParam(label = "producer", defaultValue = "0")
     private Integer retries = 0;
+    // use individual headers if exchange.body contains Iterable or similar of 
Message or Exchange
+    @UriParam(label = "producer", defaultValue = "false")
+    private boolean batchWithIndividualHeaders;
     // batch.size
     @UriParam(label = "producer", defaultValue = "16384")
     private Integer producerBatchSize = 16384;
@@ -1352,6 +1355,20 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
         this.producerBatchSize = producerBatchSize;
     }
 
+    public boolean isBatchWithIndividualHeaders() {
+        return batchWithIndividualHeaders;
+    }
+
+    /**
+     * If this feature is enabled and a single element of a batch is an 
Exchange or Message, the producer will generate
+     * individual kafka header values for it by using the batch Message to 
determine the values. Normal behaviour
+     * consists in always using the same header values (which are determined 
by the parent Exchange which contains the
+     * Iterable or Iterator).
+     */
+    public void setBatchWithIndividualHeaders(boolean 
batchWithIndividualHeaders) {
+        this.batchWithIndividualHeaders = batchWithIndividualHeaders;
+    }
+
     public Integer getConnectionMaxIdleMs() {
         return connectionMaxIdleMs;
     }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
old mode 100644
new mode 100755
index 958bc9ced55..686c422da57
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -35,6 +35,7 @@ import 
org.apache.camel.component.kafka.producer.support.KafkaProducerCallBack;
 import 
org.apache.camel.component.kafka.producer.support.KafkaProducerMetadataCallBack;
 import 
org.apache.camel.component.kafka.producer.support.KeyValueHolderIterator;
 import org.apache.camel.component.kafka.producer.support.ProducerUtil;
+import 
org.apache.camel.component.kafka.producer.support.PropagatedHeadersProvider;
 import org.apache.camel.component.kafka.serde.KafkaHeaderSerializer;
 import org.apache.camel.health.HealthCheckHelper;
 import org.apache.camel.spi.HeaderFilterStrategy;
@@ -236,14 +237,14 @@ public class KafkaProducer extends DefaultAsyncProducer {
             Exchange exchange, Message message) {
         String topic = evaluateTopic(message);
 
-        // extracting headers which need to be propagated
-        List<Header> propagatedHeaders = getPropagatedHeaders(exchange, 
message);
+        PropagatedHeadersProvider propagatedHeadersProvider
+                = new PropagatedHeadersProvider(this, configuration, exchange, 
message);
 
         Object body = message.getBody();
 
         Iterator<Object> iterator = getObjectIterator(body);
 
-        return new KeyValueHolderIterator(iterator, exchange, configuration, 
topic, propagatedHeaders);
+        return new KeyValueHolderIterator(iterator, exchange, configuration, 
topic, propagatedHeadersProvider);
     }
 
     protected ProducerRecord<Object, Object> createRecord(Exchange exchange, 
Message message) {
@@ -330,7 +331,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
         return iterator;
     }
 
-    private List<Header> getPropagatedHeaders(Exchange exchange, Message 
message) {
+    public List<Header> getPropagatedHeaders(Exchange exchange, Message 
message) {
         Map<String, Object> messageHeaders = message.getHeaders();
         List<Header> propagatedHeaders = new 
ArrayList<>(messageHeaders.size());
 
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KeyValueHolderIterator.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KeyValueHolderIterator.java
old mode 100644
new mode 100755
index a207a97d6db..ac82d09711e
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KeyValueHolderIterator.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KeyValueHolderIterator.java
@@ -17,7 +17,6 @@
 package org.apache.camel.component.kafka.producer.support;
 
 import java.util.Iterator;
-import java.util.List;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -25,7 +24,6 @@ import org.apache.camel.component.kafka.KafkaConfiguration;
 import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.util.KeyValueHolder;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.header.Header;
 
 import static 
org.apache.camel.component.kafka.producer.support.ProducerUtil.tryConvertToSerializedType;
 
@@ -34,15 +32,15 @@ public class KeyValueHolderIterator implements 
Iterator<KeyValueHolder<Object, P
     private final Exchange exchange;
     private final KafkaConfiguration kafkaConfiguration;
     private final String msgTopic;
-    private final List<Header> propagatedHeaders;
+    private final PropagatedHeadersProvider propagatedHeadersProvider;
 
     public KeyValueHolderIterator(Iterator<Object> msgList, Exchange exchange, 
KafkaConfiguration kafkaConfiguration,
-                                  String msgTopic, List<Header> 
propagatedHeaders) {
+                                  String msgTopic, PropagatedHeadersProvider 
propagatedHeadersProvider) {
         this.msgList = msgList;
         this.exchange = exchange;
         this.kafkaConfiguration = kafkaConfiguration;
         this.msgTopic = msgTopic;
-        this.propagatedHeaders = propagatedHeaders;
+        this.propagatedHeadersProvider = propagatedHeadersProvider;
     }
 
     @Override
@@ -73,13 +71,14 @@ public class KeyValueHolderIterator implements 
Iterator<KeyValueHolder<Object, P
             return new KeyValueHolder<>(
                     body,
                     new ProducerRecord<>(
-                            innerTopic, innerPartitionKey, innerTimestamp, 
innerKey, value, propagatedHeaders));
+                            innerTopic, innerPartitionKey, innerTimestamp, 
innerKey, value,
+                            propagatedHeadersProvider.getHeaders(ex, 
innerMessage)));
         }
 
         return new KeyValueHolder<>(
                 body,
                 new ProducerRecord<>(
-                        msgTopic, null, null, null, body, propagatedHeaders));
+                        msgTopic, null, null, null, body, 
propagatedHeadersProvider.getDefaultHeaders()));
     }
 
     private Message getInnerMessage(Object body) {
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/PropagatedHeadersProvider.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/PropagatedHeadersProvider.java
new file mode 100755
index 00000000000..5398729b4de
--- /dev/null
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/PropagatedHeadersProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.producer.support;
+
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.component.kafka.KafkaConfiguration;
+import org.apache.camel.component.kafka.KafkaProducer;
+import org.apache.kafka.common.header.Header;
+
+/**
+ * Used to provide individual kafka header values if the "batchWithIndividualHeaders" feature is enabled.
+ */
+public class PropagatedHeadersProvider {
+
+    private final KafkaProducer kafkaProducer;
+    private final Exchange parentExchange;
+    private final Message parentMessage;
+
+    // only set if batchWithIndividualHeaders is disabled (which is the 
default behaviour)
+    private final List<Header> propagatedHeaders;
+
+    public PropagatedHeadersProvider(KafkaProducer kafkaProducer, 
KafkaConfiguration configuration, Exchange parentExchange,
+                                     Message parentMessage) {
+        this.kafkaProducer = kafkaProducer;
+        this.parentExchange = parentExchange;
+        this.parentMessage = parentMessage;
+
+        // extracting headers which need to be propagated: instant eval for 
common headers, lazy eval for individual headers
+        propagatedHeaders = configuration.isBatchWithIndividualHeaders() ? 
null : getDefaultHeaders();
+    }
+
+    /**
+     * Returns the header values which are determined by the parent exchange.
+     */
+    public final List<Header> getDefaultHeaders() {
+        return kafkaProducer.getPropagatedHeaders(parentExchange, 
parentMessage);
+    }
+
+    /**
+     * Creates the kafka header values for the given child Message.
+     */
+    public List<Header> getHeaders(Exchange childExchange, Message 
childMessage) {
+        if (propagatedHeaders != null) {
+            // default behaviour: use headers determined by parent Exchange
+            return propagatedHeaders;
+        } else {
+            // parentExchange and childExchange may be identical, but 
parentMessage and childMessage are not.
+            return kafkaProducer.getPropagatedHeaders(childExchange, 
childMessage);
+        }
+    }
+
+}
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
old mode 100644
new mode 100755
index 86cc411317a..b8b36616b6b
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.nio.charset.StandardCharsets;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.Arrays;
@@ -60,6 +61,8 @@ import static org.mockito.ArgumentMatchers.isA;
 
 public class KafkaProducerTest {
 
+    private static final String SOME_INDIVIDUAL_HEADER = 
"someIndividualHeader";
+
     private KafkaProducer producer;
     private KafkaEndpoint endpoint;
     private KafkaEndpoint fromEndpoint;
@@ -380,7 +383,7 @@ public class KafkaProducerTest {
         producer.process(exchange);
 
         // assert results
-        verifySendMessages(Arrays.asList("overridenTopic1", "overridenTopic2", 
"overridenTopic3"));
+        verifySendMessages(Arrays.asList("overridenTopic1", "overridenTopic2", 
"overridenTopic3"), null);
         assertRecordMetadataExists(3);
         assertRecordMetadataExistsForEachAggregatedExchange();
     }
@@ -408,7 +411,69 @@ public class KafkaProducerTest {
         producer.process(exchange);
 
         // assert results
-        verifySendMessages(Arrays.asList("overridenTopic1", "overridenTopic2", 
"overridenTopic3"));
+        verifySendMessages(Arrays.asList("overridenTopic1", "overridenTopic2", 
"overridenTopic3"),
+                Arrays.asList("", "", ""));
+        assertRecordMetadataExists(3);
+        assertRecordMetadataExistsForEachAggregatedMessage();
+    }
+
+    @Test
+    public void processSendsMessageWithListOfExchangesWithIndividualHeaders() 
throws Exception {
+        endpoint.getConfiguration().setBatchWithIndividualHeaders(true);
+        endpoint.getConfiguration().setTopic("someTopic");
+        Mockito.when(exchange.getIn()).thenReturn(in);
+        Mockito.when(exchange.getMessage()).thenReturn(in);
+
+        // we set the initial topic
+        in.setHeader(KafkaConstants.OVERRIDE_TOPIC, "anotherTopic");
+        in.setHeader(KafkaConstants.KEY, "someKey");
+        in.setHeader(SOME_INDIVIDUAL_HEADER, "default");
+
+        // we add our exchanges in order to aggregate
+        final List<Exchange> nestedExchanges
+                = 
createListOfExchangesWithTopics(Arrays.asList("overridenTopic1", 
"overridenTopic2", "overridenTopic3"));
+
+        // aggregate
+        final Exchange finalAggregatedExchange = 
aggregateExchanges(nestedExchanges, new GroupedExchangeAggregationStrategy());
+
+        in.setBody(finalAggregatedExchange.getIn().getBody());
+        in.setHeaders(finalAggregatedExchange.getIn().getHeaders());
+
+        producer.process(exchange);
+
+        // assert results
+        verifySendMessages(Arrays.asList("overridenTopic1", "overridenTopic2", 
"overridenTopic3"),
+                Arrays.asList("value-1", "value-2", "value-3"));
+        assertRecordMetadataExists(3);
+        assertRecordMetadataExistsForEachAggregatedExchange();
+    }
+
+    @Test
+    public void processSendsMessageWithListOfMessagesWithIndividualHeaders() 
throws Exception {
+        endpoint.getConfiguration().setBatchWithIndividualHeaders(true);
+        endpoint.getConfiguration().setTopic("someTopic");
+        Mockito.when(exchange.getIn()).thenReturn(in);
+        Mockito.when(exchange.getMessage()).thenReturn(in);
+
+        // we set the initial topic
+        in.setHeader(KafkaConstants.OVERRIDE_TOPIC, "anotherTopic");
+        in.setHeader(KafkaConstants.KEY, "someKey");
+
+        // we add our exchanges in order to aggregate
+        final List<Exchange> nestedExchanges
+                = 
createListOfExchangesWithTopics(Arrays.asList("overridenTopic1", 
"overridenTopic2", "overridenTopic3"));
+
+        // aggregate messages
+        final Exchange finalAggregatedExchange = 
aggregateExchanges(nestedExchanges, new GroupedMessageAggregationStrategy());
+
+        in.setBody(finalAggregatedExchange.getIn().getBody());
+        in.setHeaders(finalAggregatedExchange.getIn().getHeaders());
+
+        producer.process(exchange);
+
+        // assert results
+        verifySendMessages(Arrays.asList("overridenTopic1", "overridenTopic2", 
"overridenTopic3"),
+                Arrays.asList("value-1", "value-2", "value-3"));
         assertRecordMetadataExists(3);
         assertRecordMetadataExistsForEachAggregatedMessage();
     }
@@ -446,13 +511,26 @@ public class KafkaProducerTest {
     }
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
-    protected void verifySendMessages(final List<String> expectedTopics) {
+    protected void verifySendMessages(final List<String> expectedTopics, final 
List<String> expectedIndividualHeaderValues) {
         final ArgumentCaptor<ProducerRecord> captor = 
ArgumentCaptor.forClass(ProducerRecord.class);
         Mockito.verify(producer.getKafkaProducer(), 
Mockito.atLeast(expectedTopics.size())).send(captor.capture());
+        final List<ProducerRecord> records = captor.getAllValues();
         final List<String> actualTopics
-                = 
captor.getAllValues().stream().map(ProducerRecord::topic).collect(Collectors.toList());
+                = 
records.stream().map(ProducerRecord::topic).collect(Collectors.toList());
 
         assertEquals(expectedTopics, actualTopics);
+
+        if (expectedIndividualHeaderValues == null) {
+            return;
+        }
+
+        final List<String> actualIndividualHeaderValues = records.stream()
+                .map(ProducerRecord::headers)
+                .map(headers -> headers.lastHeader(SOME_INDIVIDUAL_HEADER))
+                .map(header -> header == null ? "" : new 
String(header.value(), StandardCharsets.UTF_8))
+                .collect(Collectors.toList());
+
+        assertEquals(expectedIndividualHeaderValues, 
actualIndividualHeaderValues);
     }
 
     private void assertRecordMetadataTimestampExists() {
@@ -512,11 +590,16 @@ public class KafkaProducerTest {
     private List<Exchange> createListOfExchangesWithTopics(final List<String> 
topics) {
         final List<Exchange> resultLists = new LinkedList<>();
 
-        topics.forEach(topic -> {
+        int index = 1;
+        for (String topic : topics) {
             final Exchange innerExchange = new DefaultExchange(camelContext);
-            innerExchange.getIn().setHeader(KafkaConstants.OVERRIDE_TOPIC, 
topic);
+            innerExchange.setExchangeId("exchange-" + index);
+            final Message msg = innerExchange.getIn();
+            msg.setMessageId("message-" + index);
+            msg.setHeader(KafkaConstants.OVERRIDE_TOPIC, topic);
+            msg.setHeader(SOME_INDIVIDUAL_HEADER, "value-" + index++);
             resultLists.add(innerExchange);
-        });
+        }
 
         return resultLists;
     }

Reply via email to