This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 16fc72c Switched to use RuntimeCatalog.asEndpointUri method to
construct the uri to better support endpoints paths fix #143 fix #144
new def9ca0 Merge pull request #192 from valdar/pathOptionsFix
16fc72c is described below
commit 16fc72ceb0278207b839c6125959e0657e7f16fc
Author: Andrea Tarocchi <[email protected]>
AuthorDate: Tue May 5 21:38:47 2020 +0200
Switched to use RuntimeCatalog.asEndpointUri method to construct the uri to
better support endpoints paths fix #143 fix #144
---
core/pom.xml | 9 +++++++
.../apache/camel/kafkaconnector/CamelSinkTask.java | 12 ++++++++--
.../camel/kafkaconnector/CamelSourceTask.java | 12 ++++++++--
.../kafkaconnector/utils/CamelMainSupport.java | 6 +++++
.../camel/kafkaconnector/utils/TaskHelper.java | 11 +++++++++
.../camel/kafkaconnector/CamelSinkTaskTest.java | 4 ++--
.../camel/kafkaconnector/CamelSourceTaskTest.java | 4 ++--
.../camel/kafkaconnector/utils/TaskHelperTest.java | 28 ++++++++++++++++++++++
8 files changed, 78 insertions(+), 8 deletions(-)
diff --git a/core/pom.xml b/core/pom.xml
index 125cdd6..548a04a 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -41,6 +41,10 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
+ <artifactId>camel-core-catalog</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
<artifactId>camel-main</artifactId>
</dependency>
<dependency>
@@ -114,6 +118,11 @@
<artifactId>camel-debezium-common</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-cassandraql</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index baa6665..f9252d0 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -23,8 +23,11 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.ProducerTemplate;
+import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
import org.apache.camel.kafkaconnector.utils.TaskHelper;
import org.apache.camel.support.DefaultExchange;
@@ -67,11 +70,16 @@ public class CamelSinkTask extends SinkTask {
String remoteUrl =
config.getString(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF);
final String marshaller =
config.getString(CamelSinkConnectorConfig.CAMEL_SINK_MARSHAL_CONF);
+ CamelContext camelContext = new DefaultCamelContext();
if (remoteUrl == null) {
- remoteUrl = TaskHelper.buildUrl(actualProps,
config.getString(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF),
CAMEL_SINK_ENDPOINT_PROPERTIES_PREFIX, CAMEL_SINK_PATH_PROPERTIES_PREFIX);
+ remoteUrl =
TaskHelper.buildUrl(camelContext.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog(),
+ actualProps,
+
config.getString(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF),
+
CAMEL_SINK_ENDPOINT_PROPERTIES_PREFIX,
+
CAMEL_SINK_PATH_PROPERTIES_PREFIX);
}
- cms = new CamelMainSupport(actualProps, LOCAL_URL, remoteUrl,
marshaller, null);
+ cms = new CamelMainSupport(actualProps, LOCAL_URL, remoteUrl,
marshaller, null, camelContext);
producer = cms.createProducerTemplate();
diff --git
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 83ea1b1..ceecf94 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -27,9 +27,12 @@ import java.util.Date;
import java.util.List;
import java.util.Map;
+import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.PollingConsumer;
+import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
import org.apache.camel.kafkaconnector.utils.SchemaHelper;
import org.apache.camel.kafkaconnector.utils.TaskHelper;
@@ -81,11 +84,16 @@ public class CamelSourceTask extends SourceTask {
String localUrl = getLocalUrlWithPollingOptions(config);
+ CamelContext camelContext = new DefaultCamelContext();
if (remoteUrl == null) {
- remoteUrl = TaskHelper.buildUrl(actualProps,
config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF),
CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX, CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
+ remoteUrl =
TaskHelper.buildUrl(camelContext.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog(),
+ actualProps,
+
config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF),
+
CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX,
+
CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
}
- cms = new CamelMainSupport(actualProps, remoteUrl, localUrl, null,
unmarshaller);
+ cms = new CamelMainSupport(actualProps, remoteUrl, localUrl, null,
unmarshaller, camelContext);
Endpoint endpoint = cms.getEndpoint(localUrl);
consumer = endpoint.createPollingConsumer();
diff --git
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
index 91735f2..d84a41e 100644
---
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
+++
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
@@ -28,8 +28,10 @@ import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Endpoint;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.catalog.RuntimeCamelCatalog;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.main.BaseMainSupport;
import org.apache.camel.main.Main;
@@ -148,6 +150,10 @@ public class CamelMainSupport {
return camel.createConsumerTemplate();
}
+ public RuntimeCamelCatalog getRuntimeCamelCatalog() {
+ return
camel.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog();
+ }
+
private DataFormat lookupAndInstantiateDataformat(String dataformatName) {
DataFormat df = camel.resolveDataFormat(dataformatName);
diff --git
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
index 6259985..c623662 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
@@ -16,16 +16,27 @@
*/
package org.apache.camel.kafkaconnector.utils;
+import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import org.apache.camel.catalog.RuntimeCamelCatalog;
+
public final class TaskHelper {
private TaskHelper() {
}
+ public static String buildUrl(RuntimeCamelCatalog rcc, Map<String, String>
props, String componentSchema, String endpointPropertiesPrefix, String
pathPropertiesPrefix) throws URISyntaxException {
+ Map<String, String> filteredProps = new HashMap<>();
+ props.keySet().stream()
+ .filter(k -> k.startsWith(endpointPropertiesPrefix) ||
k.startsWith(pathPropertiesPrefix))
+ .forEach(k ->
filteredProps.put(k.replace(endpointPropertiesPrefix,
"").replace(pathPropertiesPrefix, ""), props.get(k)));
+ return rcc.asEndpointUri(componentSchema, filteredProps, false);
+ }
+
public static String buildUrl(Map<String, String> props, String
componentSchema, String endpointPropertiesPrefix, String pathPropertiesPrefix) {
final String urlPath = createUrlPathFromProperties(props,
pathPropertiesPrefix);
final String endpointOptions =
createEndpointOptionsFromProperties(props, endpointPropertiesPrefix);
diff --git
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 3003afa..32e3bc6 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -489,7 +489,7 @@ public class CamelSinkTaskTest {
props.put("topics", "mytopic");
props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda");
props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() +
"bridgeErrorHandler", "true");
- props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "pathChunk",
"test");
+ props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name",
"test");
CamelSinkTask camelSinkTask = new CamelSinkTask();
camelSinkTask.start(props);
@@ -518,7 +518,7 @@ public class CamelSinkTaskTest {
props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda");
props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() +
"bridgeErrorHandler", "true");
props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "size",
"50");
- props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "pathChunk",
"test");
+ props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name",
"test");
CamelSinkTask camelSinkTask = new CamelSinkTask();
camelSinkTask.start(props);
diff --git
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index a4ad86c..830a013 100644
---
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -273,7 +273,7 @@ public class CamelSourceTaskTest {
props.put("camel.source.kafka.topic", "mytopic");
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF,
"timer");
props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() +
"period", "1000");
- props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() +
"pathChunk", "kafkaconnector");
+ props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() +
"timerName", "kafkaconnector");
CamelSourceTask camelSourceTask = new CamelSourceTask();
camelSourceTask.start(props);
@@ -306,7 +306,7 @@ public class CamelSourceTaskTest {
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF,
"timer");
props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() +
"period", "1000");
props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() +
"repeatCount", "0");
- props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() +
"pathChunk", "kafkaconnector");
+ props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() +
"timerName", "kafkaconnector");
CamelSourceTask camelSourceTask = new CamelSourceTask();
camelSourceTask.start(props);
diff --git
a/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
b/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
index 2f32329..ea6938d 100644
---
a/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
+++
b/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
@@ -16,10 +16,14 @@
*/
package org.apache.camel.kafkaconnector.utils;
+import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.catalog.RuntimeCamelCatalog;
+import org.apache.camel.impl.DefaultCamelContext;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -175,4 +179,28 @@ public class TaskHelperTest {
assertEquals("test:value2?key1=value1", result);
}
+
+ @Test
+ public void testBuildUrlWithRuntimeCatalog() throws URISyntaxException {
+ DefaultCamelContext dcc = new DefaultCamelContext();
+ RuntimeCamelCatalog rcc =
dcc.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog();
+ Map<String, String> props = new HashMap<String, String>() {{
+ put("prefix.name", "test");
+ put("anotherPrefix.synchronous", "true");
+ }};
+
+ String result = TaskHelper.buildUrl(rcc, props, "direct", "prefix.",
"anotherPrefix.");
+
+ assertEquals("direct:test?synchronous=true", result);
+
+ props = new HashMap<String, String>() {{
+ put("prefix.port", "8080");
+ put("anotherPrefix.keyspace", "test");
+ put("anotherPrefix.hosts", "localhost");
+ }};
+
+ result = TaskHelper.buildUrl(rcc, props, "cql", "prefix.",
"anotherPrefix.");
+
+ assertEquals("cql:localhost:8080/test", result);
+ }
}
\ No newline at end of file