This is an automated email from the ASF dual-hosted git repository.

pcongiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 1d2568f00236dcd45b4e022912646155a0f97ff5
Author: Pasquale Congiusti <[email protected]>
AuthorDate: Thu Dec 4 11:43:03 2025 +0100

    feat(components): sql query enhancement
---
 .../stored/CallableStatementWrapperFactory.java    |  2 --
 .../jdbc/ClusteredJdbcAggregationRepository.java   | 23 ++++++++++++----
 .../aggregate/jdbc/JdbcAggregationRepository.java  | 31 +++++++++++++++-------
 .../JdbcOrphanLockAwareIdempotentRepository.java   |  6 ++---
 .../component/sql/ProducerBatchSimpleExpTest.java  |  2 +-
 .../sql/SqlConsumerDynamicParameterTest.java       |  2 +-
 .../sql/SqlConsumerOutputTypeStreamListTest.java   |  6 ++---
 .../component/sql/stored/TemplateCacheTest.java    | 22 ++++++++-------
 8 files changed, 60 insertions(+), 34 deletions(-)

diff --git 
a/components/camel-sql/src/main/java/org/apache/camel/component/sql/stored/CallableStatementWrapperFactory.java
 
b/components/camel-sql/src/main/java/org/apache/camel/component/sql/stored/CallableStatementWrapperFactory.java
index 2f67cd4b3c77..e555d19c87d8 100644
--- 
a/components/camel-sql/src/main/java/org/apache/camel/component/sql/stored/CallableStatementWrapperFactory.java
+++ 
b/components/camel-sql/src/main/java/org/apache/camel/component/sql/stored/CallableStatementWrapperFactory.java
@@ -36,9 +36,7 @@ public class CallableStatementWrapperFactory extends 
ServiceSupport {
     final TemplateParser templateParser;
     boolean function;
 
-    @SuppressWarnings("unchecked")
     private final Map<String, TemplateStoredProcedure> templateCache = 
LRUCacheFactory.newLRUCache(TEMPLATE_CACHE_DEFAULT_SIZE);
-    @SuppressWarnings("unchecked")
     private final Map<String, BatchCallableStatementCreatorFactory> 
batchTemplateCache
             = LRUCacheFactory.newLRUCache(BATCH_TEMPLATE_CACHE_DEFAULT_SIZE);
 
diff --git 
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregationRepository.java
 
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregationRepository.java
index 5f0150e9af2b..dc2450cf37a5 100644
--- 
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregationRepository.java
+++ 
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregationRepository.java
@@ -29,6 +29,7 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.core.ArgumentPreparedStatementSetter;
 import org.springframework.jdbc.core.RowMapper;
 import 
org.springframework.jdbc.core.support.AbstractLobCreatingPreparedStatementCallback;
 import org.springframework.jdbc.support.lob.LobCreator;
@@ -74,8 +75,9 @@ public class ClusteredJdbcAggregationRepository extends 
JdbcAggregationRepositor
                 final long version = exchange.getProperty(VERSION_PROPERTY, 
Long.class);
                 try {
                     LOG.debug("Removing key {}", correlationId);
-
-                    jdbcTemplate.update("DELETE FROM " + getRepositoryName() + 
" WHERE " + ID + " = ? AND " + VERSION + " = ?",
+                    String table = getRepositoryName();
+                    verifyTableName(table);
+                    jdbcTemplate.update("DELETE FROM " + table + " WHERE " + 
ID + " = ? AND " + VERSION + " = ?",
                             correlationId, version);
 
                     insert(camelContext, confirmKey, exchange, 
getRepositoryNameCompleted(), version, true);
@@ -173,10 +175,21 @@ public class ClusteredJdbcAggregationRepository extends 
JdbcAggregationRepositor
     public Set<String> scan(final CamelContext camelContext) {
         return transactionTemplateReadOnly.execute(new 
TransactionCallback<LinkedHashSet<String>>() {
             public LinkedHashSet<String> doInTransaction(final 
TransactionStatus status) {
+                String table = getRepositoryNameCompleted();
+                verifyTableName(table);
+
+                String sql;
+                final String[] params;
+                if (isRecoveryByInstance()) {
+                    sql = "SELECT " + ID + " FROM " + table + " WHERE 
INSTANCE_ID = ?";
+                    params = new String[] { instanceId };
+                } else {
+                    sql = "SELECT " + ID + " FROM " + table;
+                    params = new String[] {};
+                }
+                ArgumentPreparedStatementSetter apss = new 
ArgumentPreparedStatementSetter(params);
                 final List<String> keys = jdbcTemplate.query(
-                        "SELECT " + ID + " FROM " + 
getRepositoryNameCompleted()
-                                                             + 
(isRecoveryByInstance()
-                                                                     ? " WHERE 
INSTANCE_ID='" + instanceId + "'" : ""),
+                        sql, apss,
                         new RowMapper<String>() {
                             public String mapRow(final ResultSet rs, final int 
rowNum) throws SQLException {
                                 final String id = rs.getString(ID);
diff --git 
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
 
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
index fdf511f4b349..6dc89bcc0c4f 100644
--- 
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
+++ 
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
@@ -192,14 +192,15 @@ public class JdbcAggregationRepository extends 
ServiceSupport
 
                 try {
                     LOG.debug("Adding exchange with key {}", correlationId);
-
+                    String table = getRepositoryName();
+                    verifyTableName(table);
                     boolean present = jdbcTemplate.queryForObject(
-                            "SELECT COUNT(1) FROM " + getRepositoryName() + " 
WHERE " + ID + " = ?", Integer.class,
+                            "SELECT COUNT(1) FROM " + table + " WHERE " + ID + 
" = ?", Integer.class,
                             correlationId) != 0;
 
                     // Recover existing exchange with that ID
                     if (isReturnOldExchange() && present) {
-                        result = get(correlationId, getRepositoryName(), 
camelContext);
+                        result = get(correlationId, table, camelContext);
                     }
 
                     if (present) {
@@ -210,11 +211,11 @@ public class JdbcAggregationRepository extends 
ServiceSupport
                         } else {
                             long version = versionLong.longValue();
                             LOG.debug("Updating record with key {} and version 
{}", correlationId, version);
-                            update(camelContext, correlationId, exchange, 
getRepositoryName(), version);
+                            update(camelContext, correlationId, exchange, 
table, version);
                         }
                     } else {
                         LOG.debug("Inserting record with key {}", 
correlationId);
-                        insert(camelContext, correlationId, exchange, 
getRepositoryName(), 1L);
+                        insert(camelContext, correlationId, exchange, table, 
1L);
                     }
 
                 } catch (Exception e) {
@@ -227,6 +228,13 @@ public class JdbcAggregationRepository extends 
ServiceSupport
         });
     }
 
+    // Verifies that the table name is a plain identifier; throws IllegalArgumentException otherwise.
+    protected static void verifyTableName(String tableName) {
+        if (!tableName.matches("[a-zA-Z_][a-zA-Z0-9_]*")) {
+            throw new IllegalArgumentException("Invalid repository name: " + 
tableName);
+        }
+    }
+
     /**
      * Updates the current exchange details in the given repository table.
      *
@@ -376,7 +384,7 @@ public class JdbcAggregationRepository extends 
ServiceSupport
         return transactionTemplateReadOnly.execute(new 
TransactionCallback<Exchange>() {
             public Exchange doInTransaction(TransactionStatus status) {
                 try {
-
+                    verifyTableName(repositoryName);
                     Map<String, Object> columns = jdbcTemplate.queryForMap(
                             String.format("SELECT %1$s, %2$s FROM %3$s WHERE 
%4$s=?", EXCHANGE, VERSION, repositoryName, ID),
                             new Object[] { key }, new int[] { Types.VARCHAR });
@@ -416,8 +424,9 @@ public class JdbcAggregationRepository extends 
ServiceSupport
                 final long version = exchange.getProperty(VERSION_PROPERTY, 
Long.class);
                 try {
                     LOG.debug("Removing key {}", correlationId);
-
-                    jdbcTemplate.update("DELETE FROM " + getRepositoryName() + 
" WHERE " + ID + " = ? AND " + VERSION + " = ?",
+                    String table = getRepositoryName();
+                    verifyTableName(table);
+                    jdbcTemplate.update("DELETE FROM " + table + " WHERE " + 
ID + " = ? AND " + VERSION + " = ?",
                             correlationId, version);
 
                     insert(camelContext, confirmKey, exchange, 
getRepositoryNameCompleted(), version);
@@ -440,8 +449,10 @@ public class JdbcAggregationRepository extends 
ServiceSupport
         return transactionTemplate.execute(new TransactionCallback<Boolean>() {
             public Boolean doInTransaction(TransactionStatus status) {
                 LOG.debug("Confirming exchangeId {}", exchangeId);
+                String table = getRepositoryNameCompleted();
+                verifyTableName(table);
                 final int mustBeOne = jdbcTemplate
-                        .update("DELETE FROM " + getRepositoryNameCompleted() 
+ " WHERE " + ID + " = ?", exchangeId);
+                        .update("DELETE FROM " + table + " WHERE " + ID + " = 
?", exchangeId);
                 if (mustBeOne != 1) {
                     LOG.error("problem removing row {} from {} - DELETE 
statement did not return 1 but {}",
                             exchangeId, getRepositoryNameCompleted(), 
mustBeOne);
@@ -471,6 +482,7 @@ public class JdbcAggregationRepository extends 
ServiceSupport
     protected Set<String> getKeys(final String repositoryName) {
         return transactionTemplateReadOnly.execute(new 
TransactionCallback<LinkedHashSet<String>>() {
             public LinkedHashSet<String> doInTransaction(TransactionStatus 
status) {
+                verifyTableName(repositoryName);
                 List<String> keys = jdbcTemplate.query("SELECT " + ID + " FROM 
" + repositoryName,
                         new RowMapper<String>() {
                             public String mapRow(ResultSet rs, int rowNum) 
throws SQLException {
@@ -698,6 +710,7 @@ public class JdbcAggregationRepository extends 
ServiceSupport
     }
 
     private int rowCount(final String repository) {
+        verifyTableName(repository);
         return jdbcTemplate.queryForObject("SELECT COUNT(1) FROM " + 
repository, Integer.class);
     }
 
diff --git 
a/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java
 
b/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java
index 2934a71e153a..95992703ff4b 100644
--- 
a/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java
+++ 
b/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java
@@ -29,7 +29,6 @@ import java.util.stream.Collectors;
 import javax.sql.DataSource;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.ShutdownableService;
 import org.apache.camel.spi.ExecutorServiceManager;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.transaction.support.TransactionTemplate;
@@ -45,7 +44,7 @@ import 
org.springframework.transaction.support.TransactionTemplate;
  * CAMEL_MESSAGEPROCESSED table or if in case the instance holding the lock 
has crashed. This is determined if the
  * timestamp on the createdAt column is more than the lockMaxAge.
  */
-public class JdbcOrphanLockAwareIdempotentRepository extends 
JdbcMessageIdRepository implements ShutdownableService {
+public class JdbcOrphanLockAwareIdempotentRepository extends 
JdbcMessageIdRepository {
 
     private final StampedLock sl = new StampedLock();
 
@@ -94,9 +93,10 @@ public class JdbcOrphanLockAwareIdempotentRepository extends 
JdbcMessageIdReposi
          * If the update timestamp time is more than lockMaxAge then assume 
that the lock is orphan and the process
          * which had acquired the lock has died
          */
+        // NOTE: the query string is supplied by the user, so this is safe: the user
decides which query to perform.
         String orphanLockRecoverQueryString = getQueryString() + " AND 
createdAt >= ?";
         Timestamp xMillisAgo = new Timestamp(System.currentTimeMillis() - 
lockMaxAgeMillis);
-        return jdbcTemplate.queryForObject(orphanLockRecoverQueryString, 
Integer.class, processorName, key,
+        return jdbcTemplate.queryForObject(orphanLockRecoverQueryString, 
Integer.class, processorName, key, // NOSONAR
                 xMillisAgo);
     }
 
diff --git 
a/components/camel-sql/src/test/java/org/apache/camel/component/sql/ProducerBatchSimpleExpTest.java
 
b/components/camel-sql/src/test/java/org/apache/camel/component/sql/ProducerBatchSimpleExpTest.java
index 0735971fd3e8..b7a000797680 100644
--- 
a/components/camel-sql/src/test/java/org/apache/camel/component/sql/ProducerBatchSimpleExpTest.java
+++ 
b/components/camel-sql/src/test/java/org/apache/camel/component/sql/ProducerBatchSimpleExpTest.java
@@ -60,7 +60,7 @@ public class ProducerBatchSimpleExpTest extends 
CamelTestSupport {
         MockEndpoint mock = getMockEndpoint("mock:query");
         mock.expectedMessageCount(1);
 
-        List data = new ArrayList();
+        List<MyData> data = new ArrayList<>();
         data.add(new MyData(4, "Donald", "DIS"));
         data.add(new MyData(5, "Goofy", "DIS"));
         template.requestBody("direct:query", data);
diff --git 
a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDynamicParameterTest.java
 
b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDynamicParameterTest.java
index 0582ff90b08f..e8bd040420c1 100644
--- 
a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDynamicParameterTest.java
+++ 
b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDynamicParameterTest.java
@@ -87,7 +87,7 @@ public class SqlConsumerDynamicParameterTest extends 
CamelTestSupport {
                 getContext().getComponent("sql", 
SqlComponent.class).setDataSource(db);
 
                 from("sql:select * from projects where id = 
:#${bean:myIdGenerator.nextId}?initialDelay=0&delay=50")
-                        .routeId("foo").noAutoStartup()
+                        .routeId("foo").autoStartup("false")
                         .to("mock:result");
             }
         };
diff --git 
a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerOutputTypeStreamListTest.java
 
b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerOutputTypeStreamListTest.java
index c5329089b049..c65fdd194726 100644
--- 
a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerOutputTypeStreamListTest.java
+++ 
b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerOutputTypeStreamListTest.java
@@ -100,12 +100,12 @@ public class SqlConsumerOutputTypeStreamListTest extends 
CamelTestSupport {
                 getContext().getComponent("sql", 
SqlComponent.class).setDataSource(db);
 
                 from("sql:select * from projects order by 
id?outputType=StreamList&initialDelay=0&delay=50").routeId("route1")
-                        .noAutoStartup()
+                        .autoStartup(false)
                         .to("log:stream")
                         .to("mock:result");
 
                 from("sql:select * from projects order by 
id?outputType=StreamList&initialDelay=0&delay=50").routeId("route2")
-                        .noAutoStartup()
+                        .autoStartup(false)
                         .to("log:stream")
                         .split(body()).streaming()
                         .to("log:row")
@@ -113,7 +113,7 @@ public class SqlConsumerOutputTypeStreamListTest extends 
CamelTestSupport {
                         .end();
 
                 from("sql:select * from projects order by 
id?outputType=StreamList&outputClass=org.apache.camel.component.sql.ProjectModel&initialDelay=0&delay=50")
-                        .routeId("route3").noAutoStartup()
+                        .routeId("route3").autoStartup(false)
                         .to("log:stream")
                         .split(body()).streaming()
                         .to("log:row")
diff --git 
a/components/camel-sql/src/test/java/org/apache/camel/component/sql/stored/TemplateCacheTest.java
 
b/components/camel-sql/src/test/java/org/apache/camel/component/sql/stored/TemplateCacheTest.java
index 40c7f2f33bde..4f4570e1019a 100644
--- 
a/components/camel-sql/src/test/java/org/apache/camel/component/sql/stored/TemplateCacheTest.java
+++ 
b/components/camel-sql/src/test/java/org/apache/camel/component/sql/stored/TemplateCacheTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.sql.stored;
 
+import java.io.IOException;
+
 import org.apache.camel.component.sql.stored.template.TemplateParser;
 import org.apache.camel.test.junit5.CamelTestSupport;
 import org.junit.jupiter.api.Test;
@@ -48,17 +50,17 @@ public class TemplateCacheTest extends CamelTestSupport {
     }
 
     @Test
-    public void shouldCacheTemplateFunctions() {
+    public void shouldCacheTemplateFunctions() throws IOException {
         JdbcTemplate jdbcTemplate = new JdbcTemplate(db);
-        CallableStatementWrapperFactory fac
-                = new CallableStatementWrapperFactory(jdbcTemplate, new 
TemplateParser(context.getClassResolver()), false);
-
-        BatchCallableStatementCreatorFactory batchFactory1 = 
fac.getTemplateForBatch("FOO()");
-        BatchCallableStatementCreatorFactory batchFactory2 = 
fac.getTemplateForBatch("FOO()");
-        assertSame(batchFactory1, batchFactory2);
+        try (CallableStatementWrapperFactory fac
+                = new CallableStatementWrapperFactory(jdbcTemplate, new 
TemplateParser(context.getClassResolver()), false)) {
+            BatchCallableStatementCreatorFactory batchFactory1 = 
fac.getTemplateForBatch("FOO()");
+            BatchCallableStatementCreatorFactory batchFactory2 = 
fac.getTemplateForBatch("FOO()");
+            assertSame(batchFactory1, batchFactory2);
 
-        TemplateStoredProcedure templateStoredProcedure1 = 
fac.getTemplateStoredProcedure("FOO()");
-        TemplateStoredProcedure templateStoredProcedure2 = 
fac.getTemplateStoredProcedure("FOO()");
-        assertSame(templateStoredProcedure1, templateStoredProcedure2);
+            TemplateStoredProcedure templateStoredProcedure1 = 
fac.getTemplateStoredProcedure("FOO()");
+            TemplateStoredProcedure templateStoredProcedure2 = 
fac.getTemplateStoredProcedure("FOO()");
+            assertSame(templateStoredProcedure1, templateStoredProcedure2);
+        }
     }
 }

Reply via email to