Repository: camel
Updated Branches:
  refs/heads/master bab5b27bc -> 6243402b2


CAMEL-1193: Make kafka easier to use when routing between topics to avoid the 
header topic causing Camel to send the message to itself instead of the 
endpoint topic name as users would expect.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1d164d54
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1d164d54
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1d164d54

Branch: refs/heads/master
Commit: 1d164d54675069fb672be606e99c2c7944cd8f23
Parents: bab5b27
Author: Claus Ibsen <[email protected]>
Authored: Mon Apr 24 13:03:56 2017 +0200
Committer: Claus Ibsen <[email protected]>
Committed: Mon Apr 24 13:14:41 2017 +0200

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc          |  3 +-
 .../camel/component/kafka/KafkaEndpoint.java    | 18 +++++++-
 .../camel/component/kafka/KafkaProducer.java    | 24 ++++++++++-
 .../component/kafka/KafkaProducerTest.java      | 44 ++++++++++++++++++--
 4 files changed, 83 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1d164d54/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc 
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 4604e9c..dceca6f 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -66,7 +66,7 @@ with the following path and query parameters:
 | **topic** | *Required* Name of the topic to use. On the consumer you can use 
comma to separate multiple topics. A producer can only send a message to a 
single topic. |  | String
 |=======================================================================
 
-#### Query Parameters (82 parameters):
+#### Query Parameters (83 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |=======================================================================
@@ -100,6 +100,7 @@ with the following path and query parameters:
 | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer 
creates an exchange. |  | ExchangePattern
 | **bridgeEndpoint** (producer) | If the option is true then KafkaProducer 
will ignore the KafkaConstants.TOPIC header setting of the inbound message. | 
false | boolean
 | **bufferMemorySize** (producer) | The total bytes of memory the producer can 
use to buffer records waiting to be sent to the server. If records are sent 
faster than they can be delivered to the server the producer will either block 
or throw an exception based on the preference specified by 
block.on.buffer.full.This setting should correspond roughly to the total memory 
the producer will use but is not a hard bound since not all memory the producer 
uses is used for buffering. Some additional memory will be used for compression 
(if compression is enabled) as well as for maintaining in-flight requests. | 
33554432 | Integer
+| **circularTopicDetection** (producer) | If the option is true then 
KafkaProducer will detect if the message is attempted to be sent back to the 
same topic it may come from if the message was originally from a kafka consumer. 
If the KafkaConstants.TOPIC header is the same as the original kafka consumer 
topic then the header setting is ignored and the topic of the producer endpoint 
is used. In other words this avoids sending the same message back to where it 
came from. This option is not in use if the option bridgeEndpoint is set to 
true. | true | boolean
 | **compressionCodec** (producer) | This parameter allows you to specify the 
compression codec for all data generated by this producer. Valid values are 
none gzip and snappy. | none | String
 | **connectionMaxIdleMs** (producer) | Close idle connections after the number 
of milliseconds specified by this config. | 540000 | Integer
 | **key** (producer) | The record key (or null if no key is specified). If 
this option has been configured then it take precedence over header link 
KafkaConstantsKEY |  | String

http://git-wip-us.apache.org/repos/asf/camel/blob/1d164d54/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 46bf844..b4c86ef 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -20,7 +20,6 @@ import java.lang.reflect.Field;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 
-import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -53,6 +52,8 @@ public class KafkaEndpoint extends DefaultEndpoint implements 
MultipleConsumersS
     private KafkaConfiguration configuration = new KafkaConfiguration();
     @UriParam(label = "producer")
     private boolean bridgeEndpoint;
+    @UriParam(label = "producer", defaultValue = "true")
+    private boolean circularTopicDetection = true;
 
     public KafkaEndpoint() {
     }
@@ -196,4 +197,19 @@ public class KafkaEndpoint extends DefaultEndpoint 
implements MultipleConsumersS
     public void setBridgeEndpoint(boolean bridgeEndpoint) {
         this.bridgeEndpoint = bridgeEndpoint;
     }
