This is an automated email from the ASF dual-hosted git repository. pcongiusti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 9f2f3b1c33cbd71680d9f89544540751f5fdb74e
Author: Pasquale Congiusti <[email protected]>
AuthorDate: Thu Dec 4 10:27:50 2025 +0100

    feat(components): enhance PG queries

    Note, NOTIFY vs pg_notify is documented in https://www.postgresql.org/docs/current/sql-notify.html
---
 .../pg/replication/slot/PgReplicationSlotConsumer.java | 14 +++++++++-----
 .../apache/camel/component/pgevent/PgEventConsumer.java | 16 ++++++++++++----
 .../apache/camel/component/pgevent/PgEventProducer.java | 7 ++++---
 .../camel/pgevent/integration/PgEventITSupport.java | 4 ----
 4 files changed, 25 insertions(+), 16 deletions(-)

diff --git a/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java b/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
index 0c1882670a9d..b61702199f4c 100644
--- a/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
+++ b/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.pg.replication.slot;
 import java.net.SocketException;
 import java.nio.ByteBuffer;
 import java.sql.Connection;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -197,12 +198,15 @@ public class PgReplicationSlotConsumer extends ScheduledPollConsumer {
     }
 
     private boolean isSlotCreated() throws SQLException {
-        String sql
-                = String.format("SELECT count(*) FROM pg_replication_slots WHERE slot_name = '%s';", this.endpoint.getSlot());
+        String sql = "SELECT count(*) FROM pg_replication_slots WHERE slot_name = ?";
 
-        try (Statement statement = this.connection.createStatement(); ResultSet resultSet = statement.executeQuery(sql)) {
-            resultSet.next();
-            return resultSet.getInt(1) > 0;
+        try (PreparedStatement ps = this.connection.prepareStatement(sql)) {
+            ps.setString(1, this.endpoint.getSlot());
+
+            try (ResultSet rs = ps.executeQuery()) {
+                rs.next();
+                return rs.getInt(1) > 0;
+            }
         }
     }
 
diff --git a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
index 83c7f5856303..c9fcd00545a6 100644
--- a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
+++ b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
@@ -124,18 +124,26 @@ public class PgEventConsumer extends DefaultConsumer {
 
     public void initConnection() throws Exception {
         dbConnection = endpoint.initJdbc();
-        String sql = String.format("LISTEN %s", endpoint.getChannel());
+        String channel = endpoint.getChannel();
+        if (!channel.matches("[a-zA-Z_][a-zA-Z0-9_]*")) {
+            throw new IllegalArgumentException("Invalid channel name");
+        }
+        String sql = String.format("LISTEN %s", channel);
         try (PreparedStatement statement = dbConnection.prepareStatement(sql)) {
             statement.execute();
         }
-        dbConnection.addNotificationListener(endpoint.getChannel(), endpoint.getChannel(), listener);
+        dbConnection.addNotificationListener(channel, channel, listener);
     }
 
     public void closeConnection() throws Exception {
         if (dbConnection != null) {
             try {
-                dbConnection.removeNotificationListener(endpoint.getChannel());
-                String sql = String.format("UNLISTEN %s", endpoint.getChannel());
+                String channel = endpoint.getChannel();
+                if (!channel.matches("[a-zA-Z_][a-zA-Z0-9_]*")) {
+                    throw new IllegalArgumentException("Invalid channel name");
+                }
+                dbConnection.removeNotificationListener(channel);
+                String sql = String.format("UNLISTEN %s", channel);
                 try (PreparedStatement statement = dbConnection.prepareStatement(sql)) {
                     statement.execute();
                 }
diff --git a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventProducer.java b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventProducer.java
index d0a6f9d170e9..317ea6700162 100644
--- a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventProducer.java
+++ b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventProducer.java
@@ -53,9 +53,10 @@ public class PgEventProducer extends DefaultProducer {
                     statement.execute();
                 }
             } else {
-                String sql = String.format("NOTIFY %s, '%s'", endpoint.getChannel(), payload);
-                try (PreparedStatement statement = dbConnection.prepareStatement(sql)) {
-                    statement.execute();
+                try (PreparedStatement stmt = dbConnection.prepareStatement("SELECT pg_notify(?, ?)")) {
+                    stmt.setString(1, endpoint.getChannel());
+                    stmt.setString(2, payload);
+                    stmt.execute();
                 }
             }
         }
diff --git a/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/integration/PgEventITSupport.java b/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/integration/PgEventITSupport.java
index ca0633fc258b..661c432835d3 100644
--- a/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/integration/PgEventITSupport.java
+++ b/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/integration/PgEventITSupport.java
@@ -21,8 +21,6 @@ import org.apache.camel.test.infra.jdbc.services.JDBCService;
 import org.apache.camel.test.infra.jdbc.services.JDBCServiceFactory;
 import org.apache.camel.test.junit5.CamelTestSupport;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testcontainers.postgresql.PostgreSQLContainer;
 
 public class PgEventITSupport extends CamelTestSupport {
@@ -38,8 +36,6 @@ public class PgEventITSupport extends CamelTestSupport {
     protected static final String POSTGRES_PASSWORD = "mysecretpassword";
     protected static final String POSTGRES_DB = "postgres";
 
-    private static final Logger LOG = LoggerFactory.getLogger(PgEventITSupport.class.getName());
-
     private static PostgreSQLContainer container;
 
     private static JDBCService createLocalService() {
