This is an automated email from the ASF dual-hosted git repository. pcongiusti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 1d2568f00236dcd45b4e022912646155a0f97ff5 Author: Pasquale Congiusti <[email protected]> AuthorDate: Thu Dec 4 11:43:03 2025 +0100 feat(components): sql query enhancement --- .../stored/CallableStatementWrapperFactory.java | 2 -- .../jdbc/ClusteredJdbcAggregationRepository.java | 23 ++++++++++++---- .../aggregate/jdbc/JdbcAggregationRepository.java | 31 +++++++++++++++------- .../JdbcOrphanLockAwareIdempotentRepository.java | 6 ++--- .../component/sql/ProducerBatchSimpleExpTest.java | 2 +- .../sql/SqlConsumerDynamicParameterTest.java | 2 +- .../sql/SqlConsumerOutputTypeStreamListTest.java | 6 ++--- .../component/sql/stored/TemplateCacheTest.java | 22 ++++++++------- 8 files changed, 60 insertions(+), 34 deletions(-) diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/stored/CallableStatementWrapperFactory.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/stored/CallableStatementWrapperFactory.java index 2f67cd4b3c77..e555d19c87d8 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/stored/CallableStatementWrapperFactory.java +++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/stored/CallableStatementWrapperFactory.java @@ -36,9 +36,7 @@ public class CallableStatementWrapperFactory extends ServiceSupport { final TemplateParser templateParser; boolean function; - @SuppressWarnings("unchecked") private final Map<String, TemplateStoredProcedure> templateCache = LRUCacheFactory.newLRUCache(TEMPLATE_CACHE_DEFAULT_SIZE); - @SuppressWarnings("unchecked") private final Map<String, BatchCallableStatementCreatorFactory> batchTemplateCache = LRUCacheFactory.newLRUCache(BATCH_TEMPLATE_CACHE_DEFAULT_SIZE); diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregationRepository.java b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregationRepository.java index 
5f0150e9af2b..dc2450cf37a5 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregationRepository.java +++ b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregationRepository.java @@ -29,6 +29,7 @@ import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.jdbc.core.ArgumentPreparedStatementSetter; import org.springframework.jdbc.core.RowMapper; import org.springframework.jdbc.core.support.AbstractLobCreatingPreparedStatementCallback; import org.springframework.jdbc.support.lob.LobCreator; @@ -74,8 +75,9 @@ public class ClusteredJdbcAggregationRepository extends JdbcAggregationRepositor final long version = exchange.getProperty(VERSION_PROPERTY, Long.class); try { LOG.debug("Removing key {}", correlationId); - - jdbcTemplate.update("DELETE FROM " + getRepositoryName() + " WHERE " + ID + " = ? AND " + VERSION + " = ?", + String table = getRepositoryName(); + verifyTableName(table); + jdbcTemplate.update("DELETE FROM " + table + " WHERE " + ID + " = ? 
AND " + VERSION + " = ?", correlationId, version); insert(camelContext, confirmKey, exchange, getRepositoryNameCompleted(), version, true); @@ -173,10 +175,21 @@ public class ClusteredJdbcAggregationRepository extends JdbcAggregationRepositor public Set<String> scan(final CamelContext camelContext) { return transactionTemplateReadOnly.execute(new TransactionCallback<LinkedHashSet<String>>() { public LinkedHashSet<String> doInTransaction(final TransactionStatus status) { + String table = getRepositoryNameCompleted(); + verifyTableName(table); + + String sql; + final String[] params; + if (isRecoveryByInstance()) { + sql = "SELECT " + ID + " FROM " + table + " WHERE INSTANCE_ID = ?"; + params = new String[] { instanceId }; + } else { + sql = "SELECT " + ID + " FROM " + table; + params = new String[] {}; + } + ArgumentPreparedStatementSetter apss = new ArgumentPreparedStatementSetter(params); final List<String> keys = jdbcTemplate.query( - "SELECT " + ID + " FROM " + getRepositoryNameCompleted() - + (isRecoveryByInstance() - ? 
" WHERE INSTANCE_ID='" + instanceId + "'" : ""), + sql, apss, new RowMapper<String>() { public String mapRow(final ResultSet rs, final int rowNum) throws SQLException { final String id = rs.getString(ID); diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java index fdf511f4b349..6dc89bcc0c4f 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java +++ b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java @@ -192,14 +192,15 @@ public class JdbcAggregationRepository extends ServiceSupport try { LOG.debug("Adding exchange with key {}", correlationId); - + String table = getRepositoryName(); + verifyTableName(table); boolean present = jdbcTemplate.queryForObject( - "SELECT COUNT(1) FROM " + getRepositoryName() + " WHERE " + ID + " = ?", Integer.class, + "SELECT COUNT(1) FROM " + table + " WHERE " + ID + " = ?", Integer.class, correlationId) != 0; // Recover existing exchange with that ID if (isReturnOldExchange() && present) { - result = get(correlationId, getRepositoryName(), camelContext); + result = get(correlationId, table, camelContext); } if (present) { @@ -210,11 +211,11 @@ public class JdbcAggregationRepository extends ServiceSupport } else { long version = versionLong.longValue(); LOG.debug("Updating record with key {} and version {}", correlationId, version); - update(camelContext, correlationId, exchange, getRepositoryName(), version); + update(camelContext, correlationId, exchange, table, version); } } else { LOG.debug("Inserting record with key {}", correlationId); - insert(camelContext, correlationId, exchange, getRepositoryName(), 1L); + insert(camelContext, correlationId, exchange, table, 1L); } } catch (Exception e) { @@ -227,6 +228,13 @@ public class 
JdbcAggregationRepository extends ServiceSupport }); } + // Useful to verify if the table name does not contain invalid characters. + protected static void verifyTableName(String tableName) { + if (!tableName.matches("[a-zA-Z_][a-zA-Z0-9_]*")) { + throw new IllegalArgumentException("Invalid repository name: " + tableName); + } + } + /** * Updates the current exchange details in the given repository table. * @@ -376,7 +384,7 @@ public class JdbcAggregationRepository extends ServiceSupport return transactionTemplateReadOnly.execute(new TransactionCallback<Exchange>() { public Exchange doInTransaction(TransactionStatus status) { try { - + verifyTableName(repositoryName); Map<String, Object> columns = jdbcTemplate.queryForMap( String.format("SELECT %1$s, %2$s FROM %3$s WHERE %4$s=?", EXCHANGE, VERSION, repositoryName, ID), new Object[] { key }, new int[] { Types.VARCHAR }); @@ -416,8 +424,9 @@ public class JdbcAggregationRepository extends ServiceSupport final long version = exchange.getProperty(VERSION_PROPERTY, Long.class); try { LOG.debug("Removing key {}", correlationId); - - jdbcTemplate.update("DELETE FROM " + getRepositoryName() + " WHERE " + ID + " = ? AND " + VERSION + " = ?", + String table = getRepositoryName(); + verifyTableName(table); + jdbcTemplate.update("DELETE FROM " + table + " WHERE " + ID + " = ? 
AND " + VERSION + " = ?", correlationId, version); insert(camelContext, confirmKey, exchange, getRepositoryNameCompleted(), version); @@ -440,8 +449,10 @@ public class JdbcAggregationRepository extends ServiceSupport return transactionTemplate.execute(new TransactionCallback<Boolean>() { public Boolean doInTransaction(TransactionStatus status) { LOG.debug("Confirming exchangeId {}", exchangeId); + String table = getRepositoryNameCompleted(); + verifyTableName(table); final int mustBeOne = jdbcTemplate - .update("DELETE FROM " + getRepositoryNameCompleted() + " WHERE " + ID + " = ?", exchangeId); + .update("DELETE FROM " + table + " WHERE " + ID + " = ?", exchangeId); if (mustBeOne != 1) { LOG.error("problem removing row {} from {} - DELETE statement did not return 1 but {}", exchangeId, getRepositoryNameCompleted(), mustBeOne); @@ -471,6 +482,7 @@ public class JdbcAggregationRepository extends ServiceSupport protected Set<String> getKeys(final String repositoryName) { return transactionTemplateReadOnly.execute(new TransactionCallback<LinkedHashSet<String>>() { public LinkedHashSet<String> doInTransaction(TransactionStatus status) { + verifyTableName(repositoryName); List<String> keys = jdbcTemplate.query("SELECT " + ID + " FROM " + repositoryName, new RowMapper<String>() { public String mapRow(ResultSet rs, int rowNum) throws SQLException { @@ -698,6 +710,7 @@ public class JdbcAggregationRepository extends ServiceSupport } private int rowCount(final String repository) { + verifyTableName(repository); return jdbcTemplate.queryForObject("SELECT COUNT(1) FROM " + repository, Integer.class); } diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java b/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java index 2934a71e153a..95992703ff4b 100644 --- 
a/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java +++ b/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java @@ -29,7 +29,6 @@ import java.util.stream.Collectors; import javax.sql.DataSource; import org.apache.camel.CamelContext; -import org.apache.camel.ShutdownableService; import org.apache.camel.spi.ExecutorServiceManager; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.transaction.support.TransactionTemplate; @@ -45,7 +44,7 @@ import org.springframework.transaction.support.TransactionTemplate; * CAMEL_MESSAGEPROCESSED table or if in case the instance holding the lock has crashed. This is determined if the * timestamp on the createdAt column is more than the lockMaxAge. */ -public class JdbcOrphanLockAwareIdempotentRepository extends JdbcMessageIdRepository implements ShutdownableService { +public class JdbcOrphanLockAwareIdempotentRepository extends JdbcMessageIdRepository { private final StampedLock sl = new StampedLock(); @@ -94,9 +93,10 @@ public class JdbcOrphanLockAwareIdempotentRepository extends JdbcMessageIdReposi * If the update timestamp time is more than lockMaxAge then assume that the lock is orphan and the process * which had acquired the lock has died */ + // NOTE: the query string is supplied by the user, so this is safe: the user decides which query to perform. 
String orphanLockRecoverQueryString = getQueryString() + " AND createdAt >= ?"; Timestamp xMillisAgo = new Timestamp(System.currentTimeMillis() - lockMaxAgeMillis); - return jdbcTemplate.queryForObject(orphanLockRecoverQueryString, Integer.class, processorName, key, + return jdbcTemplate.queryForObject(orphanLockRecoverQueryString, Integer.class, processorName, key, // NOSONAR xMillisAgo); } diff --git a/components/camel-sql/src/test/java/org/apache/camel/component/sql/ProducerBatchSimpleExpTest.java b/components/camel-sql/src/test/java/org/apache/camel/component/sql/ProducerBatchSimpleExpTest.java index 0735971fd3e8..b7a000797680 100644 --- a/components/camel-sql/src/test/java/org/apache/camel/component/sql/ProducerBatchSimpleExpTest.java +++ b/components/camel-sql/src/test/java/org/apache/camel/component/sql/ProducerBatchSimpleExpTest.java @@ -60,7 +60,7 @@ public class ProducerBatchSimpleExpTest extends CamelTestSupport { MockEndpoint mock = getMockEndpoint("mock:query"); mock.expectedMessageCount(1); - List data = new ArrayList(); + List<MyData> data = new ArrayList<>(); data.add(new MyData(4, "Donald", "DIS")); data.add(new MyData(5, "Goofy", "DIS")); template.requestBody("direct:query", data); diff --git a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDynamicParameterTest.java b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDynamicParameterTest.java index 0582ff90b08f..e8bd040420c1 100644 --- a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDynamicParameterTest.java +++ b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDynamicParameterTest.java @@ -87,7 +87,7 @@ public class SqlConsumerDynamicParameterTest extends CamelTestSupport { getContext().getComponent("sql", SqlComponent.class).setDataSource(db); from("sql:select * from projects where id = :#${bean:myIdGenerator.nextId}?initialDelay=0&delay=50") - .routeId("foo").noAutoStartup() + 
.routeId("foo").autoStartup("false") .to("mock:result"); } }; diff --git a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerOutputTypeStreamListTest.java b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerOutputTypeStreamListTest.java index c5329089b049..c65fdd194726 100644 --- a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerOutputTypeStreamListTest.java +++ b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerOutputTypeStreamListTest.java @@ -100,12 +100,12 @@ public class SqlConsumerOutputTypeStreamListTest extends CamelTestSupport { getContext().getComponent("sql", SqlComponent.class).setDataSource(db); from("sql:select * from projects order by id?outputType=StreamList&initialDelay=0&delay=50").routeId("route1") - .noAutoStartup() + .autoStartup(false) .to("log:stream") .to("mock:result"); from("sql:select * from projects order by id?outputType=StreamList&initialDelay=0&delay=50").routeId("route2") - .noAutoStartup() + .autoStartup(false) .to("log:stream") .split(body()).streaming() .to("log:row") @@ -113,7 +113,7 @@ public class SqlConsumerOutputTypeStreamListTest extends CamelTestSupport { .end(); from("sql:select * from projects order by id?outputType=StreamList&outputClass=org.apache.camel.component.sql.ProjectModel&initialDelay=0&delay=50") - .routeId("route3").noAutoStartup() + .routeId("route3").autoStartup(false) .to("log:stream") .split(body()).streaming() .to("log:row") diff --git a/components/camel-sql/src/test/java/org/apache/camel/component/sql/stored/TemplateCacheTest.java b/components/camel-sql/src/test/java/org/apache/camel/component/sql/stored/TemplateCacheTest.java index 40c7f2f33bde..4f4570e1019a 100644 --- a/components/camel-sql/src/test/java/org/apache/camel/component/sql/stored/TemplateCacheTest.java +++ b/components/camel-sql/src/test/java/org/apache/camel/component/sql/stored/TemplateCacheTest.java @@ -16,6 +16,8 @@ */ package 
org.apache.camel.component.sql.stored; +import java.io.IOException; + import org.apache.camel.component.sql.stored.template.TemplateParser; import org.apache.camel.test.junit5.CamelTestSupport; import org.junit.jupiter.api.Test; @@ -48,17 +50,17 @@ public class TemplateCacheTest extends CamelTestSupport { } @Test - public void shouldCacheTemplateFunctions() { + public void shouldCacheTemplateFunctions() throws IOException { JdbcTemplate jdbcTemplate = new JdbcTemplate(db); - CallableStatementWrapperFactory fac - = new CallableStatementWrapperFactory(jdbcTemplate, new TemplateParser(context.getClassResolver()), false); - - BatchCallableStatementCreatorFactory batchFactory1 = fac.getTemplateForBatch("FOO()"); - BatchCallableStatementCreatorFactory batchFactory2 = fac.getTemplateForBatch("FOO()"); - assertSame(batchFactory1, batchFactory2); + try (CallableStatementWrapperFactory fac + = new CallableStatementWrapperFactory(jdbcTemplate, new TemplateParser(context.getClassResolver()), false)) { + BatchCallableStatementCreatorFactory batchFactory1 = fac.getTemplateForBatch("FOO()"); + BatchCallableStatementCreatorFactory batchFactory2 = fac.getTemplateForBatch("FOO()"); + assertSame(batchFactory1, batchFactory2); - TemplateStoredProcedure templateStoredProcedure1 = fac.getTemplateStoredProcedure("FOO()"); - TemplateStoredProcedure templateStoredProcedure2 = fac.getTemplateStoredProcedure("FOO()"); - assertSame(templateStoredProcedure1, templateStoredProcedure2); + TemplateStoredProcedure templateStoredProcedure1 = fac.getTemplateStoredProcedure("FOO()"); + TemplateStoredProcedure templateStoredProcedure2 = fac.getTemplateStoredProcedure("FOO()"); + assertSame(templateStoredProcedure1, templateStoredProcedure2); + } } }