+
+    public boolean isCircularTopicDetection() {
+        return circularTopicDetection;
+    }
+
+    /**
+     * If the option is true, then KafkaProducer will detect if the message is 
attempted to be sent back to the same topic
+     * it may come from, if the message was originally from a kafka consumer. If 
the KafkaConstants.TOPIC header is the
+     * same as the original kafka consumer topic, then the header setting is 
ignored, and the topic of the producer
+     * endpoint is used. In other words this avoids sending the same message 
back to where it came from.
+     * This option is not in use if the option bridgeEndpoint is set to true.
+     */
+    public void setCircularTopicDetection(boolean circularTopicDetection) {
+        this.circularTopicDetection = circularTopicDetection;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/1d164d54/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 65fd9d2..01d29b5 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelException;
 import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultAsyncProducer;
 import org.apache.kafka.clients.producer.Callback;
@@ -120,9 +121,30 @@ public class KafkaProducer extends DefaultAsyncProducer {
     @SuppressWarnings("unchecked")
     protected Iterator<ProducerRecord> createRecorder(Exchange exchange) 
throws CamelException {
         String topic = endpoint.getConfiguration().getTopic();
+
         if (!endpoint.isBridgeEndpoint()) {
-            topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, topic, 
String.class);
+            String headerTopic = 
exchange.getIn().getHeader(KafkaConstants.TOPIC, String.class);
+            boolean allowHeader = true;
+
+            // when we do not bridge then detect if we try to send back to 
ourselves
+            // which we most likely do not want to do
+            if (headerTopic != null && endpoint.isCircularTopicDetection()) {
+                Endpoint from = exchange.getFromEndpoint();
+                if (from instanceof KafkaEndpoint) {
+                    String fromTopic = ((KafkaEndpoint) 
from).getConfiguration().getTopic();
+                    allowHeader = !headerTopic.equals(fromTopic);
+                    if (!allowHeader) {
+                        log.debug("Circular topic detected from message 
header."
+                            + " Cannot send to same topic as the message comes 
from: {}"
+                            + ". Will use endpoint configured topic: {}", 
from, topic);
+                    }
+                }
+            }
+            if (allowHeader && headerTopic != null) {
+                topic = headerTopic;
+            }
         }
+
         if (topic == null) {
             throw new CamelExchangeException("No topic key set", exchange);
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/1d164d54/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index d30e737..9143017 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -47,6 +47,7 @@ public class KafkaProducerTest {
 
     private KafkaProducer producer;
     private KafkaEndpoint endpoint;
+    private KafkaEndpoint fromEndpoint;
 
     private TypeConverter converter = Mockito.mock(TypeConverter.class);
     private CamelContext context = Mockito.mock(CamelContext.class);
@@ -63,6 +64,8 @@ public class KafkaProducerTest {
         endpoint = kafka.createEndpoint("kafka:sometopic", "sometopic", new 
HashMap());
         producer = new KafkaProducer(endpoint);
 
+        fromEndpoint = kafka.createEndpoint("kafka:fromtopic", "fromtopic", 
new HashMap());
+
         RecordMetadata rm = new RecordMetadata(null, 1, 1);
         Future future = Mockito.mock(Future.class);
         Mockito.when(future.get()).thenReturn(rm);
@@ -204,7 +207,7 @@ public class KafkaProducerTest {
     }
 
     @Test
-    public void processSendsMesssageWithPartitionKeyHeader() throws Exception {
+    public void processSendsMessageWithPartitionKeyHeader() throws Exception {
         endpoint.getConfiguration().setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
         Mockito.when(exchange.getOut()).thenReturn(out);
@@ -218,7 +221,7 @@ public class KafkaProducerTest {
     }
 
     @Test
-    public void processSendsMesssageWithMessageKeyHeader() throws Exception {
+    public void processSendsMessageWithMessageKeyHeader() throws Exception {
         endpoint.getConfiguration().setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
         Mockito.when(exchange.getOut()).thenReturn(out);
@@ -246,8 +249,43 @@ public class KafkaProducerTest {
         assertRecordMetadataExists();
     }
 
+    @Test
+    public void processSendMessageWithCircularDetected() throws Exception {
+        endpoint.getConfiguration().setTopic("sometopic");
+        endpoint.setCircularTopicDetection(true); // enable by default
+        Mockito.when(exchange.getIn()).thenReturn(in);
+        Mockito.when(exchange.getOut()).thenReturn(out);
+        Mockito.when(exchange.getFromEndpoint()).thenReturn(fromEndpoint);
+        // this is the topic that the fromEndpoint consumes from
+        in.setHeader(KafkaConstants.TOPIC, "fromtopic");
+        in.setHeader(KafkaConstants.KEY, "somekey");
+
+        producer.process(exchange);
+
+        verifySendMessage("sometopic", "somekey");
+        assertRecordMetadataExists();
+    }
+
+    @Test
+    public void processSendMessageWithNoCircularDetected() throws Exception {
+        endpoint.getConfiguration().setTopic("sometopic");
+        endpoint.setCircularTopicDetection(false); // turn off the detection (it is enabled by default)
+        Mockito.when(exchange.getIn()).thenReturn(in);
+        Mockito.when(exchange.getOut()).thenReturn(out);
+        Mockito.when(exchange.getFromEndpoint()).thenReturn(fromEndpoint);
+        // this is the topic that the fromEndpoint consumes from
+        in.setHeader(KafkaConstants.TOPIC, "fromtopic");
+        in.setHeader(KafkaConstants.KEY, "somekey");
+
+        producer.process(exchange);
+
+        // will end up sending back to itself at fromtopic
+        verifySendMessage("fromtopic", "somekey");
+        assertRecordMetadataExists();
+    }
+
     @Test // Message and Topic Name alone
-    public void processSendsMesssageWithMessageTopicName() throws Exception {
+    public void processSendsMessageWithMessageTopicName() throws Exception {
         endpoint.getConfiguration().setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
         Mockito.when(exchange.getOut()).thenReturn(out);

Reply via email to