This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git
commit 7c959e1cb8c5dba287098f936c801b1f26d340dd Author: Christoph Deppisch <[email protected]> AuthorDate: Mon Mar 21 20:07:28 2022 +0100 chore: Kafka source uses local bean for header deserialization --- kamelets/kafka-source.kamelet.yaml | 14 ++++++++------ .../serialization/kafka/KafkaHeaderDeserializer.java | 15 +++++++++++---- .../serialization/kafka/KafkaHeaderDeserializerTest.java | 12 ++++++++---- .../src/main/resources/kamelets/kafka-source.kamelet.yaml | 14 ++++++++------ 4 files changed, 35 insertions(+), 20 deletions(-) diff --git a/kamelets/kafka-source.kamelet.yaml b/kamelets/kafka-source.kamelet.yaml index 6bce42c..d7518b2 100644 --- a/kamelets/kafka-source.kamelet.yaml +++ b/kamelets/kafka-source.kamelet.yaml @@ -130,6 +130,12 @@ spec: - "camel:kafka" - "camel:kamelet" template: + beans: + - name: kafkaHeaderDeserializer + type: "#class:org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer" + property: + - key: enabled + value: '{{deserializeHeaders}}' from: uri: "kafka:{{topic}}" parameters: @@ -143,10 +149,6 @@ spec: autoOffsetReset: "{{autoOffsetReset}}" groupId: "{{?consumerGroup}}" steps: - - set-property: - name: deserializeHeaders - constant: "{{deserializeHeaders}}" - - bean: "org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer" - - remove-property: - name: deserializeHeaders + - process: + ref: "{{kafkaHeaderDeserializer}}" - to: "kamelet:sink" diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializer.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializer.java index 7cab1ee..3fc24cc 100644 --- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializer.java +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializer.java @@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets; import java.util.Map; import org.apache.camel.Exchange; -import org.apache.camel.ExchangeProperty; +import org.apache.camel.Processor; import org.apache.camel.TypeConverter; import org.apache.camel.component.kafka.KafkaConstants; import org.apache.camel.support.SimpleTypeConverter; @@ -31,12 +31,15 @@ import org.apache.camel.support.SimpleTypeConverter; * Uses given type converter implementation set on the Camel context to convert values. If no type converter is set * the deserializer uses its own fallback conversion implementation. */ -public class KafkaHeaderDeserializer { +public class KafkaHeaderDeserializer implements Processor { + + boolean enabled = false; private final SimpleTypeConverter defaultTypeConverter = new SimpleTypeConverter(true, KafkaHeaderDeserializer::convert); - public void process(@ExchangeProperty("deserializeHeaders") boolean deserializeHeaders, Exchange exchange) throws Exception { - if (!deserializeHeaders) { + @Override + public void process(Exchange exchange) throws Exception { + if (!enabled) { return; } @@ -86,4 +89,8 @@ public class KafkaHeaderDeserializer { private boolean shouldDeserialize(Map.Entry<String, Object> entry) { return !entry.getKey().equals(KafkaConstants.HEADERS) && !entry.getKey().equals(KafkaConstants.MANUAL_COMMIT); } + + public void setEnabled(String enabled) { + this.enabled = Boolean.parseBoolean(enabled); + } } diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializerTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializerTest.java index d5d92f5..2d7e3bd 100644 --- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializerTest.java +++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializerTest.java @@ -48,7 +48,8 @@ class KafkaHeaderDeserializerTest { exchange.getMessage().setHeader("fooNull", null); exchange.getMessage().setHeader("number", 1L); - processor.process(true, exchange); + processor.enabled = true; + processor.process(exchange); Assertions.assertTrue(exchange.getMessage().hasHeaders()); Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo")); @@ -68,7 +69,8 @@ class KafkaHeaderDeserializerTest { exchange.getMessage().setHeader("fooBytes", "barBytes".getBytes(StandardCharsets.UTF_8)); exchange.getMessage().setHeader("fooNull", null); - processor.process(true, exchange); + processor.enabled = true; + processor.process(exchange); Assertions.assertTrue(exchange.getMessage().hasHeaders()); Assertions.assertEquals("converted", exchange.getMessage().getHeader("foo")); @@ -84,7 +86,8 @@ class KafkaHeaderDeserializerTest { exchange.getMessage().setHeader("foo", "bar"); exchange.getMessage().setHeader("fooBytes", "barBytes".getBytes(StandardCharsets.UTF_8)); - processor.process(true, exchange); + processor.enabled = true; + processor.process(exchange); Assertions.assertTrue(exchange.getMessage().hasHeaders()); Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo")); @@ -98,7 +101,8 @@ class KafkaHeaderDeserializerTest { exchange.getMessage().setHeader("foo", "bar"); exchange.getMessage().setHeader("fooBytes", "barBytes".getBytes(StandardCharsets.UTF_8)); - processor.process(false, exchange); + processor.enabled = false; + processor.process(exchange); Assertions.assertTrue(exchange.getMessage().hasHeaders()); Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo")); diff --git a/library/camel-kamelets/src/main/resources/kamelets/kafka-source.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/kafka-source.kamelet.yaml index 6bce42c..d7518b2 100644 --- a/library/camel-kamelets/src/main/resources/kamelets/kafka-source.kamelet.yaml +++ b/library/camel-kamelets/src/main/resources/kamelets/kafka-source.kamelet.yaml @@ -130,6 +130,12 @@ spec: - "camel:kafka" - "camel:kamelet" template: + beans: + - name: kafkaHeaderDeserializer + type: "#class:org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer" + property: + - key: enabled + value: '{{deserializeHeaders}}' from: uri: "kafka:{{topic}}" parameters: @@ -143,10 +149,6 @@ spec: autoOffsetReset: "{{autoOffsetReset}}" groupId: "{{?consumerGroup}}" steps: - - set-property: - name: deserializeHeaders - constant: "{{deserializeHeaders}}" - - bean: "org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer" - - remove-property: - name: deserializeHeaders + - process: + ref: "{{kafkaHeaderDeserializer}}" - to: "kamelet:sink"
