Repository: camel
Updated Branches:
  refs/heads/camel-2.16.x d155bc500 -> 4881fbd0f


CAMEL-9511 - Setting Kafka's endpoint configuration by reference does not merge params


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4881fbd0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4881fbd0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4881fbd0

Branch: refs/heads/camel-2.16.x
Commit: 4881fbd0ffd8bf0f33d753f7d0eca1ce9e3f70e7
Parents: d155bc5
Author: Akitoshi Yoshida <a...@apache.org>
Authored: Tue Feb 2 14:14:14 2016 +0100
Committer: Akitoshi Yoshida <a...@apache.org>
Committed: Wed Feb 3 10:46:28 2016 +0100

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaComponent.java   | 16 +++++
 .../component/kafka/KafkaConfiguration.java     | 15 ++++-
 .../component/kafka/KafkaComponentTest.java     | 62 ++++++++++++++++++++
 3 files changed, 92 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/4881fbd0/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index b659e73..c9d4c2a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -20,6 +20,8 @@ import java.util.Map;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.impl.UriEndpointComponent;
+import org.apache.camel.util.CamelContextHelper;
+import org.apache.camel.util.EndpointHelper;
 
 public class KafkaComponent extends UriEndpointComponent {
 
@@ -38,6 +40,20 @@ public class KafkaComponent extends UriEndpointComponent {
 
         KafkaEndpoint endpoint = new KafkaEndpoint(uri, this);
         String brokers = remaining.split("\\?")[0];
+        Object confparam = params.get("configuration");
+        if (confparam != null) {
+            // need a special handling to resolve the reference before other parameters are set/merged into the config
+            KafkaConfiguration confobj = null;
+            if (confparam instanceof KafkaConfiguration) {
+                confobj = (KafkaConfiguration)confparam;
+            } else if (confparam instanceof String && EndpointHelper.isReferenceParameter((String)confparam)) {
+                confobj = (KafkaConfiguration)CamelContextHelper.lookup(getCamelContext(), ((String)confparam).substring(1));
+            }
+            if (confobj != null) {
+                endpoint.setConfiguration(confobj.copy());
+            }
+            params.remove("configuration");
+        }
         if (brokers != null) {
             endpoint.getConfiguration().setBrokers(brokers);
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/4881fbd0/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index b03970c..894df0c 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -19,13 +19,14 @@ package org.apache.camel.component.kafka;
 import java.util.Properties;
 
 import kafka.producer.DefaultPartitioner;
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
 import org.apache.camel.spi.UriPath;
 
 @UriParams
-public class KafkaConfiguration {
+public class KafkaConfiguration implements Cloneable {
 
     @UriParam
     private String zookeeperConnect;
@@ -774,4 +775,16 @@ public class KafkaConfiguration {
     public void setDualCommitEnabled(Boolean dualCommitEnabled) {
         this.dualCommitEnabled = dualCommitEnabled;
     }
+
+    /**
+     * Returns a copy of this configuration
+     */
+    public KafkaConfiguration copy() {
+        try {
+            return (KafkaConfiguration)clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeCamelException(e);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/4881fbd0/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
index 15cef7a..eb6dd09 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
@@ -20,6 +20,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.spi.Registry;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -76,4 +77,65 @@ public class KafkaComponentTest {
         assertEquals(3, endpoint.getConsumerStreams());
         assertEquals("com.class.Party", endpoint.getPartitioner());
     }
+
+    @Test
+    public void testPropertiesConfigrationMerge() throws Exception {
+        Map<String, Object> params = new HashMap<String, Object>();
+        params.put("portNumber", 14123);
+        params.put("consumerStreams", "3");
+        params.put("topic", "mytopic");
+        params.put("partitioner", "com.class.Party");
+
+        KafkaConfiguration kc = new KafkaConfiguration();
+        kc.setZookeeperHost("somehost");
+        kc.setZookeeperPort(2987);
+        kc.setTopic("default");
+        params.put("configuration", kc);
+
+        String uri = "kafka:broker1:12345,broker2:12566";
+        String remaining = "broker1:12345,broker2:12566";
+
+        KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params);
+        assertEquals("somehost:2987", endpoint.getZookeeperConnect());
+        assertEquals("somehost", endpoint.getZookeeperHost());
+        assertEquals(2987, endpoint.getZookeeperPort());
+        assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers());
+        assertEquals("mytopic", endpoint.getTopic());
+        assertEquals(3, endpoint.getConsumerStreams());
+        assertEquals("com.class.Party", endpoint.getPartitioner());
+        assertNull("dirty", kc.getBrokers());
+        assertEquals("default", kc.getTopic());
+    }
+
+    @Test
+    public void testPropertiesConfigrationRefMerge() throws Exception {
+        Map<String, Object> params = new HashMap<String, Object>();
+        params.put("portNumber", 14123);
+        params.put("consumerStreams", "3");
+        params.put("topic", "mytopic");
+        params.put("partitioner", "com.class.Party");
+
+        KafkaConfiguration kc = new KafkaConfiguration();
+        kc.setZookeeperHost("somehost");
+        kc.setZookeeperPort(2987);
+        kc.setTopic("default");
+        Registry registry = Mockito.mock(Registry.class);
+        Mockito.when(registry.lookupByName("baseconf")).thenReturn(kc);
+        Mockito.when(context.getRegistry()).thenReturn(registry);
+        params.put("configuration", "#baseconf");
+
+        String uri = "kafka:broker1:12345,broker2:12566";
+        String remaining = "broker1:12345,broker2:12566";
+
+        KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params);
+        assertEquals("somehost:2987", endpoint.getZookeeperConnect());
+        assertEquals("somehost", endpoint.getZookeeperHost());
+        assertEquals(2987, endpoint.getZookeeperPort());
+        assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers());
+        assertEquals("mytopic", endpoint.getTopic());
+        assertEquals(3, endpoint.getConsumerStreams());
+        assertEquals("com.class.Party", endpoint.getPartitioner());
+        assertNull("dirty", kc.getBrokers());
+        assertEquals("default", kc.getTopic());
+    }
 }

Reply via email to