This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 34259dcfa79a5e60fad86ee0f32be977d499b2da Author: Claus Ibsen <[email protected]> AuthorDate: Tue Oct 1 12:20:19 2019 +0200 CAMEL-13878: Message is forwarded to the wrong Kafka Topic. --- .../camel-kafka/src/main/docs/kafka-component.adoc | 8 ++--- .../camel/component/kafka/KafkaConfiguration.java | 36 ------------------- .../camel/component/kafka/KafkaProducer.java | 21 ----------- .../camel/component/kafka/KafkaProducerTest.java | 42 ++-------------------- 4 files changed, 5 insertions(+), 102 deletions(-) diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc index 1d2182e..c459720 100644 --- a/components/camel-kafka/src/main/docs/kafka-component.adoc +++ b/components/camel-kafka/src/main/docs/kafka-component.adoc @@ -332,16 +332,12 @@ Before sending a message to Kafka you can configure the following headers. |=== | Header constant | Header value | Type | Description | KafkaConstants.KEY | "kafka.KEY" | Object | *Required* The key of the message in order to ensure that all related message goes in the same partition -| KafkaConstants.TOPIC | "kafka.TOPIC" | String | The topic to which send the message (only read if the `bridgeEndpoint` endpoint parameter is `true`) | KafkaConstants.OVERRIDE_TOPIC | "kafka.OVERRIDE_TOPIC" | String | The topic to which send the message (override and takes precedence), and the header is not preserved. | | KafkaConstants.PARTITION_KEY | "kafka.PARTITION_KEY" | Integer | Explicitly specify the partition (only used if the `KafkaConstants.KEY` header is defined) |=== -If you want to send a message to a dynamic topic then favour using `KafkaConstants.OVERRIDE_TOPIC` as its used as a one-time header -that are not send along the message, as its removed in the producer. If you are using `KafkaConstants.TOPIC` then the header -is propagated in the producer, which means that if the consumer is also Camel and you want to route to a 3rd topic then -this header may interfere and route to a previous topic, which is not your intention. Therefore you need to turn on the bridgeEndpoint option; -or better just use the `KafkaConstants.OVERRIDE_TOPIC` header instead. +If you want to send a message to a dynamic topic then use `KafkaConstants.OVERRIDE_TOPIC` as its used as a one-time header +that are not send along the message, as its removed in the producer. After the message is sent to Kafka, the following headers are available [width="100%",cols="2m,2m,1m,5",options="header"] diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index 71564fe..02db043 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -137,12 +137,6 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware @UriParam(label = "consumer") private StateRepository<String, String> offsetRepository; - // Producer Camel specific configuration properties - @UriParam(label = "producer") - private boolean bridgeEndpoint; - @UriParam(label = "producer", defaultValue = "true") - private boolean circularTopicDetection = true; - // Producer configuration properties @UriParam(label = "producer", defaultValue = KafkaConstants.KAFKA_DEFAULT_PARTITIONER) private String partitioner = KafkaConstants.KAFKA_DEFAULT_PARTITIONER; @@ -556,36 +550,6 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware this.groupId = groupId; } - public boolean isBridgeEndpoint() { - return bridgeEndpoint; - } - - /** - * If the option is true, then KafkaProducer will ignore the - * KafkaConstants.TOPIC header setting of the inbound message. - */ - public void setBridgeEndpoint(boolean bridgeEndpoint) { - this.bridgeEndpoint = bridgeEndpoint; - } - - public boolean isCircularTopicDetection() { - return circularTopicDetection; - } - - /** - * If the option is true, then KafkaProducer will detect if the message is - * attempted to be sent back to the same topic it may come from, if the - * message was original from a kafka consumer. If the KafkaConstants.TOPIC - * header is the same as the original kafka consumer topic, then the header - * setting is ignored, and the topic of the producer endpoint is used. In - * other words this avoids sending the same message back to where it came - * from. This option is not in use if the option bridgeEndpoint is set to - * true. - */ - public void setCircularTopicDetection(boolean circularTopicDetection) { - this.circularTopicDetection = circularTopicDetection; - } - public String getPartitioner() { return partitioner; } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index ea3c8a6..04bbb32 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -147,27 +147,6 @@ public class KafkaProducer extends DefaultAsyncProducer { if (overrideTopic != null) { log.debug("Using override topic: {}", overrideTopic); topic = overrideTopic.toString(); - } else if (!endpoint.getConfiguration().isBridgeEndpoint()) { - String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC, String.class); - boolean allowHeader = true; - - // when we do not bridge then detect if we try to send back to ourselves - // which we most likely do not want to do - if (headerTopic != null && endpoint.getConfiguration().isCircularTopicDetection()) { - Endpoint from = exchange.getFromEndpoint(); - if (from instanceof KafkaEndpoint) { - String fromTopic = ((KafkaEndpoint) from).getConfiguration().getTopic(); - allowHeader = !headerTopic.equals(fromTopic); - if (!allowHeader) { - log.debug("Circular topic detected from message header." - + " Cannot send to same topic as the message comes from: {}" - + ". Will use endpoint configured topic: {}", from, topic); - } - } - } - if (allowHeader && headerTopic != null) { - topic = headerTopic; - } } if (topic == null) { diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java index e3120fa..93512fc 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java @@ -170,7 +170,7 @@ public class KafkaProducerTest { producer.process(exchange); - verifySendMessage("anotherTopic"); + verifySendMessage("sometopic"); assertRecordMetadataExists(); } @@ -189,7 +189,7 @@ public class KafkaProducerTest { // the header is preserved assertNotNull(in.getHeader(KafkaConstants.TOPIC)); - verifySendMessage(4, "anotherTopic", "someKey"); + verifySendMessage(4, "sometopic", "someKey"); assertRecordMetadataExists(); } @@ -277,9 +277,8 @@ public class KafkaProducerTest { } @Test - public void processSendMessageWithBridgeEndpoint() throws Exception { + public void processSendMessageWithTopicHeader() throws Exception { endpoint.getConfiguration().setTopic("someTopic"); - endpoint.getConfiguration().setBridgeEndpoint(true); Mockito.when(exchange.getIn()).thenReturn(in); Mockito.when(exchange.getOut()).thenReturn(out); in.setHeader(KafkaConstants.TOPIC, "anotherTopic"); @@ -292,41 +291,6 @@ public class KafkaProducerTest { assertRecordMetadataExists(); } - @Test - public void processSendMessageWithCircularDetected() throws Exception { - endpoint.getConfiguration().setTopic("sometopic"); - endpoint.getConfiguration().setCircularTopicDetection(true); - Mockito.when(exchange.getIn()).thenReturn(in); - Mockito.when(exchange.getOut()).thenReturn(out); - Mockito.when(exchange.getFromEndpoint()).thenReturn(fromEndpoint); - // this is the from topic that are from the fromEndpoint - in.setHeader(KafkaConstants.TOPIC, "fromtopic"); - in.setHeader(KafkaConstants.KEY, "somekey"); - - producer.process(exchange); - - verifySendMessage("sometopic", "somekey"); - assertRecordMetadataExists(); - } - - @Test - public void processSendMessageWithNoCircularDetected() throws Exception { - endpoint.getConfiguration().setTopic("sometopic"); - endpoint.getConfiguration().setCircularTopicDetection(false); - Mockito.when(exchange.getIn()).thenReturn(in); - Mockito.when(exchange.getOut()).thenReturn(out); - Mockito.when(exchange.getFromEndpoint()).thenReturn(fromEndpoint); - // this is the from topic that are from the fromEndpoint - in.setHeader(KafkaConstants.TOPIC, "fromtopic"); - in.setHeader(KafkaConstants.KEY, "somekey"); - - producer.process(exchange); - - // will end up sending back to itself at fromtopic - verifySendMessage("fromtopic", "somekey"); - assertRecordMetadataExists(); - } - @Test // Message and Topic Name alone public void processSendsMessageWithMessageTopicName() throws Exception { endpoint.getConfiguration().setTopic("someTopic");
