[
https://issues.apache.org/jira/browse/CAMEL-12651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544850#comment-16544850
]
ASF GitHub Bot commented on CAMEL-12651:
----------------------------------------
oscerd closed pull request #2427: CAMEL-12651 - Allow to override headers
serializing and deserializing
URL: https://github.com/apache/camel/pull/2427
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index f16bb3542a6..4559ab4687f 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -72,7 +72,7 @@ with the following path and query parameters:
|===
-==== Query Parameters (91 parameters):
+==== Query Parameters (93 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
@@ -98,6 +98,7 @@ with the following path and query parameters:
| *fetchWaitMaxMs* (consumer) | The maximum amount of time the server will
block before answering the fetch request if there isn't sufficient data to
immediately satisfy fetch.min.bytes | 500 | Integer
| *groupId* (consumer) | A string that uniquely identifies the group of
consumer processes to which this consumer belongs. By setting the same group id
multiple processes indicate that they are all part of the same consumer group.
This option is required for consumers. | | String
| *heartbeatIntervalMs* (consumer) | The expected time between heartbeats to
the consumer coordinator when using Kafka's group management facilities.
Heartbeats are used to ensure that the consumer's session stays active and to
facilitate rebalancing when new consumers join or leave the group. The value
must be set lower than session.timeout.ms, but typically should be set no
higher than 1/3 of that value. It can be adjusted even lower to control the
expected time for normal rebalances. | 3000 | Integer
+| *kafkaHeaderDeserializer* (consumer) | Sets custom KafkaHeaderDeserializer
for deserialization kafka headers values to camel headers values. | |
KafkaHeaderDeserializer
| *keyDeserializer* (consumer) | Deserializer class for key that implements
the Deserializer interface. |
org.apache.kafka.common.serialization.StringDeserializer | String
| *maxPartitionFetchBytes* (consumer) | The maximum amount of data
per-partition the server will return. The maximum total memory used for a
request will be the number of partitions multiplied by max.partition.fetch.bytes. This size must be at
least as large as the maximum message size the server allows or else it is
possible for the producer to send messages larger than the consumer can fetch.
If that happens, the consumer can get stuck trying to fetch a large message on
a certain partition. | 1048576 | Integer
| *maxPollIntervalMs* (consumer) | The maximum delay between invocations of
poll() when using consumer group management. This places an upper bound on the
amount of time that the consumer can be idle before fetching more records. If
poll() is not called before expiration of this timeout, then the consumer is
considered failed and the group will rebalance in order to reassign the
partitions to another member. | | Long
@@ -117,6 +118,7 @@ with the following path and query parameters:
| *compressionCodec* (producer) | This parameter allows you to specify the
compression codec for all data generated by this producer. Valid values are
none, gzip and snappy. | none | String
| *connectionMaxIdleMs* (producer) | Close idle connections after the number
of milliseconds specified by this config. | 540000 | Integer
| *enableIdempotence* (producer) | If set to 'true' the producer will ensure
that exactly one copy of each message is written in the stream. If 'false',
producer retries may write duplicates of the retried message in the stream. If
set to true this option will require max.in.flight.requests.per.connection to
be set to 1 and retries cannot be zero and additionally acks must be set to
'all'. | false | boolean
+| *kafkaHeaderSerializer* (producer) | Sets custom KafkaHeaderSerializer for serializing camel header values to kafka header values. | | KafkaHeaderSerializer
| *key* (producer) | The record key (or null if no key is specified). If this
option has been configured then it takes precedence over the header
KafkaConstants.KEY | | String
| *keySerializerClass* (producer) | The serializer class for keys (defaults to
the same as for messages if nothing is given). |
org.apache.kafka.common.serialization.StringSerializer | String
| *lingerMs* (producer) | The producer groups together any records that arrive
in between request transmissions into a single batched request. Normally this
occurs only under load when records arrive faster than they can be sent out.
However in some circumstances the client may want to reduce the number of
requests even under moderate load. This setting accomplishes this by adding a
small amount of artificial delay; that is, rather than immediately sending out a
record the producer will wait for up to the given delay to allow other records
to be sent so that the sends can be batched together. This can be thought of as
analogous to Nagle's algorithm in TCP. This setting gives the upper bound on
the delay for batching: once we get batch.size worth of records for a partition
it will be sent immediately regardless of this setting, however if we have
fewer than this many bytes accumulated for this partition we will 'linger' for
the specified time waiting for more records to show up. This setting defaults
to 0 (i.e. no delay). Setting linger.ms=5, for example, would have the effect
of reducing the number of requests sent but would add up to 5ms of latency to
records sent in the absence of load. | 0 | Integer
@@ -450,8 +452,14 @@ Producing flow backed by same behaviour - camel headers of
particular exchange w
Since kafka headers allow only `byte[]` values, in order for a camel exchange
header to be propagated its value should be serialized to `byte[]`,
otherwise the header will be skipped.
-Following header value types are supported: `String`, `Integer`, `Long`,
`Double`, `byte[]`.
-Note: all headers propagated *from* kafka *to* camel exchange will contain
`byte[]` value.
+Following header value types are supported: `String`, `Integer`, `Long`,
`Double`, `Boolean`, `byte[]`.
+Note: all headers propagated *from* kafka *to* camel exchange will contain
`byte[]` value by default.
+In order to override default functionality uri parameters can be set:
`kafkaHeaderDeserializer` for `from` route and `kafkaHeaderSerializer` for `to`
route. Example:
+```
+from("kafka:my_topic?kafkaHeaderDeserializer=#myDeserializer")
+...
+.to("kafka:my_topic?kafkaHeaderSerializer=#mySerializer")
+```
By default all headers are being filtered by `KafkaHeaderFilterStrategy`.
Strategy filters out headers which start with `Camel` or `org.apache.camel`
prefixes.
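The `kafkaHeaderDeserializer` option documented above accepts any implementation of the `KafkaHeaderDeserializer` interface added by this PR. As a rough sketch of what such an implementation might look like, the class below decodes every header value as a UTF-8 string. The interface is redeclared locally so the example compiles without Camel on the classpath; `StringHeaderDeserializerSketch` and `Utf8StringHeaderDeserializer` are hypothetical names, not part of the PR.

```java
import java.nio.charset.StandardCharsets;

public class StringHeaderDeserializerSketch {

    // Local stand-in mirroring the KafkaHeaderDeserializer interface from this PR,
    // declared here only so the sketch is self-contained.
    interface KafkaHeaderDeserializer {
        Object deserialize(String key, byte[] value);
    }

    // Hypothetical implementation: treats every header value as UTF-8 text.
    static class Utf8StringHeaderDeserializer implements KafkaHeaderDeserializer {
        @Override
        public Object deserialize(String key, byte[] value) {
            // Kafka header values may be null, so pass null through unchanged.
            return value == null ? null : new String(value, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        KafkaHeaderDeserializer deserializer = new Utf8StringHeaderDeserializer();
        byte[] raw = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(deserializer.deserialize("myHeader", raw));
    }
}
```

In a real route the implementation would implement the Camel interface instead, be bound in the registry (for example as `myDeserializer`), and be referenced as `kafkaHeaderDeserializer=#myDeserializer`.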
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index dabd4755689..6576fd7d5d7 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -24,6 +24,10 @@
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
+import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderSerializer;
+import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
+import org.apache.camel.component.kafka.serde.KafkaHeaderSerializer;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.HeaderFilterStrategyAware;
import org.apache.camel.spi.Metadata;
@@ -66,6 +70,8 @@
private int consumerStreams = 10;
@UriParam(label = "consumer", defaultValue = "1")
private int consumersCount = 1;
+ @UriParam(label = "consumer", description = "To use a custom
KafkaHeaderDeserializer to deserialize kafka headers values")
+ private KafkaHeaderDeserializer kafkaHeaderDeserializer = new
DefaultKafkaHeaderDeserializer();
//interceptor.classes
@UriParam(label = "common,monitoring")
@@ -221,6 +227,9 @@
//reconnect.backoff.ms
@UriParam(label = "producer", defaultValue = "false")
private boolean enableIdempotence;
+ @UriParam(label = "producer", description = "To use a custom
KafkaHeaderSerializer to serialize kafka headers values")
+ private KafkaHeaderSerializer kafkaHeaderSerializer = new
DefaultKafkaHeaderSerializer();
+
//reconnect.backoff.max.ms
@UriParam(label = "common", defaultValue = "1000")
private Integer reconnectBackoffMaxMs = 1000;
@@ -1614,4 +1623,30 @@ public void setHeaderFilterStrategy(HeaderFilterStrategy
headerFilterStrategy) {
this.headerFilterStrategy = headerFilterStrategy;
}
+ public KafkaHeaderDeserializer getKafkaHeaderDeserializer() {
+ return kafkaHeaderDeserializer;
+ }
+
+ /**
+ * Sets custom KafkaHeaderDeserializer for deserialization kafka headers
values to camel headers values.
+ *
+ * @param kafkaHeaderDeserializer custom kafka header deserializer to be
used
+ */
+ public void setKafkaHeaderDeserializer(final KafkaHeaderDeserializer
kafkaHeaderDeserializer) {
+ this.kafkaHeaderDeserializer = kafkaHeaderDeserializer;
+ }
+
+ public KafkaHeaderSerializer getKafkaHeaderSerializer() {
+ return kafkaHeaderSerializer;
+ }
+
+ /**
+ * Sets custom KafkaHeaderSerializer for serializing camel header values to kafka header values.
+ *
+ * @param kafkaHeaderSerializer custom kafka header serializer to be used
+ */
+ public void setKafkaHeaderSerializer(final KafkaHeaderSerializer
kafkaHeaderSerializer) {
+ this.kafkaHeaderSerializer = kafkaHeaderSerializer;
+ }
+
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index c585e054f90..5f56270c4ab 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -31,6 +31,7 @@
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.StateRepository;
@@ -283,7 +284,7 @@ protected boolean doRun() {
}
Exchange exchange =
endpoint.createKafkaExchange(record);
- propagateHeaders(record, exchange,
endpoint.getConfiguration().getHeaderFilterStrategy());
+ propagateHeaders(record, exchange,
endpoint.getConfiguration());
// if not auto commit then we have additional
information on the exchange
if (!isAutoCommitEnabled()) {
@@ -428,10 +429,12 @@ public void
onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
}
- private void propagateHeaders(ConsumerRecord<Object, Object> record,
Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
+ private void propagateHeaders(ConsumerRecord<Object, Object> record,
Exchange exchange, KafkaConfiguration kafkaConfiguration) {
+ HeaderFilterStrategy headerFilterStrategy =
kafkaConfiguration.getHeaderFilterStrategy();
+ KafkaHeaderDeserializer headerDeserializer =
kafkaConfiguration.getKafkaHeaderDeserializer();
StreamSupport.stream(record.headers().spliterator(), false)
.filter(header -> shouldBeFiltered(header, exchange,
headerFilterStrategy))
- .forEach(header -> exchange.getIn().setHeader(header.key(),
header.value()));
+ .forEach(header -> exchange.getIn().setHeader(header.key(),
headerDeserializer.deserialize(header.key(), header.value())));
}
private boolean shouldBeFiltered(Header header, Exchange exchange,
HeaderFilterStrategy headerFilterStrategy) {
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index b3e9959af64..bfa9fe2efd9 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -34,6 +34,7 @@
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
+import org.apache.camel.component.kafka.serde.KafkaHeaderSerializer;
import org.apache.camel.impl.DefaultAsyncProducer;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.util.URISupport;
@@ -43,6 +44,7 @@
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
public class KafkaProducer extends DefaultAsyncProducer {
@@ -176,8 +178,7 @@ protected void doStop() throws Exception {
final boolean hasMessageKey = messageKey != null;
// extracting headers which need to be propagated
- HeaderFilterStrategy headerFilterStrategy =
endpoint.getConfiguration().getHeaderFilterStrategy();
- List<Header> propagatedHeaders = getPropagatedHeaders(exchange,
headerFilterStrategy);
+ List<Header> propagatedHeaders = getPropagatedHeaders(exchange,
endpoint.getConfiguration());
Object msg = exchange.getIn().getBody();
@@ -233,10 +234,12 @@ public void remove() {
return Collections.singletonList(record).iterator();
}
- private List<Header> getPropagatedHeaders(Exchange exchange,
HeaderFilterStrategy headerFilterStrategy) {
+ private List<Header> getPropagatedHeaders(Exchange exchange,
KafkaConfiguration getConfiguration) {
+ HeaderFilterStrategy headerFilterStrategy =
getConfiguration.getHeaderFilterStrategy();
+ KafkaHeaderSerializer headerSerializer =
getConfiguration.getKafkaHeaderSerializer();
return exchange.getIn().getHeaders().entrySet().stream()
.filter(entry -> shouldBeFiltered(entry, exchange,
headerFilterStrategy))
- .map(this::getRecordHeader)
+ .map(entry -> getRecordHeader(entry, headerSerializer))
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
@@ -245,36 +248,14 @@ private boolean shouldBeFiltered(Map.Entry<String,
Object> entry, Exchange excha
return
!headerFilterStrategy.applyFilterToExternalHeaders(entry.getKey(),
entry.getValue(), exchange);
}
- private RecordHeader getRecordHeader(Map.Entry<String, Object> entry) {
- byte[] headerValue = getHeaderValue(entry.getValue());
+ private RecordHeader getRecordHeader(Map.Entry<String, Object> entry,
KafkaHeaderSerializer headerSerializer) {
+ byte[] headerValue = headerSerializer.serialize(entry.getKey(),
entry.getValue());
if (headerValue == null) {
return null;
}
return new RecordHeader(entry.getKey(), headerValue);
}
- private byte[] getHeaderValue(Object value) {
- if (value instanceof String) {
- return ((String) value).getBytes();
- } else if (value instanceof Long) {
- ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
- buffer.putLong((Long) value);
- return buffer.array();
- } else if (value instanceof Integer) {
- ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
- buffer.putInt((Integer) value);
- return buffer.array();
- } else if (value instanceof Double) {
- ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES);
- buffer.putDouble((Double) value);
- return buffer.array();
- } else if (value instanceof byte[]) {
- return (byte[]) value;
- }
- log.debug("Cannot propagate header value of type[{}], skipping... " +
"Supported types: String, Integer, Long, Double, byte[].", value != null ?
value.getClass() : "null");
- return null;
- }
-
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
// Camel calls this method if the endpoint isSynchronous(), as the
KafkaEndpoint creates a SynchronousDelegateProducer for it
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderDeserializer.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderDeserializer.java
new file mode 100644
index 00000000000..5d26d08734c
--- /dev/null
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderDeserializer.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.serde;
+
+public class DefaultKafkaHeaderDeserializer implements KafkaHeaderDeserializer
{
+
+ @Override
+ public Object deserialize(final String key, final byte[] value) {
+ return value;
+ }
+}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializer.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializer.java
new file mode 100644
index 00000000000..45db0e85076
--- /dev/null
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializer.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.serde;
+
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultKafkaHeaderSerializer implements KafkaHeaderSerializer {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DefaultKafkaHeaderSerializer.class);
+
+ @Override
+ public byte[] serialize(final String key, final Object value) {
+ if (value instanceof String) {
+ return ((String) value).getBytes();
+ } else if (value instanceof Long) {
+ ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+ buffer.putLong((Long) value);
+ return buffer.array();
+ } else if (value instanceof Integer) {
+ ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
+ buffer.putInt((Integer) value);
+ return buffer.array();
+ } else if (value instanceof Double) {
+ ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES);
+ buffer.putDouble((Double) value);
+ return buffer.array();
+ } else if (value instanceof Boolean) {
+ return value.toString().getBytes();
+ } else if (value instanceof byte[]) {
+ return (byte[]) value;
+ }
+ LOG.debug("Cannot propagate header value of type[{}], skipping... "
+ + "Supported types: String, Integer, Long, Double, Boolean, byte[].",
value != null ? value.getClass() : "null");
+ return null;
+ }
+}
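Because the default deserializer returns the raw `byte[]`, a consumer has to know the byte layout that `DefaultKafkaHeaderSerializer` writes. The sketch below reproduces that layout using only the JDK: `ByteBuffer` in its default big-endian order for `Integer`, `Long` and `Double`, and the textual form for `Boolean`. The class and method names (`DefaultHeaderLayoutSketch`, `encodeInt`, `decodeInt`, ...) are hypothetical helpers. One assumption to note: the serializer calls `getBytes()` with the platform charset, while this sketch decodes as UTF-8, which only matches when the platform charset is ASCII-compatible.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class DefaultHeaderLayoutSketch {

    // Encoders follow the same ByteBuffer calls as the default serializer.
    static byte[] encodeInt(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    static byte[] encodeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    static byte[] encodeDouble(double value) {
        return ByteBuffer.allocate(Double.BYTES).putDouble(value).array();
    }

    // Decoders a consumer could apply to the raw byte[] header value.
    static int decodeInt(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    static long decodeLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    static double decodeDouble(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getDouble();
    }

    // Booleans are written as the text "true" or "false".
    static boolean decodeBoolean(byte[] bytes) {
        return Boolean.parseBoolean(new String(bytes, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(encodeInt(-12)));
        System.out.println(decodeLong(encodeLong(19L)));
        System.out.println(decodeBoolean("true".getBytes(StandardCharsets.UTF_8)));
    }
}
```

The header itself does not record which type was written, so the receiving side has to know the expected type per header key; this is one reason a key-aware custom deserializer can be useful.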
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/KafkaHeaderDeserializer.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/KafkaHeaderDeserializer.java
new file mode 100644
index 00000000000..282fa95ecde
--- /dev/null
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/KafkaHeaderDeserializer.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.serde;
+
+/**
+ * Deserializer for kafka header value.
+ */
+public interface KafkaHeaderDeserializer {
+
+ Object deserialize(String key, byte[] value);
+}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/KafkaHeaderSerializer.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/KafkaHeaderSerializer.java
new file mode 100644
index 00000000000..1f626558e44
--- /dev/null
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/KafkaHeaderSerializer.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.serde;
+
+/**
+ * Serializer for kafka header value.
+ */
+public interface KafkaHeaderSerializer {
+
+ byte[] serialize(String key, Object value);
+}
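A custom serializer only needs to implement the single `serialize(String key, Object value)` method shown above. As an illustration, the sketch below writes every non-null value as its UTF-8 string form and returns `null` for null values, relying on the producer code in this PR, which skips a header when the serializer returns `null`. The interface is redeclared locally so the example runs without Camel; `StringHeaderSerializerSketch` and `ToStringHeaderSerializer` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;

public class StringHeaderSerializerSketch {

    // Local stand-in mirroring the KafkaHeaderSerializer interface from this PR.
    interface KafkaHeaderSerializer {
        byte[] serialize(String key, Object value);
    }

    // Hypothetical implementation: serializes values via their string form.
    static class ToStringHeaderSerializer implements KafkaHeaderSerializer {
        @Override
        public byte[] serialize(String key, Object value) {
            if (value == null) {
                // Returning null signals that the header should not be propagated.
                return null;
            }
            if (value instanceof byte[]) {
                // Raw bytes are passed through untouched.
                return (byte[]) value;
            }
            return String.valueOf(value).getBytes(StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        KafkaHeaderSerializer serializer = new ToStringHeaderSerializer();
        byte[] bytes = serializer.serialize("count", 42);
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
        System.out.println(serializer.serialize("missing", null) == null);
    }
}
```

Unlike the default serializer, this variant never drops a header because of an unsupported type, at the cost of losing the original Java type on the wire.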
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
index 17272a41c03..f0e32df45b4 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
@@ -24,7 +24,9 @@
import org.apache.camel.Endpoint;
import org.apache.camel.EndpointInject;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.junit.After;
@@ -71,6 +73,13 @@ public void configure() throws Exception {
};
}
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry jndi = super.createRegistry();
+ jndi.bind("myHeaderDeserializer", new MyKafkaHeaderDeserializer());
+ return jndi;
+ }
+
@Test
public void kafkaMessageIsConsumedByCamel() throws InterruptedException,
IOException {
String propagatedHeaderKey = "PropagatedCustomHeader";
@@ -158,5 +167,15 @@ public void kafkaMessageIsConsumedByCamelSeekedToEnd()
throws Exception {
}
to.assertIsSatisfied(3000);
}
+
+ @Test
+ public void headerDeserializerCouldBeOverridden() {
+ KafkaEndpoint kafkaEndpoint = context.getEndpoint(
+
"kafka:random_topic?kafkaHeaderDeserializer=#myHeaderDeserializer",
KafkaEndpoint.class);
+ assertIsInstanceOf(MyKafkaHeaderDeserializer.class,
kafkaEndpoint.getConfiguration().getKafkaHeaderDeserializer());
+ }
+
+ private static class MyKafkaHeaderDeserializer extends
DefaultKafkaHeaderDeserializer {
+ }
}
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index 4e9a69e6d06..bcbeb1bcd3e 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -35,6 +35,8 @@
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
+import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderSerializer;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultHeaderFilterStrategy;
import org.apache.camel.impl.JndiRegistry;
@@ -101,6 +103,7 @@
protected JndiRegistry createRegistry() throws Exception {
JndiRegistry jndi = super.createRegistry();
jndi.bind("myStrategy", new MyHeaderFilterStrategy());
+ jndi.bind("myHeaderSerializer", new MyKafkaHeadersSerializer());
return jndi;
}
@@ -316,13 +319,19 @@ public void propagatedHeaderIsReceivedByKafka() throws
Exception {
String propagatedBytesHeaderKey = "PROPAGATED_BYTES_HEADER";
byte[] propagatedBytesHeaderValue = new byte[]{121, 34, 34, 54, 5, 3,
54, -34};
+ String propagatedBooleanHeaderKey = "PROPAGATED_BOOLEAN_HEADER";
+ Boolean propagatedBooleanHeaderValue = Boolean.TRUE;
+
Map<String, Object> camelHeaders = new HashMap<>();
camelHeaders.put(propagatedStringHeaderKey,
propagatedStringHeaderValue);
camelHeaders.put(propagatedIntegerHeaderKey,
propagatedIntegerHeaderValue);
camelHeaders.put(propagatedLongHeaderKey, propagatedLongHeaderValue);
camelHeaders.put(propagatedDoubleHeaderKey,
propagatedDoubleHeaderValue);
camelHeaders.put(propagatedBytesHeaderKey, propagatedBytesHeaderValue);
+ camelHeaders.put(propagatedBooleanHeaderKey,
propagatedBooleanHeaderValue);
+
camelHeaders.put("CustomObjectHeader", new Object());
+ camelHeaders.put("CustomNullObjectHeader", null);
camelHeaders.put("CamelFilteredHeader", "CamelFilteredHeader value");
CountDownLatch messagesLatch = new CountDownLatch(1);
@@ -336,8 +345,8 @@ public void propagatedHeaderIsReceivedByKafka() throws
Exception {
ConsumerRecord<String, String> record = records.get(0);
Headers headers = record.headers();
assertNotNull("Kafka Headers should not be null.", headers);
- // we have 5 headers and 1 header with breadcrumbId
- assertEquals("One propagated header is expected.", 6,
headers.toArray().length);
+ // we have 6 headers and 1 header with breadcrumbId
+ assertEquals("Seven propagated headers are expected.", 7,
headers.toArray().length);
assertEquals("Propagated string value received",
propagatedStringHeaderValue,
new String(getHeaderValue(propagatedStringHeaderKey,
headers)));
assertEquals("Propagated integer value received",
propagatedIntegerHeaderValue,
@@ -347,14 +356,24 @@ public void propagatedHeaderIsReceivedByKafka() throws
Exception {
assertEquals("Propagated double value received",
propagatedDoubleHeaderValue,
new
Double(ByteBuffer.wrap(getHeaderValue(propagatedDoubleHeaderKey,
headers)).getDouble()));
assertArrayEquals("Propagated byte array value received",
propagatedBytesHeaderValue, getHeaderValue(propagatedBytesHeaderKey, headers));
+ assertEquals("Propagated boolean value received",
propagatedBooleanHeaderValue,
+ Boolean.valueOf(new
String(getHeaderValue(propagatedBooleanHeaderKey, headers))));
}
@Test
public void headerFilterStrategyCouldBeOverridden() {
- KafkaEndpoint kafkaEndpoint =
context.getEndpoint("kafka:TOPIC_PROPAGATED_HEADERS?headerFilterStrategy=#myStrategy",
KafkaEndpoint.class);
+ KafkaEndpoint kafkaEndpoint = context.getEndpoint(
+
"kafka:TOPIC_PROPAGATED_HEADERS?headerFilterStrategy=#myStrategy",
KafkaEndpoint.class);
assertIsInstanceOf(MyHeaderFilterStrategy.class,
kafkaEndpoint.getConfiguration().getHeaderFilterStrategy());
}
+ @Test
+ public void headerSerializerCouldBeOverridden() {
+ KafkaEndpoint kafkaEndpoint = context.getEndpoint(
+
"kafka:TOPIC_PROPAGATED_HEADERS?kafkaHeaderSerializer=#myHeaderSerializer",
KafkaEndpoint.class);
+ assertIsInstanceOf(MyKafkaHeadersSerializer.class,
kafkaEndpoint.getConfiguration().getKafkaHeaderSerializer());
+ }
+
private byte[] getHeaderValue(String headerKey, Headers headers) {
Header foundHeader = StreamSupport.stream(headers.spliterator(), false)
.filter(header -> header.key().equals(headerKey))
@@ -437,4 +456,7 @@ private void sendMessagesInRoute(int messages,
ProducerTemplate template, Object
private static class MyHeaderFilterStrategy extends
DefaultHeaderFilterStrategy {
}
+ private static class MyKafkaHeadersSerializer extends
DefaultKafkaHeaderSerializer {
+ }
+
}
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderDeserializerTest.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderDeserializerTest.java
new file mode 100644
index 00000000000..4e7811484ea
--- /dev/null
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderDeserializerTest.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.serde;
+
+import org.hamcrest.CoreMatchers;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertThat;
+
+public class DefaultKafkaHeaderDeserializerTest {
+
+ private KafkaHeaderDeserializer deserializer = new
DefaultKafkaHeaderDeserializer();
+
+ @Test
+ public void shouldDeserializeAsIs() {
+ byte[] value = new byte[]{0, 4, -2, 54, 126};
+
+ Object deserializedValue = deserializer.deserialize("someKey", value);
+
+ assertThat(deserializedValue, CoreMatchers.instanceOf(byte[].class));
+ assertArrayEquals(value, (byte[]) deserializedValue);
+ }
+
+}
\ No newline at end of file
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializerTest.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializerTest.java
new file mode 100644
index 00000000000..6b90cbc0df2
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializerTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.serde;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.junit.Assert.assertArrayEquals;
+
+@RunWith(Parameterized.class)
+public class DefaultKafkaHeaderSerializerTest {
+
+ private KafkaHeaderSerializer serializer = new DefaultKafkaHeaderSerializer();
+
+ private Object value;
+ private byte[] expectedResult;
+
+ public DefaultKafkaHeaderSerializerTest(Object value, byte[] expectedResult) {
+ this.value = value;
+ this.expectedResult = expectedResult;
+ }
+
+ @Test
+ public void serialize() {
+ byte[] result = serializer.serialize("someKey", value);
+
+ assertArrayEquals(expectedResult, result);
+ }
+
+ @Parameterized.Parameters
+ public static Collection primeNumbers() {
+ return Arrays.asList(new Object[][]{
+ {Boolean.TRUE, "true".getBytes()}, //boolean
+ {-12, new byte[]{-1, -1, -1, -12}}, //integer
+ {19L, new byte[]{0, 0, 0, 0, 0, 0, 0, 19}}, //long
+ {22.0D, new byte[]{64, 54, 0, 0, 0, 0, 0, 0}}, //double
+ {"someValue", "someValue".getBytes()}, //string
+ {new byte[]{0, 2, -43}, new byte[]{0, 2, -43}}, //byte[]
+ {null, null}, //null
+ {new Object(), null} //unknown type
+ });
+ }
+}
\ No newline at end of file
diff --git a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
index 5c466eec305..c6949a5279b 100644
--- a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
@@ -18,6 +18,8 @@
import java.util.concurrent.ExecutorService;
import javax.annotation.Generated;
+import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
+import org.apache.camel.component.kafka.serde.KafkaHeaderSerializer;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.StateRepository;
import org.apache.camel.spring.boot.ComponentConfigurationPropertiesCommon;
@@ -756,6 +758,22 @@ public void setResolvePropertyPlaceholders(
* Camel message.
*/
private HeaderFilterStrategy headerFilterStrategy;
+ /**
+ * Sets custom KafkaHeaderDeserializer for deserializing kafka header
+ * values to camel header values.
+ *
+ * @param kafkaHeaderDeserializer
+ * custom kafka header deserializer to be used
+ */
+ private KafkaHeaderDeserializer kafkaHeaderDeserializer;
+ /**
+ * Sets custom KafkaHeaderSerializer for serializing camel header
+ * values to kafka header values.
+ *
+ * @param kafkaHeaderSerializer
+ * custom kafka header serializer to be used
+ */
+ private KafkaHeaderSerializer kafkaHeaderSerializer;
public Boolean getTopicIsPattern() {
return topicIsPattern;
@@ -1466,5 +1484,23 @@ public void setHeaderFilterStrategy(
HeaderFilterStrategy headerFilterStrategy) {
this.headerFilterStrategy = headerFilterStrategy;
}
+
+ public KafkaHeaderDeserializer getKafkaHeaderDeserializer() {
+ return kafkaHeaderDeserializer;
+ }
+
+ public void setKafkaHeaderDeserializer(
+ KafkaHeaderDeserializer kafkaHeaderDeserializer) {
+ this.kafkaHeaderDeserializer = kafkaHeaderDeserializer;
+ }
+
+ public KafkaHeaderSerializer getKafkaHeaderSerializer() {
+ return kafkaHeaderSerializer;
+ }
+
+ public void setKafkaHeaderSerializer(
+ KafkaHeaderSerializer kafkaHeaderSerializer) {
+ this.kafkaHeaderSerializer = kafkaHeaderSerializer;
+ }
}
}
\ No newline at end of file
----------------------------------------------------------------
> Allow to override serializing and deserializing default mechanism for kafka
> headers
> -----------------------------------------------------------------------------------
>
> Key: CAMEL-12651
> URL: https://issues.apache.org/jira/browse/CAMEL-12651
> Project: Camel
> Issue Type: New Feature
> Components: camel-kafka
> Reporter: Taras Danylchuk
> Assignee: Andrea Cosentino
> Priority: Major
> Fix For: 2.23.0
>
>
> Kafka headers propagation is available since 2.22.0, but the default mechanism for
> serializing and deserializing is not enough in some corner cases.
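
One such corner case can be read off the tests in the diff above: the default deserializer hands header values back as raw byte[], and the default serializer returns null for types it does not recognize. The following is a minimal sketch of how custom implementations might look, not the code from the PR. The two interfaces are local stand-ins that assume only the method shapes the tests call (serialize(String, Object) and deserialize(String, byte[])); the class names HeaderSerdeSketch, StringHeaderDeserializer and TextHeaderSerializer are hypothetical.

```java
import java.nio.charset.StandardCharsets;

public class HeaderSerdeSketch {

    // Local stand-ins for the interfaces added by the PR. Only the method
    // shapes exercised by the tests in the diff are assumed here.
    interface KafkaHeaderSerializer {
        byte[] serialize(String key, Object value);
    }

    interface KafkaHeaderDeserializer {
        Object deserialize(String key, byte[] value);
    }

    // Hypothetical override: decode every header value as a UTF-8 string
    // instead of passing the raw bytes through unchanged.
    static class StringHeaderDeserializer implements KafkaHeaderDeserializer {
        @Override
        public Object deserialize(String key, byte[] value) {
            return value == null ? null : new String(value, StandardCharsets.UTF_8);
        }
    }

    // Hypothetical override: write every non-null value as its text form,
    // so that types unknown to the default serializer still produce bytes.
    static class TextHeaderSerializer implements KafkaHeaderSerializer {
        @Override
        public byte[] serialize(String key, Object value) {
            return value == null ? null : String.valueOf(value).getBytes(StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        KafkaHeaderSerializer serializer = new TextHeaderSerializer();
        KafkaHeaderDeserializer deserializer = new StringHeaderDeserializer();

        // Round trip: the integer is written as text and read back as a String.
        byte[] raw = serializer.serialize("someKey", -12);
        System.out.println(deserializer.deserialize("someKey", raw));
    }
}
```

In a real route the implementations would be written against the interfaces in org.apache.camel.component.kafka.serde and supplied through the kafkaHeaderSerializer and kafkaHeaderDeserializer options that the diff adds.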