This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit a00e82de71e9980f9d354666eec3964a54c523a3 Author: Claus Ibsen <[email protected]> AuthorDate: Tue Oct 1 12:00:20 2019 +0200 CAMEL-13878: Message is forwarded to the wrong Kafka Topic. --- .../camel-kafka/src/main/docs/kafka-component.adoc | 7 +++++++ .../camel/component/kafka/KafkaConstants.java | 1 + .../camel/component/kafka/KafkaProducer.java | 7 ++++++- .../camel/component/kafka/KafkaProducerTest.java | 24 ++++++++++++++++++++++ 4 files changed, 38 insertions(+), 1 deletion(-) diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc index 1e9968d..1d2182e 100644 --- a/components/camel-kafka/src/main/docs/kafka-component.adoc +++ b/components/camel-kafka/src/main/docs/kafka-component.adoc @@ -333,9 +333,16 @@ Before sending a message to Kafka you can configure the following headers. | Header constant | Header value | Type | Description | KafkaConstants.KEY | "kafka.KEY" | Object | *Required* The key of the message in order to ensure that all related message goes in the same partition | KafkaConstants.TOPIC | "kafka.TOPIC" | String | The topic to which send the message (only read if the `bridgeEndpoint` endpoint parameter is `true`) +| KafkaConstants.OVERRIDE_TOPIC | "kafka.OVERRIDE_TOPIC" | String | The topic to which to send the message (overrides and takes precedence); the header is not preserved. | KafkaConstants.PARTITION_KEY | "kafka.PARTITION_KEY" | Integer | Explicitly specify the partition (only used if the `KafkaConstants.KEY` header is defined) |=== +If you want to send a message to a dynamic topic then favour using `KafkaConstants.OVERRIDE_TOPIC`, as it is used as a one-time header +that is not sent along with the message, because it is removed in the producer.
If you are using `KafkaConstants.TOPIC` then the header +is propagated in the producer, which means that if the consumer is also Camel and you want to route to a 3rd topic then +this header may interfere and route to a previous topic, which is not your intention. Therefore you need to turn on the bridgeEndpoint option; +or better just use the `KafkaConstants.OVERRIDE_TOPIC` header instead. + After the message is sent to Kafka, the following headers are available [width="100%",cols="2m,2m,1m,5",options="header"] |=== diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java index 0d1468d..ecda3bf 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java @@ -22,6 +22,7 @@ public final class KafkaConstants { public static final String PARTITION = "kafka.PARTITION"; public static final String KEY = "kafka.KEY"; public static final String TOPIC = "kafka.TOPIC"; + public static final String OVERRIDE_TOPIC = "kafka.OVERRIDE_TOPIC"; public static final String OFFSET = "kafka.OFFSET"; public static final String HEADERS = "kafka.HEADERS"; public static final String LAST_RECORD_BEFORE_COMMIT = "kafka.LAST_RECORD_BEFORE_COMMIT"; diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index c43d824..ea3c8a6 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -142,7 +142,12 @@ public class KafkaProducer extends DefaultAsyncProducer { protected Iterator<ProducerRecord> createRecorder(Exchange exchange) throws Exception { String 
topic = endpoint.getConfiguration().getTopic(); - if (!endpoint.getConfiguration().isBridgeEndpoint()) { + // must remove header so its not propagated + Object overrideTopic = exchange.getIn().removeHeader(KafkaConstants.OVERRIDE_TOPIC); + if (overrideTopic != null) { + log.debug("Using override topic: {}", overrideTopic); + topic = overrideTopic.toString(); + } else if (!endpoint.getConfiguration().isBridgeEndpoint()) { String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC, String.class); boolean allowHeader = true; diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java index a67aaf9..e3120fa 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java @@ -40,6 +40,8 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -184,6 +186,28 @@ public class KafkaProducerTest { producer.process(exchange); + // the header is preserved + assertNotNull(in.getHeader(KafkaConstants.TOPIC)); + + verifySendMessage(4, "anotherTopic", "someKey"); + assertRecordMetadataExists(); + } + + @Test + public void processSendsMessageWithOverrideTopicHeaderAndEndPoint() throws Exception { + endpoint.getConfiguration().setTopic("sometopic"); + Mockito.when(exchange.getIn()).thenReturn(in); + Mockito.when(exchange.getOut()).thenReturn(out); + + in.setHeader(KafkaConstants.PARTITION_KEY, 4); + in.setHeader(KafkaConstants.OVERRIDE_TOPIC, "anotherTopic"); + in.setHeader(KafkaConstants.KEY, "someKey"); + + 
producer.process(exchange); + + // the header is now removed + assertNull(in.getHeader(KafkaConstants.OVERRIDE_TOPIC)); + verifySendMessage(4, "anotherTopic", "someKey"); assertRecordMetadataExists(); }
