This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.14.x by this push:
new f0f0f1bd742 [CAMEL-18159] SendDynamicAware of several components
parses uri that starts with schema:// incorectly (#7713)
f0f0f1bd742 is described below
commit f0f0f1bd7428caa6f9cc2e6a8cb66ca6308ea9ce
Author: artemse <[email protected]>
AuthorDate: Sat Jun 4 10:18:55 2022 +0300
[CAMEL-18159] SendDynamicAware of several components parses uri that starts
with schema:// incorectly (#7713)
* [CAMEL-18459] Fixes incorrect JmsSendDynamicAware parsing for
destinations that starts with "schema://" and dose not have queue: or topic:
prefix
* [CAMEL-18459] redundant empty line removed
* [CAMEL-18459] cleanup JmsToDSendDynamicTest, assertion on message body
added
* [CAMEL-18459] RabbitMQSendDynamicAware incorrectly parses exchange name
if it starts with "schema://:
* [CAMEL-18459] SpringRabbitMQSendDynamicAware incorrectly parses exchange
name if it starts with "schema://:
* [CAMEL-18459] incorrect schema in SpringRabbitMQSendDynamicAwareTest fixed
* SjmsSendDynamicAware incorrectly parses destination if it starts with
schema://
* [CAMEL-18459] VertxKafkaSendDynamicAware incorrectly parses topic name if
it starts with "schema://:
* [CAMEL-18459] PahoSendDynamicAware incorrectly parses topic name if it
starts with "schema://:
* [CAMEL-18459] remove file added by mistake
* [CAMEL-18159] missing license info added
* [CAMEL-18159] PahoMqttSendDynamicAware incorrectly parses topic name if
it starts with "schema://:
---
.../camel/component/jms/JmsToDSendDynamicTest.java | 6 +--
.../paho/mqtt5/PahoMqtt5SendDynamicAware.java | 1 +
.../paho/mqtt5/PahoMqtt5SendDynamicAwareTest.java | 56 +++++++++++++++++++++
.../camel/component/paho/PahoSendDynamicAware.java | 1 +
.../component/paho/PahoToDSendDynamicTest.java | 7 +++
.../rabbitmq/RabbitMQSendDynamicAware.java | 1 +
.../rabbitmq/RabbitMQSendDynamicAwareTest.java | 57 ++++++++++++++++++++++
.../camel/component/sjms/SjmsSendDynamicAware.java | 1 +
.../component/sjms/SjmsSendDynamicAwareTest.java | 57 ++++++++++++++++++++++
.../SpringRabbitMQSendDynamicAware.java | 1 +
.../SpringRabbitMQSendDynamicAwareTest.java | 57 ++++++++++++++++++++++
.../vertx/kafka/VertxKafkaSendDynamicAware.java | 1 +
.../kafka/VertxKafkaSendDynamicAwareTest.java | 57 ++++++++++++++++++++++
13 files changed, 299 insertions(+), 4 deletions(-)
diff --git
a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsToDSendDynamicTest.java
b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsToDSendDynamicTest.java
index cd227b1b471..4778953932d 100644
---
a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsToDSendDynamicTest.java
+++
b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsToDSendDynamicTest.java
@@ -19,7 +19,6 @@ package org.apache.camel.component.jms;
import javax.jms.ConnectionFactory;
import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.junit.jupiter.api.Test;
@@ -47,9 +46,8 @@ public class JmsToDSendDynamicTest extends CamelTestSupport {
@Test
public void testToDSlashed() {
template.sendBodyAndHeader("direct:startSlashed", "Hello bar",
"where", "bar");
-
- Exchange exchange = consumer.receive("activemq://bar", 2000);
- exchange.getMessage().getHeader(JmsConstants.JMS_DESTINATION_NAME);
+ String out = consumer.receiveBody("activemq://bar", 2000,
String.class);
+ assertEquals("Hello bar", out);
}
@Override
diff --git
a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5SendDynamicAware.java
b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5SendDynamicAware.java
index caae4b75aa1..14dab516ad9 100644
---
a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5SendDynamicAware.java
+++
b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5SendDynamicAware.java
@@ -104,6 +104,7 @@ public class PahoMqtt5SendDynamicAware extends
ServiceSupport implements SendDyn
private String parseTopicName(String uri) {
// strip query
+ uri = uri.replaceFirst(scheme + "://", ":");
int pos = uri.indexOf('?');
if (pos != -1) {
uri = uri.substring(0, pos);
diff --git
a/components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5SendDynamicAwareTest.java
b/components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5SendDynamicAwareTest.java
new file mode 100644
index 00000000000..63b0806d4fe
--- /dev/null
+++
b/components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5SendDynamicAwareTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.paho.mqtt5;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.SendDynamicAware;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class PahoMqtt5SendDynamicAwareTest extends CamelTestSupport {
+ PahoMqtt5SendDynamicAware pahoMqtt5SendDynamicAware;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ super.setUp();
+ this.pahoMqtt5SendDynamicAware = new PahoMqtt5SendDynamicAware();
+ }
+
+ @Test
+ public void testUriParsing() throws Exception {
+ this.pahoMqtt5SendDynamicAware.setScheme("paho-mqtt5");
+ Exchange exchange = createExchangeWithBody("The Body");
+ SendDynamicAware.DynamicAwareEntry entry = new
SendDynamicAware.DynamicAwareEntry("paho-mqtt5:destination",
"paho-mqtt5:${header.test}", null, null);
+ Processor processor =
this.pahoMqtt5SendDynamicAware.createPreProcessor(createExchangeWithBody("Body"),
entry);
+ processor.process(exchange);
+ assertEquals("destination",
exchange.getMessage().getHeader(PahoMqtt5Constants.CAMEL_PAHO_OVERRIDE_TOPIC));
+ }
+
+ @Test
+ public void testSlashedUriParsing() throws Exception {
+ this.pahoMqtt5SendDynamicAware.setScheme("paho-mqtt5");
+ Exchange exchange = createExchangeWithBody("The Body");
+ SendDynamicAware.DynamicAwareEntry entry = new
SendDynamicAware.DynamicAwareEntry("paho-mqtt5://destination",
"paho-mqtt5://${header.test}", null, null);
+ Processor processor =
this.pahoMqtt5SendDynamicAware.createPreProcessor(createExchangeWithBody("Body"),
entry);
+ processor.process(exchange);
+ assertEquals("destination",
exchange.getMessage().getHeader(PahoMqtt5Constants.CAMEL_PAHO_OVERRIDE_TOPIC));
+ }
+}
\ No newline at end of file
diff --git
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoSendDynamicAware.java
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoSendDynamicAware.java
index 31555c40870..a55500ed784 100644
---
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoSendDynamicAware.java
+++
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoSendDynamicAware.java
@@ -104,6 +104,7 @@ public class PahoSendDynamicAware extends ServiceSupport
implements SendDynamicA
private String parseTopicName(String uri) {
// strip query
+ uri = uri.replaceFirst(scheme + "://", ":");
int pos = uri.indexOf('?');
if (pos != -1) {
uri = uri.substring(0, pos);
diff --git
a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoToDSendDynamicTest.java
b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoToDSendDynamicTest.java
index 34ac31fd69d..41cbba0ac9e 100644
---
a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoToDSendDynamicTest.java
+++
b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoToDSendDynamicTest.java
@@ -57,6 +57,12 @@ public class PahoToDSendDynamicTest extends CamelTestSupport
{
out = consumer.receiveBody("paho:beer", 2000, String.class);
assertEquals("Hello beer", out);
}
+ @Test
+ public void testToDSlashed() {
+ template.sendBodyAndHeader("direct:startSlashed", "Hello bar",
"where", "bar");
+ String out = consumer.receiveBody("paho://bar", 2000, String.class);
+ assertEquals("Hello bar", out);
+ }
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
@@ -68,6 +74,7 @@ public class PahoToDSendDynamicTest extends CamelTestSupport {
// route message dynamic using toD
from("direct:start").toD("paho:${header.where}?retained=true");
+
from("direct:startSlashed").toD("paho://${header.where}?retained=true");
}
};
}
diff --git
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQSendDynamicAware.java
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQSendDynamicAware.java
index 93cdc63846d..35ac6758221 100644
---
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQSendDynamicAware.java
+++
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQSendDynamicAware.java
@@ -104,6 +104,7 @@ public class RabbitMQSendDynamicAware extends
ServiceSupport implements SendDyna
private String parseExchangeName(String uri) {
// strip query
+ uri = uri.replaceFirst(scheme + "://", ":");
int pos = uri.indexOf('?');
if (pos != -1) {
uri = uri.substring(0, pos);
diff --git
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSendDynamicAwareTest.java
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSendDynamicAwareTest.java
new file mode 100644
index 00000000000..670d268e849
--- /dev/null
+++
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSendDynamicAwareTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.SendDynamicAware;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class RabbitMQSendDynamicAwareTest extends CamelTestSupport {
+
+ RabbitMQSendDynamicAware rabbitMQSendDynamicAware;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ super.setUp();
+ this.rabbitMQSendDynamicAware = new RabbitMQSendDynamicAware();
+ }
+
+ @Test
+ public void testUriParsing() throws Exception {
+ this.rabbitMQSendDynamicAware.setScheme("rabbitmq");
+ Exchange exchange = createExchangeWithBody("The Body");
+ SendDynamicAware.DynamicAwareEntry entry = new
SendDynamicAware.DynamicAwareEntry("rabbitmq:destination",
"rabbitmq:${header.test}", null, null);
+ Processor processor =
this.rabbitMQSendDynamicAware.createPreProcessor(createExchangeWithBody("Body"),
entry);
+ processor.process(exchange);
+ assertEquals("destination",
exchange.getMessage().getHeader(RabbitMQConstants.EXCHANGE_OVERRIDE_NAME));
+ }
+
+ @Test
+ public void testSlashedUriParsing() throws Exception {
+ this.rabbitMQSendDynamicAware.setScheme("rabbitmq");
+ Exchange exchange = createExchangeWithBody("The Body");
+ SendDynamicAware.DynamicAwareEntry entry = new
SendDynamicAware.DynamicAwareEntry("rabbitmq://destination",
"rabbitmq://${header.test}", null, null);
+ Processor processor =
this.rabbitMQSendDynamicAware.createPreProcessor(createExchangeWithBody("Body"),
entry);
+ processor.process(exchange);
+ assertEquals("destination",
exchange.getMessage().getHeader(RabbitMQConstants.EXCHANGE_OVERRIDE_NAME));
+ }
+}
diff --git
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsSendDynamicAware.java
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsSendDynamicAware.java
index fd3697c72fa..81d99f5dc85 100644
---
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsSendDynamicAware.java
+++
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsSendDynamicAware.java
@@ -104,6 +104,7 @@ public class SjmsSendDynamicAware extends ServiceSupport
implements SendDynamicA
private String parseDestinationName(String uri) {
// strip query
+ uri = uri.replaceFirst(scheme + "://", ":");
int pos = uri.indexOf('?');
if (pos != -1) {
uri = uri.substring(0, pos);
diff --git
a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsSendDynamicAwareTest.java
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsSendDynamicAwareTest.java
new file mode 100644
index 00000000000..a68489c8924
--- /dev/null
+++
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsSendDynamicAwareTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.SendDynamicAware;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class SjmsSendDynamicAwareTest extends CamelTestSupport {
+
+ SjmsSendDynamicAware sjmsSendDynamicAware;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ super.setUp();
+ this.sjmsSendDynamicAware = new SjmsSendDynamicAware();
+ }
+
+ @Test
+ public void testUriParsing() throws Exception {
+ this.sjmsSendDynamicAware.setScheme("sjms");
+ Exchange exchange = createExchangeWithBody("The Body");
+ SendDynamicAware.DynamicAwareEntry entry = new
SendDynamicAware.DynamicAwareEntry("sjms:destination", "sjms:${header.test}",
null, null);
+ Processor processor =
this.sjmsSendDynamicAware.createPreProcessor(createExchangeWithBody("Body"),
entry);
+ processor.process(exchange);
+ assertEquals("destination",
exchange.getMessage().getHeader(SjmsConstants.JMS_DESTINATION_NAME));
+ }
+
+ @Test
+ public void testSlashedUriParsing() throws Exception {
+ this.sjmsSendDynamicAware.setScheme("sjms");
+ Exchange exchange = createExchangeWithBody("The Body");
+ SendDynamicAware.DynamicAwareEntry entry = new
SendDynamicAware.DynamicAwareEntry("sjms://destination",
"sjms://${header.test}", null, null);
+ Processor processor =
this.sjmsSendDynamicAware.createPreProcessor(createExchangeWithBody("Body"),
entry);
+ processor.process(exchange);
+ assertEquals("destination",
exchange.getMessage().getHeader(SjmsConstants.JMS_DESTINATION_NAME));
+ }
+}
diff --git
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQSendDynamicAware.java
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQSendDynamicAware.java
index 9bc330a2c88..0b2eb662086 100644
---
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQSendDynamicAware.java
+++
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQSendDynamicAware.java
@@ -127,6 +127,7 @@ public class SpringRabbitMQSendDynamicAware extends
ServiceSupport implements Se
private String parseExchangeName(String uri) {
// strip query
+ uri = uri.replaceFirst(scheme + "://", ":");
int pos = uri.indexOf('?');
if (pos != -1) {
uri = uri.substring(0, pos);
diff --git
a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/SpringRabbitMQSendDynamicAwareTest.java
b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/SpringRabbitMQSendDynamicAwareTest.java
new file mode 100644
index 00000000000..1957ae60c78
--- /dev/null
+++
b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/SpringRabbitMQSendDynamicAwareTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.springrabbit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.SendDynamicAware;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class SpringRabbitMQSendDynamicAwareTest extends CamelTestSupport {
+
+ SpringRabbitMQSendDynamicAware springRabbitMQSendDynamicAware;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ super.setUp();
+ this.springRabbitMQSendDynamicAware = new
SpringRabbitMQSendDynamicAware();
+ }
+
+ @Test
+ public void testUriParsing() throws Exception {
+ this.springRabbitMQSendDynamicAware.setScheme("spring-rabbitmq");
+ Exchange exchange = createExchangeWithBody("The Body");
+ SendDynamicAware.DynamicAwareEntry entry = new
SendDynamicAware.DynamicAwareEntry("spring-rabbitmq:destination",
"spring-rabbitmq:${header.test}", null, null);
+ Processor processor =
this.springRabbitMQSendDynamicAware.createPreProcessor(createExchangeWithBody("Body"),
entry);
+ processor.process(exchange);
+ assertEquals("destination",
exchange.getMessage().getHeader(SpringRabbitMQConstants.EXCHANGE_OVERRIDE_NAME));
+ }
+
+ @Test
+ public void testSlashedUriParsing() throws Exception {
+ this.springRabbitMQSendDynamicAware.setScheme("spring-rabbitmq");
+ Exchange exchange = createExchangeWithBody("The Body");
+ SendDynamicAware.DynamicAwareEntry entry = new
SendDynamicAware.DynamicAwareEntry("spring-rabbitmq://destination",
"spring-rabbitmq://${header.test}", null, null);
+ Processor processor =
this.springRabbitMQSendDynamicAware.createPreProcessor(createExchangeWithBody("Body"),
entry);
+ processor.process(exchange);
+ assertEquals("destination",
exchange.getMessage().getHeader(SpringRabbitMQConstants.EXCHANGE_OVERRIDE_NAME));
+ }
+}
diff --git
a/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaSendDynamicAware.java
b/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaSendDynamicAware.java
index db901d6b120..d0e5323c83c 100644
---
a/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaSendDynamicAware.java
+++
b/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaSendDynamicAware.java
@@ -104,6 +104,7 @@ public class VertxKafkaSendDynamicAware extends
ServiceSupport implements SendDy
private String parseTopicName(String uri) {
// strip query
+ uri = uri.replaceFirst(scheme + "://", ":");
int pos = uri.indexOf('?');
if (pos != -1) {
uri = uri.substring(0, pos);
diff --git
a/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/test/java/org/apache/camel/component/vertx/kafka/VertxKafkaSendDynamicAwareTest.java
b/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/test/java/org/apache/camel/component/vertx/kafka/VertxKafkaSendDynamicAwareTest.java
new file mode 100644
index 00000000000..fe8c6f417f8
--- /dev/null
+++
b/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/test/java/org/apache/camel/component/vertx/kafka/VertxKafkaSendDynamicAwareTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.kafka;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.SendDynamicAware;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class VertxKafkaSendDynamicAwareTest extends CamelTestSupport {
+
+ VertxKafkaSendDynamicAware vertxKafkaSendDynamicAware;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ super.setUp();
+ this.vertxKafkaSendDynamicAware = new VertxKafkaSendDynamicAware();
+ }
+
+ @Test
+ public void testUriParsing() throws Exception {
+ this.vertxKafkaSendDynamicAware.setScheme("vertx-kafka");
+ Exchange exchange = createExchangeWithBody("The Body");
+ SendDynamicAware.DynamicAwareEntry entry = new
SendDynamicAware.DynamicAwareEntry("vertx-kafka:destination",
"vertx-kafka:${header.test}", null, null);
+ Processor processor =
this.vertxKafkaSendDynamicAware.createPreProcessor(createExchangeWithBody("Body"),
entry);
+ processor.process(exchange);
+ assertEquals("destination",
exchange.getMessage().getHeader(VertxKafkaConstants.OVERRIDE_TOPIC));
+ }
+
+ @Test
+ public void testSlashedUriParsing() throws Exception {
+ this.vertxKafkaSendDynamicAware.setScheme("vertx-kafka");
+ Exchange exchange = createExchangeWithBody("The Body");
+ SendDynamicAware.DynamicAwareEntry entry = new
SendDynamicAware.DynamicAwareEntry("vertx-kafka://destination",
"vertx-kafka://${header.test}", null, null);
+ Processor processor =
this.vertxKafkaSendDynamicAware.createPreProcessor(createExchangeWithBody("Body"),
entry);
+ processor.process(exchange);
+ assertEquals("destination",
exchange.getMessage().getHeader(VertxKafkaConstants.OVERRIDE_TOPIC));
+ }
+}