This is an automated email from the ASF dual-hosted git repository.

pcongiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 9f2f3b1c33cbd71680d9f89544540751f5fdb74e
Author: Pasquale Congiusti <[email protected]>
AuthorDate: Thu Dec 4 10:27:50 2025 +0100

    feat(components): enhance PG queries
    
    Note: the difference between NOTIFY and pg_notify is documented at
    https://www.postgresql.org/docs/current/sql-notify.html
---
 .../pg/replication/slot/PgReplicationSlotConsumer.java   | 14 +++++++++-----
 .../apache/camel/component/pgevent/PgEventConsumer.java  | 16 ++++++++++++----
 .../apache/camel/component/pgevent/PgEventProducer.java  |  7 ++++---
 .../camel/pgevent/integration/PgEventITSupport.java      |  4 ----
 4 files changed, 25 insertions(+), 16 deletions(-)

diff --git 
a/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
 
b/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
index 0c1882670a9d..b61702199f4c 100644
--- 
a/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
+++ 
b/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.pg.replication.slot;
 import java.net.SocketException;
 import java.nio.ByteBuffer;
 import java.sql.Connection;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -197,12 +198,15 @@ public class PgReplicationSlotConsumer extends 
ScheduledPollConsumer {
     }
 
     private boolean isSlotCreated() throws SQLException {
-        String sql
-                = String.format("SELECT count(*) FROM pg_replication_slots 
WHERE slot_name = '%s';", this.endpoint.getSlot());
+        String sql = "SELECT count(*) FROM pg_replication_slots WHERE 
slot_name = ?";
 
-        try (Statement statement = this.connection.createStatement(); 
ResultSet resultSet = statement.executeQuery(sql)) {
-            resultSet.next();
-            return resultSet.getInt(1) > 0;
+        try (PreparedStatement ps = this.connection.prepareStatement(sql)) {
+            ps.setString(1, this.endpoint.getSlot());
+
+            try (ResultSet rs = ps.executeQuery()) {
+                rs.next();
+                return rs.getInt(1) > 0;
+            }
         }
     }
 
diff --git 
a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
 
b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
index 83c7f5856303..c9fcd00545a6 100644
--- 
a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
+++ 
b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
@@ -124,18 +124,26 @@ public class PgEventConsumer extends DefaultConsumer {
 
         public void initConnection() throws Exception {
             dbConnection = endpoint.initJdbc();
-            String sql = String.format("LISTEN %s", endpoint.getChannel());
+            String channel = endpoint.getChannel();
+            if (!channel.matches("[a-zA-Z_][a-zA-Z0-9_]*")) {
+                throw new IllegalArgumentException("Invalid channel name");
+            }
+            String sql = String.format("LISTEN %s", channel);
             try (PreparedStatement statement = 
dbConnection.prepareStatement(sql)) {
                 statement.execute();
             }
-            dbConnection.addNotificationListener(endpoint.getChannel(), 
endpoint.getChannel(), listener);
+            dbConnection.addNotificationListener(channel, channel, listener);
         }
 
         public void closeConnection() throws Exception {
             if (dbConnection != null) {
                 try {
-                    
dbConnection.removeNotificationListener(endpoint.getChannel());
-                    String sql = String.format("UNLISTEN %s", 
endpoint.getChannel());
+                    String channel = endpoint.getChannel();
+                    if (!channel.matches("[a-zA-Z_][a-zA-Z0-9_]*")) {
+                        throw new IllegalArgumentException("Invalid channel 
name");
+                    }
+                    dbConnection.removeNotificationListener(channel);
+                    String sql = String.format("UNLISTEN %s", channel);
                     try (PreparedStatement statement = 
dbConnection.prepareStatement(sql)) {
                         statement.execute();
                     }
diff --git 
a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventProducer.java
 
b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventProducer.java
index d0a6f9d170e9..317ea6700162 100644
--- 
a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventProducer.java
+++ 
b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventProducer.java
@@ -53,9 +53,10 @@ public class PgEventProducer extends DefaultProducer {
                 statement.execute();
             }
         } else {
-            String sql = String.format("NOTIFY %s, '%s'", 
endpoint.getChannel(), payload);
-            try (PreparedStatement statement = 
dbConnection.prepareStatement(sql)) {
-                statement.execute();
+            try (PreparedStatement stmt = 
dbConnection.prepareStatement("SELECT pg_notify(?, ?)")) {
+                stmt.setString(1, endpoint.getChannel());
+                stmt.setString(2, payload);
+                stmt.execute();
             }
         }
     }
diff --git 
a/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/integration/PgEventITSupport.java
 
b/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/integration/PgEventITSupport.java
index ca0633fc258b..661c432835d3 100644
--- 
a/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/integration/PgEventITSupport.java
+++ 
b/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/integration/PgEventITSupport.java
@@ -21,8 +21,6 @@ import org.apache.camel.test.infra.jdbc.services.JDBCService;
 import org.apache.camel.test.infra.jdbc.services.JDBCServiceFactory;
 import org.apache.camel.test.junit5.CamelTestSupport;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testcontainers.postgresql.PostgreSQLContainer;
 
 public class PgEventITSupport extends CamelTestSupport {
@@ -38,8 +36,6 @@ public class PgEventITSupport extends CamelTestSupport {
     protected static final String POSTGRES_PASSWORD = "mysecretpassword";
     protected static final String POSTGRES_DB = "postgres";
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(PgEventITSupport.class.getName());
-
     private static PostgreSQLContainer container;
 
     private static JDBCService createLocalService() {

Reply via email to