This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 16fc72c  Switched to use RuntimeCatalog.asEndpointUri method to 
construct the uri to better support endpoints paths fix #143 fix #144
     new def9ca0  Merge pull request #192 from valdar/pathOptionsFix
16fc72c is described below

commit 16fc72ceb0278207b839c6125959e0657e7f16fc
Author: Andrea Tarocchi <[email protected]>
AuthorDate: Tue May 5 21:38:47 2020 +0200

    Switched to use RuntimeCatalog.asEndpointUri method to construct the uri to 
better support endpoints paths fix #143 fix #144
---
 core/pom.xml                                       |  9 +++++++
 .../apache/camel/kafkaconnector/CamelSinkTask.java | 12 ++++++++--
 .../camel/kafkaconnector/CamelSourceTask.java      | 12 ++++++++--
 .../kafkaconnector/utils/CamelMainSupport.java     |  6 +++++
 .../camel/kafkaconnector/utils/TaskHelper.java     | 11 +++++++++
 .../camel/kafkaconnector/CamelSinkTaskTest.java    |  4 ++--
 .../camel/kafkaconnector/CamelSourceTaskTest.java  |  4 ++--
 .../camel/kafkaconnector/utils/TaskHelperTest.java | 28 ++++++++++++++++++++++
 8 files changed, 78 insertions(+), 8 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index 125cdd6..548a04a 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -41,6 +41,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
+            <artifactId>camel-core-catalog</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
             <artifactId>camel-main</artifactId>
         </dependency>
         <dependency>
@@ -114,6 +118,11 @@
             <artifactId>camel-debezium-common</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-cassandraql</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index baa6665..f9252d0 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -23,8 +23,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.ProducerTemplate;
+import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
 import org.apache.camel.kafkaconnector.utils.TaskHelper;
 import org.apache.camel.support.DefaultExchange;
@@ -67,11 +70,16 @@ public class CamelSinkTask extends SinkTask {
             String remoteUrl = 
config.getString(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF);
             final String marshaller = 
config.getString(CamelSinkConnectorConfig.CAMEL_SINK_MARSHAL_CONF);
 
+            CamelContext camelContext = new DefaultCamelContext();
             if (remoteUrl == null) {
-                remoteUrl = TaskHelper.buildUrl(actualProps, 
config.getString(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF), 
CAMEL_SINK_ENDPOINT_PROPERTIES_PREFIX, CAMEL_SINK_PATH_PROPERTIES_PREFIX);
+                remoteUrl = 
TaskHelper.buildUrl(camelContext.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog(),
+                                                actualProps,
+                                                
config.getString(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF),
+                                                
CAMEL_SINK_ENDPOINT_PROPERTIES_PREFIX,
+                                                
CAMEL_SINK_PATH_PROPERTIES_PREFIX);
             }
 
-            cms = new CamelMainSupport(actualProps, LOCAL_URL, remoteUrl, 
marshaller, null);
+            cms = new CamelMainSupport(actualProps, LOCAL_URL, remoteUrl, 
marshaller, null, camelContext);
 
             producer = cms.createProducerTemplate();
 
diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 83ea1b1..ceecf94 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -27,9 +27,12 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.PollingConsumer;
+import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
 import org.apache.camel.kafkaconnector.utils.SchemaHelper;
 import org.apache.camel.kafkaconnector.utils.TaskHelper;
@@ -81,11 +84,16 @@ public class CamelSourceTask extends SourceTask {
 
             String localUrl = getLocalUrlWithPollingOptions(config);
 
+            CamelContext camelContext = new DefaultCamelContext();
             if (remoteUrl == null) {
-                remoteUrl = TaskHelper.buildUrl(actualProps, 
config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF), 
CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX, CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
+                remoteUrl = 
TaskHelper.buildUrl(camelContext.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog(),
+                                                actualProps,
+                                                
config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF),
+                                                
CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX,
+                                                
CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
             }
 
-            cms = new CamelMainSupport(actualProps, remoteUrl, localUrl, null, 
unmarshaller);
+            cms = new CamelMainSupport(actualProps, remoteUrl, localUrl, null, 
unmarshaller, camelContext);
 
             Endpoint endpoint = cms.getEndpoint(localUrl);
             consumer = endpoint.createPollingConsumer();
diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
 
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
index 91735f2..d84a41e 100644
--- 
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
+++ 
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
@@ -28,8 +28,10 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.Endpoint;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.catalog.RuntimeCamelCatalog;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.main.BaseMainSupport;
 import org.apache.camel.main.Main;
@@ -148,6 +150,10 @@ public class CamelMainSupport {
         return camel.createConsumerTemplate();
     }
 
+    public RuntimeCamelCatalog getRuntimeCamelCatalog() {
+        return 
camel.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog();
+    }
+
     private DataFormat lookupAndInstantiateDataformat(String dataformatName) {
         DataFormat df = camel.resolveDataFormat(dataformatName);
 
diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java 
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
index 6259985..c623662 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
@@ -16,16 +16,27 @@
  */
 package org.apache.camel.kafkaconnector.utils;
 
+import java.net.URISyntaxException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.camel.catalog.RuntimeCamelCatalog;
+
 public final class TaskHelper {
 
     private TaskHelper() {
     }
 
+    public static String buildUrl(RuntimeCamelCatalog rcc, Map<String, String> 
props, String componentSchema, String endpointPropertiesPrefix, String 
pathPropertiesPrefix) throws URISyntaxException {
+        Map<String, String> filteredProps = new HashMap<>();
+        props.keySet().stream()
+                .filter(k -> k.startsWith(endpointPropertiesPrefix) || 
k.startsWith(pathPropertiesPrefix))
+                .forEach(k -> 
filteredProps.put(k.replace(endpointPropertiesPrefix, 
"").replace(pathPropertiesPrefix, ""), props.get(k)));
+        return rcc.asEndpointUri(componentSchema, filteredProps, false);
+    }
+
     public static String buildUrl(Map<String, String> props, String 
componentSchema, String endpointPropertiesPrefix, String pathPropertiesPrefix) {
         final String urlPath = createUrlPathFromProperties(props, 
pathPropertiesPrefix);
         final String endpointOptions = 
createEndpointOptionsFromProperties(props, endpointPropertiesPrefix);
diff --git 
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java 
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 3003afa..32e3bc6 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -489,7 +489,7 @@ public class CamelSinkTaskTest {
         props.put("topics", "mytopic");
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda");
         props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + 
"bridgeErrorHandler", "true");
-        props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "pathChunk", 
"test");
+        props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", 
"test");
 
         CamelSinkTask camelSinkTask = new CamelSinkTask();
         camelSinkTask.start(props);
@@ -518,7 +518,7 @@ public class CamelSinkTaskTest {
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda");
         props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + 
"bridgeErrorHandler", "true");
         props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "size", 
"50");
-        props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "pathChunk", 
"test");
+        props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", 
"test");
 
         CamelSinkTask camelSinkTask = new CamelSinkTask();
         camelSinkTask.start(props);
