[
https://issues.apache.org/jira/browse/CAMEL-12503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16475430#comment-16475430
]
ASF GitHub Bot commented on CAMEL-12503:
----------------------------------------
davsclaus commented on a change in pull request #2333: CAMEL-12503: added
support for propagating camel headers to kafka and vice versa
URL: https://github.com/apache/camel/pull/2333#discussion_r188192536
##########
File path:
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
##########
@@ -214,15 +223,49 @@ public void remove() {
ProducerRecord record;
if (hasPartitionKey && hasMessageKey) {
- record = new ProducerRecord(topic, partitionKey, key, value);
+ record = new ProducerRecord(topic, partitionKey, null, key, value, propagatedHeaders);
} else if (hasMessageKey) {
- record = new ProducerRecord(topic, key, value);
+ record = new ProducerRecord(topic, null, null, key, value, propagatedHeaders);
} else {
- record = new ProducerRecord(topic, value);
+ record = new ProducerRecord(topic, null, null, null, value, propagatedHeaders);
}
return Collections.singletonList(record).iterator();
}
+ private List<Header> getPropagatedHeaders(Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
+ return exchange.getIn().getHeaders().entrySet().stream()
+ .filter(entry -> shouldBeFiltered(entry, exchange, headerFilterStrategy))
+ .map(entry -> new RecordHeader(entry.getKey(), getHeaderValue(entry.getValue())))
+ .collect(Collectors.toList());
+ }
+
+ private boolean shouldBeFiltered(Map.Entry<String, Object> entry, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
+ return !headerFilterStrategy.applyFilterToExternalHeaders(entry.getKey(), entry.getValue(), exchange);
+ }
+
+ private byte[] getHeaderValue(Object value) {
+ if (value instanceof String) {
+ return ((String) value).getBytes();
+ } else if (value instanceof Long) {
+ ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+ buffer.putLong((Long) value);
+ return buffer.array();
+ } else if (value instanceof Integer) {
+ ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
+ buffer.putInt((Integer) value);
+ return buffer.array();
+ } else if (value instanceof Double) {
+ ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES);
+ buffer.putDouble((Double) value);
+ return buffer.array();
+ } else if (value instanceof byte[]) {
+ return (byte[]) value;
+ }
+ log.warn("Cannot propagate header value of type[{}], null will be provided instead. " +
+ "Supported types: String, Integer, Long, Double, byte[].", value != null ? value.getClass() : "null");
+ return null;
Review comment:
Do you really want a null value for unsupported headers? I think we should
drop the header entirely. This is what e.g. the JMS component and others do.
Also, we should consider avoiding logging noise: as written, a WARN is logged
for every unsupported header. So maybe log at DEBUG by default, and then
document more clearly which header types are supported.
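
A minimal sketch of the suggestion above, not the code that was merged: unsupported header types are dropped instead of being sent with a null value, and the drop is logged at a debug-like level. To stay self-contained it uses only the JDK, so a plain Map<String, byte[]> stands in for Kafka's RecordHeader list and java.util.logging (Level.FINE) stands in for the component's logger. The class and method names (HeaderPropagationSketch, toBytes, toKafkaHeaders) are hypothetical. Encoding strings explicitly as UTF-8 is also an assumption; the PR calls getBytes() with the platform default charset.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical illustration class; not part of camel-kafka.
class HeaderPropagationSketch {
    private static final Logger LOG = Logger.getLogger(HeaderPropagationSketch.class.getName());

    // Serializes a supported header value, or returns null to signal "unsupported".
    static byte[] toBytes(Object value) {
        if (value instanceof String) {
            // Assumption: UTF-8 rather than the platform default charset.
            return ((String) value).getBytes(StandardCharsets.UTF_8);
        } else if (value instanceof Long) {
            return ByteBuffer.allocate(Long.BYTES).putLong((Long) value).array();
        } else if (value instanceof Integer) {
            return ByteBuffer.allocate(Integer.BYTES).putInt((Integer) value).array();
        } else if (value instanceof Double) {
            return ByteBuffer.allocate(Double.BYTES).putDouble((Double) value).array();
        } else if (value instanceof byte[]) {
            return (byte[]) value;
        }
        return null;
    }

    // Builds the headers to propagate, skipping entries whose type is unsupported.
    static Map<String, byte[]> toKafkaHeaders(Map<String, Object> camelHeaders) {
        Map<String, byte[]> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : camelHeaders.entrySet()) {
            byte[] bytes = toBytes(entry.getValue());
            if (bytes == null) {
                // FINE is roughly DEBUG, so unsupported headers do not flood the log.
                Object value = entry.getValue();
                LOG.log(Level.FINE, "Dropping header {0}: unsupported type {1}",
                        new Object[] {entry.getKey(), value != null ? value.getClass() : "null"});
                continue;
            }
            result.put(entry.getKey(), bytes);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("orderId", 42L);
        headers.put("source", "camel");
        headers.put("unsupported", new Object()); // dropped, not sent as null
        System.out.println(toKafkaHeaders(headers).keySet());
    }
}
```

With this shape the consumer side never has to distinguish a null header value from an absent header, which matches the behaviour the comment attributes to the JMS component.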
> Kafka component should be able to propagate camel headers to kafka
> ------------------------------------------------------------------
>
> Key: CAMEL-12503
> URL: https://issues.apache.org/jira/browse/CAMEL-12503
> Project: Camel
> Issue Type: New Feature
> Reporter: Taras Danylchuk
> Priority: Major
>
> Since 0.11.0 Kafka supports headers, and it would be awesome to have such a
> feature available in the Camel component as well.