This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit d26d549572e2c626ada09eaef2123de8559e8d59 Author: Claus Ibsen <[email protected]> AuthorDate: Thu Feb 27 10:05:31 2025 +0100 CAMEL-21807: camel-kamelet - Custom kamelet with dynamic query parameter with sql component --- .../apache/camel/component/sql/SqlComponent.java | 11 ++- .../camel/component/sql/SqlDynamicKameletTest.java | 87 ++++++++++++++++++++++ .../camel/impl/engine/AbstractCamelContext.java | 19 ++++- 3 files changed, 111 insertions(+), 6 deletions(-) diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java index 8a70d4c5a1e..7f92e5092b3 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java +++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java @@ -26,6 +26,7 @@ import org.apache.camel.spi.Metadata; import org.apache.camel.spi.annotations.Component; import org.apache.camel.support.HealthCheckComponent; import org.apache.camel.support.PropertyBindingSupport; +import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.PropertiesHelper; import org.springframework.jdbc.core.JdbcTemplate; @@ -65,13 +66,17 @@ public class SqlComponent extends HealthCheckComponent { @Override protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + String query = getAndRemoveParameter(parameters, "query", String.class); + if (query == null) { + query = remaining; + } + if (ObjectHelper.isEmpty(query)) { + throw new IllegalArgumentException("Query parameter is required"); + } String parameterPlaceholderSubstitute = getAndRemoveParameter(parameters, "placeholder", String.class, "#"); - - String query = remaining; if (usePlaceholder) { query = query.replaceAll(parameterPlaceholderSubstitute, "?"); } - String onConsume = getAndRemoveParameter(parameters, "consumer.onConsume", String.class); if (onConsume == null) { onConsume = getAndRemoveParameter(parameters, "onConsume", String.class); diff --git a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlDynamicKameletTest.java b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlDynamicKameletTest.java new file mode 100644 index 00000000000..20995c8d2ad --- /dev/null +++ b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlDynamicKameletTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sql; + +import java.util.List; +import java.util.Map; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.Test; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class SqlDynamicKameletTest extends CamelTestSupport { + + EmbeddedDatabase db; + + @Override + + public void doPreSetup() throws Exception { + db = new EmbeddedDatabaseBuilder() + .setName(getClass().getSimpleName()) + .setType(EmbeddedDatabaseType.H2) + .addScript("sql/createAndPopulateDatabase.sql").build(); + } + + @Override + public void doPostTearDown() throws Exception { + if (db != null) { + db.shutdown(); + } + } + + @Test + public void testSimulateDynamicKamelet() throws InterruptedException { + MockEndpoint mock = getMockEndpoint("mock:query"); + mock.expectedMessageCount(1); + + template.requestBodyAndHeader("direct:query", "ASF", "names", "Camel,AMQ"); + + MockEndpoint.assertIsSatisfied(context); + + List list = mock.getReceivedExchanges().get(0).getIn().getBody(List.class); + assertEquals(2, list.size()); + Map row = (Map) list.get(0); + assertEquals("Camel", row.get("PROJECT")); + row = (Map) list.get(1); + assertEquals("AMQ", row.get("PROJECT")); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + // required for the sql component + getContext().getComponent("sql", SqlComponent.class).setDataSource(db); + + context.getPropertiesComponent().addInitialProperty("localSql", "sql"); + context.getPropertiesComponent().addInitialProperty("myQuery", "classpath:sql/selectProjectsAndIn.sql"); + + from("direct:query") + .to("{{localSql}}?query={{myQuery}}") + .to("log:query") + .to("mock:query"); + } + }; + } +} diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java index 00e96838257..7cf01d25eb8 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java @@ -786,16 +786,29 @@ public abstract class AbstractCamelContext extends BaseService } if (answer == null) { try { - scheme = StringHelper.before(uri, ":"); + // the uri may not contain a scheme such as a dynamic kamelet + // so we need to find the component name via the first text before : or ? mark + int pos1 = uri.indexOf(':'); + int pos2 = uri.indexOf('?'); + if (pos1 != -1 && pos2 != -1) { + scheme = uri.substring(0, Math.min(pos1, pos2)); + } else if (pos1 != -1) { + scheme = uri.substring(0, pos1); + } else if (pos2 != -1) { + scheme = uri.substring(0, pos2); + } else { + scheme = null; + } if (scheme == null) { // it may refer to a logical endpoint answer = camelContextExtension.getRegistry().lookupByNameAndType(uri, Endpoint.class); if (answer != null) { return answer; - } else { - throw new NoSuchEndpointException(uri); } } + if (scheme == null) { + scheme = uri; + } LOG.trace("Endpoint uri: {} is from component with name: {}", uri, scheme); Component component = getComponent(scheme); ServiceHelper.initService(component);