diff --git 
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java 
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index a4ad86c..830a013 100644
--- 
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ 
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -273,7 +273,7 @@ public class CamelSourceTaskTest {
         props.put("camel.source.kafka.topic", "mytopic");
         props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, 
"timer");
         props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + 
"period", "1000");
-        props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + 
"pathChunk", "kafkaconnector");
+        props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + 
"timerName", "kafkaconnector");
 
         CamelSourceTask camelSourceTask = new CamelSourceTask();
         camelSourceTask.start(props);
@@ -306,7 +306,7 @@ public class CamelSourceTaskTest {
         props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, 
"timer");
         props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + 
"period", "1000");
         props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + 
"repeatCount", "0");
-        props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + 
"pathChunk", "kafkaconnector");
+        props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + 
"timerName", "kafkaconnector");
 
         CamelSourceTask camelSourceTask = new CamelSourceTask();
         camelSourceTask.start(props);
diff --git 
a/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java 
b/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
index 2f32329..ea6938d 100644
--- 
a/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
+++ 
b/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
@@ -16,10 +16,14 @@
  */
 package org.apache.camel.kafkaconnector.utils;
 
+import java.net.URISyntaxException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.catalog.RuntimeCamelCatalog;
+import org.apache.camel.impl.DefaultCamelContext;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -175,4 +179,28 @@ public class TaskHelperTest {
 
         assertEquals("test:value2?key1=value1", result);
     }
+
+    @Test
+    public void testBuildUrlWithRuntimeCatalog() throws URISyntaxException {
+        DefaultCamelContext dcc = new DefaultCamelContext();
+        RuntimeCamelCatalog rcc = 
dcc.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog();
+        Map<String, String> props = new HashMap<String, String>() {{
+                put("prefix.name", "test");
+                put("anotherPrefix.synchronous", "true");
+            }};
+
+        String result = TaskHelper.buildUrl(rcc, props, "direct", "prefix.", 
"anotherPrefix.");
+
+        assertEquals("direct:test?synchronous=true", result);
+
+        props = new HashMap<String, String>() {{
+                put("prefix.port", "8080");
+                put("anotherPrefix.keyspace", "test");
+                put("anotherPrefix.hosts", "localhost");
+            }};
+
+        result = TaskHelper.buildUrl(rcc, props, "cql", "prefix.", 
"anotherPrefix.");
+
+        assertEquals("cql:localhost:8080/test", result);
+    }
 }
\ No newline at end of file

Reply via email to