This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 99fb9be CAMEL-12503 : support for propagating camel headers to kafka
and vice versa
99fb9be is described below
commit 99fb9be724c59e3c1b2bd77838a114a2d505e2cd
Author: Taras Danylchuk <[email protected]>
AuthorDate: Thu May 10 17:33:03 2018 +0300
CAMEL-12503 : support for propagating camel headers to kafka and vice versa
---
.../camel-kafka/src/main/docs/kafka-component.adoc | 25 ++++-
.../camel/component/kafka/KafkaConfiguration.java | 36 +++++--
.../camel/component/kafka/KafkaConsumer.java | 25 ++++-
.../component/kafka/KafkaHeaderFilterStrategy.java | 35 +++++++
.../camel/component/kafka/KafkaProducer.java | 79 +++++++++++---
.../component/kafka/KafkaConsumerFullTest.java | 12 +++
.../component/kafka/KafkaProducerFullTest.java | 113 +++++++++++++++++++++
.../springboot/KafkaComponentConfiguration.java | 15 +++
8 files changed, 312 insertions(+), 28 deletions(-)
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 7d89604..13244b7 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -72,7 +72,7 @@ with the following path and query parameters:
|===
-==== Query Parameters (90 parameters):
+==== Query Parameters (91 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
@@ -80,6 +80,7 @@ with the following path and query parameters:
| Name | Description | Default | Type
| *brokers* (common) | URL of the Kafka brokers to use. The format is
host1:port1,host2:port2, and the list can be a subset of brokers or a VIP
pointing to a subset of brokers. This option is known as bootstrap.servers in
the Kafka documentation. | | String
| *clientId* (common) | The client id is a user-specified string sent in each
request to help trace calls. It should logically identify the application
making the request. | | String
+| *headerFilterStrategy* (common) | To use a custom HeaderFilterStrategy to
filter header to and from Camel message. | | HeaderFilterStrategy
| *reconnectBackoffMaxMs* (common) | The maximum amount of time in
milliseconds to wait when reconnecting to a broker that has repeatedly failed
to connect. If provided, the backoff per host will increase exponentially for
each consecutive connection failure, up to this maximum. After calculating the
backoff increase, 20% random jitter is added to avoid connection storms. | 1000
| Integer
| *allowManualCommit* (consumer) | Whether to allow doing manual commits via
KafkaManualCommit. If this option is enabled then an instance of
KafkaManualCommit is stored on the Exchange message header, which allows end
users to access this API and perform manual offset commits via the Kafka
consumer. | false | boolean
| *autoCommitEnable* (consumer) | If true, periodically commit to ZooKeeper
the offset of messages already fetched by the consumer. This committed offset
will be used when the process fails as the position from which the new consumer
will begin. | true | Boolean
@@ -427,3 +428,25 @@ This will force a synchronous commit which will block
until the commit is acknow
If you want to use a custom implementation of `KafkaManualCommit` then you can
configure a custom `KafkaManualCommitFactory`
on the `KafkaComponent` that creates instances of your custom implementation.
+
+=== Kafka Headers propagation
+*Available as of Camel 2.22*
+
+When consuming messages from Kafka, the Kafka record headers are propagated to the Camel exchange headers automatically.
+The producer behaves the same way: the Camel headers of the exchange are propagated to the Kafka message headers.
+
+Since Kafka headers allow only `byte[]` values, a Camel exchange header can only be propagated if its value can be serialized to `byte[]`;
+otherwise the header is skipped.
+The following header value types are supported: `String`, `Integer`, `Long`, `Double`, `byte[]`.
+Note: all headers propagated *from* Kafka *to* the Camel exchange will contain a `byte[]` value.
+
+By default all headers are filtered by `KafkaHeaderFilterStrategy`.
+This strategy filters out headers whose names start with the `Camel` or `org.apache.camel` prefixes.
+The default strategy can be overridden by using the `headerFilterStrategy` URI parameter in both `to` and `from` routes:
+```
+from("kafka:my_topic?headerFilterStrategy=#myStrategy")
+...
+.to("kafka:my_topic?headerFilterStrategy=#myStrategy")
+```
+
+The `myStrategy` object must be an implementation of `HeaderFilterStrategy` and must be placed in the Camel registry, either manually or by registering it as a bean in Spring/Blueprint, as it is looked up through the `CamelContext`.
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 9b5ba6b..dabd475 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -24,6 +24,8 @@ import java.util.stream.Collectors;
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.spi.HeaderFilterStrategyAware;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.StateRepository;
import org.apache.camel.spi.UriParam;
@@ -43,15 +45,18 @@ import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
@UriParams
-public class KafkaConfiguration implements Cloneable {
+public class KafkaConfiguration implements Cloneable,
HeaderFilterStrategyAware {
//Common configuration properties
- @UriPath(label = "common") @Metadata(required = "true")
+ @UriPath(label = "common")
+ @Metadata(required = "true")
private String topic;
@UriParam(label = "common")
private String brokers;
@UriParam(label = "common")
private String clientId;
+ @UriParam(label = "common", description = "To use a custom
HeaderFilterStrategy to filter header to and from Camel message.")
+ private HeaderFilterStrategy headerFilterStrategy = new
KafkaHeaderFilterStrategy();
@UriParam(label = "consumer")
private boolean topicIsPattern;
@@ -294,10 +299,10 @@ public class KafkaConfiguration implements Cloneable {
private Double kerberosRenewWindowFactor =
SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR;
@UriParam(label = "common,security", defaultValue = "DEFAULT")
//sasl.kerberos.principal.to.local.rules
- private String kerberosPrincipalToLocalRules;
+ private String kerberosPrincipalToLocalRules;
@UriParam(label = "common,security", secret = true)
//sasl.jaas.config
- private String saslJaasConfig;
+ private String saslJaasConfig;
public KafkaConfiguration() {
}
@@ -343,7 +348,7 @@ public class KafkaConfiguration implements Cloneable {
addPropertyIfNotNull(props, ProducerConfig.RETRY_BACKOFF_MS_CONFIG,
getRetryBackoffMs());
addPropertyIfNotNull(props, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,
isEnableIdempotence());
addPropertyIfNotNull(props,
ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, getReconnectBackoffMaxMs());
-
+
// SSL
applySslConfiguration(props, getSslContextParameters());
addPropertyIfNotNull(props,
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, getSecurityProtocol());
@@ -403,7 +408,7 @@ public class KafkaConfiguration implements Cloneable {
addPropertyIfNotNull(props,
ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs());
addPropertyIfNotNull(props, ConsumerConfig.RETRY_BACKOFF_MS_CONFIG,
getRetryBackoffMs());
addPropertyIfNotNull(props,
ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, getReconnectBackoffMaxMs());
-
+
// SSL
applySslConfiguration(props, getSslContextParameters());
addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG,
getSslKeyPassword());
@@ -1029,14 +1034,14 @@ public class KafkaConfiguration implements Cloneable {
public void setSaslMechanism(String saslMechanism) {
this.saslMechanism = saslMechanism;
}
-
+
public String getSaslJaasConfig() {
return saslJaasConfig;
}
/**
* Expose the kafka sasl.jaas.config parameter
- *
+ *
* Example:
* org.apache.kafka.common.security.plain.PlainLoginModule required
username="USERNAME" password="PASSWORD";
*/
@@ -1498,7 +1503,7 @@ public class KafkaConfiguration implements Cloneable {
* Set if KafkaConsumer will read from beginning or end on startup:
* beginning : read from beginning
* end : read from end
- *
+ *
* This is replacing the earlier property seekToBeginning
*/
public void setSeekTo(String seekTo) {
@@ -1559,6 +1564,7 @@ public class KafkaConfiguration implements Cloneable {
public String getInterceptorClasses() {
return interceptorClasses;
}
+
/**
* Sets interceptors for producer or consumers.
* Producer interceptors have to be classes implementing {@link
org.apache.kafka.clients.producer.ProducerInterceptor}
@@ -1596,4 +1602,16 @@ public class KafkaConfiguration implements Cloneable {
public void setReconnectBackoffMaxMs(Integer reconnectBackoffMaxMs) {
this.reconnectBackoffMaxMs = reconnectBackoffMaxMs;
}
+
+ /**
+ * Returns the HeaderFilterStrategy used to filter headers to and from the Camel message.
+ * Unless replaced through the setter, this is the KafkaHeaderFilterStrategy
+ * instance the field is initialised with.
+ */
+ public HeaderFilterStrategy getHeaderFilterStrategy() {
+ return headerFilterStrategy;
+ }
+
+ /**
+ * To use a custom HeaderFilterStrategy to filter header to and from Camel message.
+ */
+ public void setHeaderFilterStrategy(HeaderFilterStrategy
headerFilterStrategy) {
+ this.headerFilterStrategy = headerFilterStrategy;
+ }
+
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 05111f2..c585e05 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -27,10 +27,12 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.regex.Pattern;
+import java.util.stream.StreamSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.StateRepository;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
@@ -42,6 +44,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.header.Header;
public class KafkaConsumer extends DefaultConsumer {
@@ -98,7 +101,7 @@ public class KafkaConsumer extends DefaultConsumer {
@Override
protected void doStart() throws Exception {
log.info("Starting Kafka consumer on topic: {} with breakOnFirstError:
{}",
- endpoint.getConfiguration().getTopic(),
endpoint.getConfiguration().isBreakOnFirstError());
+ endpoint.getConfiguration().getTopic(),
endpoint.getConfiguration().isBreakOnFirstError());
super.doStart();
executor = endpoint.createExecutor();
@@ -276,10 +279,12 @@ public class KafkaConsumer extends DefaultConsumer {
record = recordIterator.next();
if (log.isTraceEnabled()) {
log.trace("Partition = {}, offset = {},
key = {}, value = {}", record.partition(), record.offset(), record.key(),
- record.value());
+ record.value());
}
Exchange exchange =
endpoint.createKafkaExchange(record);
+ propagateHeaders(record, exchange,
endpoint.getConfiguration().getHeaderFilterStrategy());
+
// if not auto commit then we have additional
information on the exchange
if (!isAutoCommitEnabled()) {
exchange.getIn().setHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT,
!recordIterator.hasNext());
@@ -287,9 +292,9 @@ public class KafkaConsumer extends DefaultConsumer {
if
(endpoint.getConfiguration().isAllowManualCommit()) {
// allow Camel users to access the Kafka
consumer API to be able to do for example manual commits
KafkaManualCommit manual =
endpoint.getComponent().getKafkaManualCommitFactory().newInstance(exchange,
consumer, topicName, threadId,
- offsetRepository, partition,
record.offset());
+ offsetRepository, partition,
record.offset());
exchange.getIn().setHeader(KafkaConstants.MANUAL_COMMIT, manual);
-
+
}
try {
@@ -303,7 +308,7 @@ public class KafkaConsumer extends DefaultConsumer {
if
(endpoint.getConfiguration().isBreakOnFirstError()) {
// we are failing and we should break
out
log.warn("Error during processing {}
from topic: {}. Will seek consumer to offset: {} and re-connect and start
polling again.",
- exchange, topicName,
partitionLastOffset);
+ exchange, topicName,
partitionLastOffset);
// force commit so we resume on next
poll where we failed
commitOffset(offsetRepository,
partition, partitionLastOffset, true);
// continue to next partition
@@ -423,6 +428,16 @@ public class KafkaConsumer extends DefaultConsumer {
}
}
+    /**
+     * Copies the headers of the consumed Kafka record to the IN message of the exchange.
+     * Headers rejected by the given filter strategy are not copied; accepted headers are
+     * set with the value exactly as returned by the Kafka header.
+     *
+     * @param record               the Kafka record whose headers are read
+     * @param exchange             the exchange created for the record
+     * @param headerFilterStrategy the strategy deciding which headers are excluded
+     */
+    private void propagateHeaders(ConsumerRecord<Object, Object> record, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
+        StreamSupport.stream(record.headers().spliterator(), false)
+            .filter(header -> shouldBePropagated(header, exchange, headerFilterStrategy))
+            .forEach(header -> exchange.getIn().setHeader(header.key(), header.value()));
+    }
+
+    /**
+     * Returns {@code true} when the Kafka header is NOT matched by the filter strategy
+     * and therefore has to be copied to the Camel message.
+     */
+    private boolean shouldBePropagated(Header header, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
+        // Kafka headers are external headers entering Camel, so the "external headers"
+        // (inbound) side of the strategy is the one that applies here
+        return !headerFilterStrategy.applyFilterToExternalHeaders(header.key(), header.value(), exchange);
+    }
+
private boolean isAutoCommitEnabled() {
return endpoint.getConfiguration().isAutoCommitEnable() != null &&
endpoint.getConfiguration().isAutoCommitEnable();
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHeaderFilterStrategy.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHeaderFilterStrategy.java
new file mode 100644
index 0000000..3c55daa
--- /dev/null
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHeaderFilterStrategy.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+import org.apache.camel.impl.DefaultHeaderFilterStrategy;
+
+/**
+ * Default header filter strategy of the Kafka component. It excludes the Kafka record
+ * metadata header as well as every header whose name begins with "Camel" or
+ * "org.apache.camel".
+ */
+public class KafkaHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
+
+    // header names beginning with "Camel" or "org.apache.camel" (case-insensitive)
+    private static final String CAMEL_HEADERS_PATTERN = "(?i)(Camel|org\\.apache\\.camel)[\\.|a-z|A-z|0-9]*";
+
+    public KafkaHeaderFilterStrategy() {
+        initialize();
+    }
+
+    /**
+     * Registers the filter rules of this strategy; invoked once from the constructor.
+     */
+    protected void initialize() {
+        // the same name pattern is applied in both directions
+        setInFilterPattern(CAMEL_HEADERS_PATTERN);
+        setOutFilterPattern(CAMEL_HEADERS_PATTERN);
+
+        // filter out kafka record metadata
+        getInFilter().add("org.apache.kafka.clients.producer.RecordMetadata");
+    }
+}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 4ec8ef4..f4a7e1a 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -23,20 +23,26 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.util.URISupport;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.utils.Bytes;
public class KafkaProducer extends DefaultAsyncProducer {
@@ -142,8 +148,8 @@ public class KafkaProducer extends DefaultAsyncProducer {
allowHeader = !headerTopic.equals(fromTopic);
if (!allowHeader) {
log.debug("Circular topic detected from message
header."
- + " Cannot send to same topic as the message comes
from: {}"
- + ". Will use endpoint configured topic: {}",
from, topic);
+ + " Cannot send to same topic as the message
comes from: {}"
+ + ". Will use endpoint configured topic: {}",
from, topic);
}
}
}
@@ -159,24 +165,28 @@ public class KafkaProducer extends DefaultAsyncProducer {
// endpoint take precedence over header configuration
final Integer partitionKey =
endpoint.getConfiguration().getPartitionKey() != null
- ? endpoint.getConfiguration().getPartitionKey() :
exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, Integer.class);
+ ? endpoint.getConfiguration().getPartitionKey() :
exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, Integer.class);
final boolean hasPartitionKey = partitionKey != null;
// endpoint take precedence over header configuration
Object key = endpoint.getConfiguration().getKey() != null
- ? endpoint.getConfiguration().getKey() :
exchange.getIn().getHeader(KafkaConstants.KEY);
+ ? endpoint.getConfiguration().getKey() :
exchange.getIn().getHeader(KafkaConstants.KEY);
final Object messageKey = key != null
- ? tryConvertToSerializedType(exchange, key,
endpoint.getConfiguration().getKeySerializerClass()) : null;
+ ? tryConvertToSerializedType(exchange, key,
endpoint.getConfiguration().getKeySerializerClass()) : null;
final boolean hasMessageKey = messageKey != null;
+ // extracting headers which need to be propagated
+ HeaderFilterStrategy headerFilterStrategy =
endpoint.getConfiguration().getHeaderFilterStrategy();
+ List<Header> propagatedHeaders = getPropagatedHeaders(exchange,
headerFilterStrategy);
+
Object msg = exchange.getIn().getBody();
// is the message body a list or something that contains multiple
values
Iterator<Object> iterator = null;
if (msg instanceof Iterable) {
- iterator = ((Iterable<Object>)msg).iterator();
+ iterator = ((Iterable<Object>) msg).iterator();
} else if (msg instanceof Iterator) {
- iterator = (Iterator<Object>)msg;
+ iterator = (Iterator<Object>) msg;
}
if (iterator != null) {
final Iterator<Object> msgList = iterator;
@@ -194,11 +204,11 @@ public class KafkaProducer extends DefaultAsyncProducer {
Object value = tryConvertToSerializedType(exchange, next,
endpoint.getConfiguration().getSerializerClass());
if (hasPartitionKey && hasMessageKey) {
- return new ProducerRecord(msgTopic, partitionKey, key,
value);
+ return new ProducerRecord(msgTopic, partitionKey,
null, key, value, propagatedHeaders);
} else if (hasMessageKey) {
- return new ProducerRecord(msgTopic, key, value);
+ return new ProducerRecord(msgTopic, null, null, key,
value, propagatedHeaders);
} else {
- return new ProducerRecord(msgTopic, value);
+ return new ProducerRecord(msgTopic, null, null, null,
value, propagatedHeaders);
}
}
@@ -214,15 +224,58 @@ public class KafkaProducer extends DefaultAsyncProducer {
ProducerRecord record;
if (hasPartitionKey && hasMessageKey) {
- record = new ProducerRecord(topic, partitionKey, key, value);
+ record = new ProducerRecord(topic, partitionKey, null, key, value,
propagatedHeaders);
} else if (hasMessageKey) {
- record = new ProducerRecord(topic, key, value);
+ record = new ProducerRecord(topic, null, null, key, value,
propagatedHeaders);
} else {
- record = new ProducerRecord(topic, value);
+ record = new ProducerRecord(topic, null, null, null, value,
propagatedHeaders);
}
return Collections.singletonList(record).iterator();
}
+    /**
+     * Builds the Kafka record headers from the headers of the exchange IN message.
+     * A Camel header is left out when the filter strategy matches it or when its value
+     * cannot be converted to {@code byte[]} (the conversion then yields {@code null}).
+     *
+     * @param exchange             the exchange being sent
+     * @param headerFilterStrategy the strategy deciding which headers are excluded
+     * @return the headers to attach to every record produced for the exchange
+     */
+    private List<Header> getPropagatedHeaders(Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
+        return exchange.getIn().getHeaders().entrySet().stream()
+            .filter(entry -> shouldBePropagated(entry, exchange, headerFilterStrategy))
+            .map(this::getRecordHeader)
+            .filter(Objects::nonNull)
+            .collect(Collectors.toList());
+    }
+
+    /**
+     * Returns {@code true} when the Camel header is NOT matched by the filter strategy
+     * and therefore has to be sent to Kafka.
+     */
+    private boolean shouldBePropagated(Map.Entry<String, Object> entry, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
+        // Camel headers leaving towards Kafka are filtered by the "Camel headers"
+        // (outbound) side of the strategy
+        return !headerFilterStrategy.applyFilterToCamelHeaders(entry.getKey(), entry.getValue(), exchange);
+    }
+
+ private RecordHeader getRecordHeader(Map.Entry<String, Object> entry) {
+ byte[] headerValue = getHeaderValue(entry.getValue());
+ if (headerValue == null) {
+ return null;
+ }
+ return new RecordHeader(entry.getKey(), headerValue);
+ }
+
+    /**
+     * Serializes a Camel header value to the {@code byte[]} form required by Kafka headers.
+     * Strings are encoded as UTF-8, numbers are written with {@link ByteBuffer} and
+     * {@code byte[]} values are passed through unchanged.
+     *
+     * @param value the Camel header value, may be {@code null}
+     * @return the serialized value, or {@code null} when the type is not supported
+     */
+    private byte[] getHeaderValue(Object value) {
+        if (value instanceof String) {
+            // use a fixed charset: the platform default may differ between the producing
+            // and the consuming host
+            return ((String) value).getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        } else if (value instanceof Long) {
+            return ByteBuffer.allocate(Long.BYTES).putLong((Long) value).array();
+        } else if (value instanceof Integer) {
+            return ByteBuffer.allocate(Integer.BYTES).putInt((Integer) value).array();
+        } else if (value instanceof Double) {
+            return ByteBuffer.allocate(Double.BYTES).putDouble((Double) value).array();
+        } else if (value instanceof byte[]) {
+            return (byte[]) value;
+        }
+        // unsupported type: the caller skips the header
+        log.debug("Cannot propagate header value of type[{}], skipping... "
+                + "Supported types: String, Integer, Long, Double, byte[].", value != null ? value.getClass() : "null");
+        return null;
+    }
+
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
// Camel calls this method if the endpoint isSynchronous(), as the
KafkaEndpoint creates a SynchronousDelegateProducer for it
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
index 5b35c8e..17272a4 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.kafka;
import java.io.IOException;
+import java.util.Map;
import java.util.Properties;
import java.util.stream.StreamSupport;
@@ -25,6 +26,7 @@ import org.apache.camel.EndpointInject;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.internals.RecordHeader;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
@@ -71,20 +73,30 @@ public class KafkaConsumerFullTest extends
BaseEmbeddedKafkaTest {
@Test
public void kafkaMessageIsConsumedByCamel() throws InterruptedException,
IOException {
+ // every record carries two headers: one with the "Camel" prefix, which must not reach the exchange,
+ // and one custom header, which must be propagated
+ String propagatedHeaderKey = "PropagatedCustomHeader";
+ byte[] propagatedHeaderValue = "propagated header value".getBytes();
+ String skippedHeaderKey = "CamelSkippedHeader";
to.expectedMessageCount(5);
to.expectedBodiesReceivedInAnyOrder("message-0", "message-1",
"message-2", "message-3", "message-4");
// The LAST_RECORD_BEFORE_COMMIT header should not be configured on
any exchange because autoCommitEnable=true
to.expectedHeaderValuesReceivedInAnyOrder(KafkaConstants.LAST_RECORD_BEFORE_COMMIT,
null, null, null, null, null);
+ to.expectedHeaderReceived(propagatedHeaderKey, propagatedHeaderValue);
for (int k = 0; k < 5; k++) {
String msg = "message-" + k;
ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC,
"1", msg);
+ data.headers().add(new RecordHeader(skippedHeaderKey, "skipped header value".getBytes()));
+ data.headers().add(new RecordHeader(propagatedHeaderKey,
propagatedHeaderValue));
producer.send(data);
}
to.assertIsSatisfied(3000);
assertEquals(5,
StreamSupport.stream(MockConsumerInterceptor.recordsCaptured.get(0).records(TOPIC).spliterator(),
false).count());
+
+ Map<String, Object> headers =
to.getExchanges().get(0).getIn().getHeaders();
+ assertFalse("Should not receive skipped header",
headers.containsKey(skippedHeaderKey));
+ assertTrue("Should receive propagated header",
headers.containsKey(propagatedHeaderKey));
}
@Test
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index a7e43da..643c783 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -17,14 +17,17 @@
package org.apache.camel.component.kafka;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.stream.StreamSupport;
import org.apache.camel.Endpoint;
import org.apache.camel.EndpointInject;
@@ -33,9 +36,14 @@ import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultHeaderFilterStrategy;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -48,24 +56,32 @@ public class KafkaProducerFullTest extends
BaseEmbeddedKafkaTest {
private static final String TOPIC_BYTES = "testBytes";
private static final String TOPIC_BYTES_IN_HEADER = "testBytesHeader";
private static final String GROUP_BYTES = "groupStrings";
+ private static final String TOPIC_PROPAGATED_HEADERS =
"testPropagatedHeaders";
private static KafkaConsumer<String, String> stringsConsumerConn;
private static KafkaConsumer<byte[], byte[]> bytesConsumerConn;
@EndpointInject(uri = "kafka:" + TOPIC_STRINGS + "?requestRequiredAcks=-1")
private Endpoint toStrings;
+
@EndpointInject(uri = "kafka:" + TOPIC_STRINGS +
"?requestRequiredAcks=-1&partitionKey=1")
private Endpoint toStrings2;
+
@EndpointInject(uri = "kafka:" + TOPIC_INTERCEPTED +
"?requestRequiredAcks=-1"
+
"&interceptorClasses=org.apache.camel.component.kafka.MockProducerInterceptor")
private Endpoint toStringsWithInterceptor;
+
@EndpointInject(uri = "mock:kafkaAck")
private MockEndpoint mockEndpoint;
+
@EndpointInject(uri = "kafka:" + TOPIC_BYTES + "?requestRequiredAcks=-1"
+
"&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer&"
+
"keySerializerClass=org.apache.kafka.common.serialization.ByteArraySerializer")
private Endpoint toBytes;
+ @EndpointInject(uri = "kafka:" + TOPIC_PROPAGATED_HEADERS +
"?requestRequiredAcks=-1")
+ private Endpoint toPropagatedHeaders;
+
@Produce(uri = "direct:startStrings")
private ProducerTemplate stringsTemplate;
@@ -78,6 +94,16 @@ public class KafkaProducerFullTest extends
BaseEmbeddedKafkaTest {
@Produce(uri = "direct:startTraced")
private ProducerTemplate interceptedTemplate;
+ @Produce(uri = "direct:propagatedHeaders")
+ private ProducerTemplate propagatedHeadersTemplate;
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry jndi = super.createRegistry();
+ jndi.bind("myStrategy", new MyHeaderFilterStrategy());
+ return jndi;
+ }
+
@BeforeClass
public static void before() {
Properties stringsProps = new Properties();
@@ -118,6 +144,8 @@ public class KafkaProducerFullTest extends
BaseEmbeddedKafkaTest {
from("direct:startBytes").to(toBytes).to(mockEndpoint);
from("direct:startTraced").to(toStringsWithInterceptor).to(mockEndpoint);
+
+
from("direct:propagatedHeaders").to(toPropagatedHeaders).to(mockEndpoint);
}
};
}
@@ -271,6 +299,88 @@ public class KafkaProducerFullTest extends
BaseEmbeddedKafkaTest {
}
}
+    /**
+     * Sends one message carrying a header of every supported type plus two headers that
+     * must not be propagated (an arbitrary Object value and a "Camel"-prefixed header),
+     * then verifies that exactly the five supported headers arrive on the Kafka record
+     * with their original values.
+     */
+    @Test
+    public void propagatedHeaderIsReceivedByKafka() throws Exception {
+        String propagatedStringHeaderKey = "PROPAGATED_STRING_HEADER";
+        String propagatedStringHeaderValue = "propagated string header value";
+
+        String propagatedIntegerHeaderKey = "PROPAGATED_INTEGER_HEADER";
+        Integer propagatedIntegerHeaderValue = 54545;
+
+        String propagatedLongHeaderKey = "PROPAGATED_LONG_HEADER";
+        Long propagatedLongHeaderValue = 5454545454545L;
+
+        String propagatedDoubleHeaderKey = "PROPAGATED_DOUBLE_HEADER";
+        Double propagatedDoubleHeaderValue = 43434.545D;
+
+        String propagatedBytesHeaderKey = "PROPAGATED_BYTES_HEADER";
+        byte[] propagatedBytesHeaderValue = new byte[]{121, 34, 34, 54, 5, 3, 54, -34};
+
+        Map<String, Object> camelHeaders = new HashMap<>();
+        camelHeaders.put(propagatedStringHeaderKey, propagatedStringHeaderValue);
+        camelHeaders.put(propagatedIntegerHeaderKey, propagatedIntegerHeaderValue);
+        camelHeaders.put(propagatedLongHeaderKey, propagatedLongHeaderValue);
+        camelHeaders.put(propagatedDoubleHeaderKey, propagatedDoubleHeaderValue);
+        camelHeaders.put(propagatedBytesHeaderKey, propagatedBytesHeaderValue);
+        // unsupported value type: expected to be skipped by the producer
+        camelHeaders.put("CustomObjectHeader", new Object());
+        // "Camel" prefix: expected to be removed by the header filter strategy
+        camelHeaders.put("CamelFilteredHeader", "CamelFilteredHeader value");
+
+        CountDownLatch messagesLatch = new CountDownLatch(1);
+        propagatedHeadersTemplate.sendBodyAndHeaders("Some test message", camelHeaders);
+
+        List<ConsumerRecord<String, String>> records = pollForRecords(stringsConsumerConn, TOPIC_PROPAGATED_HEADERS, messagesLatch);
+        boolean allMessagesReceived = messagesLatch.await(10_000, TimeUnit.MILLISECONDS);
+
+        assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
+
+        ConsumerRecord<String, String> record = records.get(0);
+        Headers headers = record.headers();
+        assertNotNull("Kafka Headers should not be null.", headers);
+        assertEquals("Five propagated headers are expected.", 5, headers.toArray().length);
+        assertEquals("Propagated string value received", propagatedStringHeaderValue,
+                new String(getHeaderValue(propagatedStringHeaderKey, headers)));
+        assertEquals("Propagated integer value received", propagatedIntegerHeaderValue,
+                Integer.valueOf(ByteBuffer.wrap(getHeaderValue(propagatedIntegerHeaderKey, headers)).getInt()));
+        assertEquals("Propagated long value received", propagatedLongHeaderValue,
+                Long.valueOf(ByteBuffer.wrap(getHeaderValue(propagatedLongHeaderKey, headers)).getLong()));
+        assertEquals("Propagated double value received", propagatedDoubleHeaderValue,
+                Double.valueOf(ByteBuffer.wrap(getHeaderValue(propagatedDoubleHeaderKey, headers)).getDouble()));
+        assertArrayEquals("Propagated byte array value received", propagatedBytesHeaderValue, getHeaderValue(propagatedBytesHeaderKey, headers));
+    }
+
+    /**
+     * Verifies that the {@code headerFilterStrategy} URI option replaces the default
+     * strategy with the bean registered as {@code myStrategy}.
+     */
+    @Test
+    public void headerFilterStrategyCouldBeOverridden() {
+        // concatenate the topic constant; it was previously embedded in the literal by name
+        KafkaEndpoint kafkaEndpoint = context.getEndpoint(
+                "kafka:" + TOPIC_PROPAGATED_HEADERS + "?headerFilterStrategy=#myStrategy", KafkaEndpoint.class);
+        assertIsInstanceOf(MyHeaderFilterStrategy.class, kafkaEndpoint.getConfiguration().getHeaderFilterStrategy());
+    }
+
+    /**
+     * Looks up the first header with the given key and returns its raw value; fails the
+     * test when no such header is present.
+     */
+    private byte[] getHeaderValue(String headerKey, Headers headers) {
+        Header match = StreamSupport.stream(headers.spliterator(), false)
+                .filter(candidate -> candidate.key().equals(headerKey))
+                .findFirst()
+                .orElse(null);
+        assertNotNull("Header should be sent", match);
+        return match.value();
+    }
+
+    /**
+     * Subscribes the consumer to the topic and polls it on a background thread until the
+     * latch reaches zero. Every received record is added to the returned list before the
+     * latch is counted down, so the list may be read once {@code messagesLatch.await}
+     * has returned successfully.
+     *
+     * @param consumerConn  the Kafka consumer to poll
+     * @param topic         the topic to subscribe to
+     * @param messagesLatch counted down once per received record
+     * @return the list the background thread fills with the received records
+     */
+    private List<ConsumerRecord<String, String>> pollForRecords(KafkaConsumer<String, String> consumerConn,
+                                                                String topic, CountDownLatch messagesLatch) {
+
+        List<ConsumerRecord<String, String>> consumedRecords = new ArrayList<>();
+        consumerConn.subscribe(Collections.singletonList(topic));
+
+        Thread poller = new Thread(() -> {
+            while (messagesLatch.getCount() != 0) {
+                for (ConsumerRecord<String, String> record : consumerConn.poll(100)) {
+                    consumedRecords.add(record);
+                    messagesLatch.countDown();
+                }
+            }
+        });
+        // daemon: if the expected record never arrives the loop never ends, and the
+        // thread must not keep the test JVM alive
+        poller.setDaemon(true);
+        poller.start();
+
+        return consumedRecords;
+    }
+
private void createKafkaMessageConsumer(KafkaConsumer<String, String>
consumerConn,
String topic, String
topicInHeader, CountDownLatch messagesLatch) {
@@ -323,4 +433,7 @@ public class KafkaProducerFullTest extends
BaseEmbeddedKafkaTest {
}
}
+ // Custom strategy type bound in the registry as "myStrategy"; it adds nothing to
+ // DefaultHeaderFilterStrategy and only serves to check that the endpoint picks up
+ // the strategy referenced in its URI.
+ private static class MyHeaderFilterStrategy extends
DefaultHeaderFilterStrategy {
+ }
+
}
diff --git
a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
index f5624b1..e46ad41 100644
---
a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
+++
b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.kafka.springboot;
import java.util.concurrent.ExecutorService;
import javax.annotation.Generated;
import org.apache.camel.component.kafka.KafkaManualCommitFactory;
+import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.StateRepository;
import org.apache.camel.spring.boot.ComponentConfigurationPropertiesCommon;
import org.apache.camel.util.jsse.SSLContextParameters;
@@ -751,6 +752,11 @@ public class KafkaComponentConfiguration
* increase, 20% random jitter is added to avoid connection storms.
*/
private Integer reconnectBackoffMaxMs = 1000;
+ /**
+ * To use a custom HeaderFilterStrategy to filter header to and from
+ * Camel message.
+ */
+ private HeaderFilterStrategy headerFilterStrategy;
public Boolean getTopicIsPattern() {
return topicIsPattern;
@@ -1452,5 +1458,14 @@ public class KafkaComponentConfiguration
public void setReconnectBackoffMaxMs(Integer reconnectBackoffMaxMs) {
this.reconnectBackoffMaxMs = reconnectBackoffMaxMs;
}
+
+ // Returns the configured HeaderFilterStrategy; null when none has been set,
+ // because the field has no initial value.
+ public HeaderFilterStrategy getHeaderFilterStrategy() {
+ return headerFilterStrategy;
+ }
+
+ // Sets the HeaderFilterStrategy used to filter header to and from Camel message.
+ public void setHeaderFilterStrategy(
+ HeaderFilterStrategy headerFilterStrategy) {
+ this.headerFilterStrategy = headerFilterStrategy;
+ }
}
}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
[email protected].