This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a2a3bf71 [FLINK-38965][postgres] Fix LIKE wildcard matching issue for similar table names in PostgreSQL CDC connector (#4239)
3a2a3bf71 is described below

commit 3a2a3bf716dcb325610eebb32c32380b59b2a868
Author: Jia Fan <[email protected]>
AuthorDate: Sat Jan 31 15:58:54 2026 +0800

    [FLINK-38965][postgres] Fix LIKE wildcard matching issue for similar table names in PostgreSQL CDC connector (#4239)
---
 .../postgresql/connection/PostgresConnection.java  |  20 +-
 .../postgres/source/SimilarTableNamesITCase.java   | 340 +++++++++++++++++++++
 .../src/test/resources/ddl/similar_names.sql       |  84 +++++
 3 files changed, 443 insertions(+), 1 deletion(-)
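
Background for reviewers: java.sql.DatabaseMetaData#getColumns treats its table-name argument as a SQL LIKE pattern, so '_' and '%' inside a real table name behave as wildcards and the returned metadata can include columns of similarly named tables. The snippet below is only an illustrative sketch, not part of this commit; the JDBC URL, credentials, and the class name LikeWildcardDemo are placeholders. It shows how such a metadata lookup can return rows for both 'ndi_pg_user_sink_1' and 'ndi_pg_userbsink_1', which is what the new table-name check in doReadTableColumn guards against.

    import java.sql.Connection;
    import java.sql.DatabaseMetaData;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    public class LikeWildcardDemo {
        public static void main(String[] args) throws Exception {
            // Placeholder connection details; any PostgreSQL instance loaded with
            // the tables from similar_names.sql should reproduce the behavior.
            try (Connection conn =
                    DriverManager.getConnection(
                            "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres")) {
                DatabaseMetaData meta = conn.getMetaData();
                // The third argument is a LIKE pattern, so asking for the columns of
                // 'ndi_pg_user_sink_1' can also return columns of 'ndi_pg_userbsink_1'
                // ('_' matches any single character).
                try (ResultSet rs =
                        meta.getColumns(null, "similar_names", "ndi_pg_user_sink_1", null)) {
                    while (rs.next()) {
                        // Column 3 is TABLE_NAME, column 4 is COLUMN_NAME, matching the
                        // indexes used in doReadTableColumn.
                        System.out.println(rs.getString(3) + "." + rs.getString(4));
                    }
                }
            }
        }
    }

Run against a database initialized with similar_names.sql, this would typically print column rows for both tables, while the patched doReadTableColumn keeps only rows whose TABLE_NAME equals the requested table.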

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java
index 687dcc9ae..610bd4360 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java
@@ -56,7 +56,7 @@ import java.util.regex.Pattern;
  * {@link JdbcConnection} connection extension used for connecting to Postgres instances.
  *
  * @author Horia Chiorean
- *     <p>Copied from Debezium 1.9.8-Final with three additional methods:
+ *     <p>Copied from Debezium 1.9.8-Final with the following modifications:
  *     <ul>
  *       <li>Constructor PostgresConnection( Configuration config, PostgresValueConverterBuilder
  *           valueConverterBuilder, ConnectionFactory factory) to allow passing a custom
@@ -66,6 +66,13 @@ import java.util.regex.Pattern;
  *       <li>override isTableUniqueIndexIncluded: Copied DBZ-5398 from Debezium 2.0.0.Final to fix
  *           https://github.com/ververica/flink-cdc-connectors/issues/2710. Remove this comment
  *           after bumping debezium version to 2.0.0.Final.
+ *       <li>FLINK-38965: Modified doReadTableColumn to filter out columns from other tables that
+ *           might be returned due to PostgreSQL LIKE wildcard matching. The underscore '_' matches
+ *           any single character, and '%' matches any sequence of characters. For example, when
+ *           querying table 'user_sink', the LIKE pattern may also match 'userbsink' (due to '_');
+ *           when querying table 'user%data' (where % is a literal character in the table name), the
+ *           LIKE pattern may also match 'user_test_data' (due to '%'). See also:
+ *           https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java#L1327
  *     </ul>
  */
 public class PostgresConnection extends JdbcConnection {
@@ -696,6 +703,17 @@ public class PostgresConnection extends JdbcConnection {
     private Optional<ColumnEditor> doReadTableColumn(
             ResultSet columnMetadata, TableId tableId, Tables.ColumnNameFilter columnFilter)
             throws SQLException {
+        // FLINK-38965: Filter out columns from other tables that might be returned due to
+        // PostgreSQL LIKE wildcard matching. The underscore '_' matches any single character,
+        // and '%' matches any sequence of characters. For example:
+        // - When querying 'user_sink', the pattern may also match 'userbsink' (due to '_')
+        // - When querying 'user%data' (where % is literal), it may match 'user_test_data' (due to
+        // '%')
+        final String resultTableName = columnMetadata.getString(3);
+        if (!tableId.table().equals(resultTableName)) {
+            return Optional.empty();
+        }
+
         final String columnName = columnMetadata.getString(4);
         if (columnFilter == null
                 || columnFilter.matches(
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/SimilarTableNamesITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/SimilarTableNamesITCase.java
new file mode 100644
index 000000000..f4c643731
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/SimilarTableNamesITCase.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.connectors.postgres.PostgresTestBase;
+import org.apache.flink.cdc.connectors.postgres.testutils.UniqueDatabase;
+import org.apache.flink.cdc.connectors.utils.ExternalResourceProxy;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * IT tests for FLINK-38965: Fix PostgreSQL CDC connector issue when table names contain underscore
+ * or percent characters that match other tables due to LIKE wildcard behavior.
+ *
+ * <p>PostgreSQL LIKE wildcards:
+ *
+ * <ul>
+ *   <li>'_' (underscore) matches any single character. E.g., 'user_sink' matches 'userbsink'
+ *   <li>'%' (percent) matches any sequence of characters. E.g., 'user%data' matches
+ *       'user_test_data'
+ * </ul>
+ *
+ * <p>When table names contain these special characters, JDBC metadata queries using LIKE may return
+ * columns from unintended tables.
+ */
+@Timeout(value = 300, unit = TimeUnit.SECONDS)
+class SimilarTableNamesITCase extends PostgresTestBase {
+
+    private static final int DEFAULT_PARALLELISM = 2;
+    private static final String DB_NAME_PREFIX = "postgres";
+    private static final String SCHEMA_NAME = "similar_names";
+
+    @RegisterExtension
+    public final ExternalResourceProxy<MiniClusterWithClientResource> miniClusterResource =
+            new ExternalResourceProxy<>(
+                    new MiniClusterWithClientResource(
+                            new MiniClusterResourceConfiguration.Builder()
+                                    .setNumberTaskManagers(1)
+                                    .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+                                    .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                                    .withHaLeadershipControl()
+                                    .build()));
+
+    private final UniqueDatabase similarNamesDatabase =
+            new UniqueDatabase(
+                    POSTGRES_CONTAINER,
+                    DB_NAME_PREFIX,
+                    SCHEMA_NAME,
+                    POSTGRES_CONTAINER.getUsername(),
+                    POSTGRES_CONTAINER.getPassword());
+
+    private String slotName;
+
+    @BeforeEach
+    public void before() {
+        similarNamesDatabase.createAndInitialize();
+        this.slotName = getSlotName();
+    }
+
+    @AfterEach
+    public void after() throws Exception {
+        Thread.sleep(1000L);
+        similarNamesDatabase.removeSlot(slotName);
+    }
+
+    // ==================== Test Cases ====================
+
+    /**
+     * Test that when capturing CDC events for table 'ndi_pg_user_sink_1', we don't accidentally
+     * capture events from 'ndi_pg_userbsink_1' which would match due to PostgreSQL's LIKE wildcard
+     * behavior (underscore '_' matches any single character).
+     *
+     * <p>Before the fix (FLINK-38965), this test would fail with: "java.lang.IllegalStateException:
+     * Duplicate key Optional.empty"
+     */
+    @Test
+    void testReadTableWithSimilarNameUnderscore() throws Exception {
+        StreamTableEnvironment tEnv = createTableEnv();
+
+        // Only capture events from 'ndi_pg_user_sink_1' table
+        tEnv.executeSql(createSourceDDL("target_table", "ndi_pg_user_sink_1"));
+
+        TableResult result = tEnv.executeSql("SELECT * FROM target_table");
+        CloseableIterator<Row> iterator = result.collect();
+
+        try {
+            // Verify snapshot data (3 rows from ndi_pg_user_sink_1)
+            List<String> expectedSnapshotData =
+                    Arrays.asList(
+                            "+I[1, user_1, Shanghai]",
+                            "+I[2, user_2, Beijing]",
+                            "+I[3, user_3, Hangzhou]");
+            assertRowsEquals(collectRows(iterator, 3), expectedSnapshotData);
+
+            // Perform DML operations on both tables
+            executeDmlOperations();
+
+            // Verify streaming events - should only contain events from ndi_pg_user_sink_1
+            List<String> expectedStreamData =
+                    Arrays.asList(
+                            "+I[4, user_4, Wuhan]",
+                            "-U[1, user_1, Shanghai]",
+                            "+U[1, user_1, Suzhou]");
+            assertRowsEquals(collectRows(iterator, 3), expectedStreamData);
+        } finally {
+            closeResourcesAndWaitForJobTermination(iterator, result);
+        }
+    }
+
+    /**
+     * Test that when capturing CDC events for table 'user%data' (which contains a literal '%'
+     * character), we don't accidentally capture events from 'user_test_data' which would match due
+     * to PostgreSQL's LIKE wildcard behavior (percent '%' matches any sequence of characters).
+     *
+     * <p>When querying for table 'user%data', the LIKE pattern may also match 'user_test_data'
+     * because '%' matches the '_test_' sequence.
+     */
+    @Test
+    void testReadTableWithSimilarNamePercent() throws Exception {
+        StreamTableEnvironment tEnv = createTableEnv();
+
+        // Only capture events from 'user%data' table (note: % is a literal character in table name)
+        tEnv.executeSql(createSourceDDL("target_table", "user%data"));
+
+        TableResult result = tEnv.executeSql("SELECT * FROM target_table");
+        CloseableIterator<Row> iterator = result.collect();
+
+        try {
+            // Verify snapshot data (3 rows from user%data only)
+            List<String> expectedSnapshotData =
+                    Arrays.asList(
+                            "+I[201, percent_1, Tianjin]",
+                            "+I[202, percent_2, Dalian]",
+                            "+I[203, percent_3, Qingdao]");
+            assertRowsEquals(collectRows(iterator, 3), expectedSnapshotData);
+
+            // Perform DML on user_test_data table - these should NOT be captured
+            executeDmlOperationsOnTestDataTable();
+
+            // Perform DML on target table - these SHOULD be captured
+            try (Connection conn =
+                            getJdbcConnection(
+                                    POSTGRES_CONTAINER, similarNamesDatabase.getDatabaseName());
+                    Statement stmt = conn.createStatement()) {
+                stmt.execute(
+                        "INSERT INTO similar_names.\"user%data\" VALUES (204, 'percent_4', 'Xiamen')");
+            }
+
+            // Should only see the insert from target table, not user_test_data table
+            List<String> expectedStreamData = Arrays.asList("+I[204, percent_4, Xiamen]");
+            assertRowsEquals(collectRows(iterator, 1), expectedStreamData);
+        } finally {
+            closeResourcesAndWaitForJobTermination(iterator, result);
+        }
+    }
+
+    /**
+     * Test reading from all similar-named tables to verify they can be captured independently
+     * without interference.
+     */
+    @Test
+    void testReadAllSimilarNamedTables() throws Exception {
+        StreamTableEnvironment tEnv = createTableEnv();
+
+        // Capture events from underscore-related tables using regex pattern
+        tEnv.executeSql(createSourceDDL("underscore_tables", "ndi_pg_user.*"));
+
+        TableResult result = tEnv.executeSql("SELECT * FROM underscore_tables");
+        CloseableIterator<Row> iterator = result.collect();
+
+        try {
+            // Verify snapshot data (3 rows from each table = 6 total)
+            List<String> expectedSnapshotData =
+                    Arrays.asList(
+                            // From ndi_pg_user_sink_1
+                            "+I[1, user_1, Shanghai]",
+                            "+I[2, user_2, Beijing]",
+                            "+I[3, user_3, Hangzhou]",
+                            // From ndi_pg_userbsink_1
+                            "+I[101, userb_1, Guangzhou]",
+                            "+I[102, userb_2, Shenzhen]",
+                            "+I[103, userb_3, Chengdu]");
+            assertRowsEquals(collectRows(iterator, 6), expectedSnapshotData);
+        } finally {
+            closeResourcesAndWaitForJobTermination(iterator, result);
+        }
+    }
+
+    // ==================== Helper Methods ====================
+
+    /** Creates and configures the StreamTableEnvironment. */
+    private StreamTableEnvironment createTableEnv() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(DEFAULT_PARALLELISM);
+        env.enableCheckpointing(200L);
+        return StreamTableEnvironment.create(env);
+    }
+
+    /** Creates the source DDL for the given table name pattern. */
+    private String createSourceDDL(String flinkTableName, String pgTablePattern) {
+        return format(
+                "CREATE TABLE %s ("
+                        + " id INT NOT NULL,"
+                        + " name STRING,"
+                        + " address STRING,"
+                        + " PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ("
+                        + " 'connector' = 'postgres-cdc',"
+                        + " 'hostname' = '%s',"
+                        + " 'port' = '%s',"
+                        + " 'username' = '%s',"
+                        + " 'password' = '%s',"
+                        + " 'database-name' = '%s',"
+                        + " 'schema-name' = '%s',"
+                        + " 'table-name' = '%s',"
+                        + " 'scan.startup.mode' = 'initial',"
+                        + " 'scan.incremental.snapshot.enabled' = 'true',"
+                        + " 'scan.incremental.snapshot.chunk.size' = '100',"
+                        + " 'decoding.plugin.name' = 'pgoutput',"
+                        + " 'slot.name' = '%s',"
+                        + " 'scan.lsn-commit.checkpoints-num-delay' = '1'"
+                        + ")",
+                flinkTableName,
+                similarNamesDatabase.getHost(),
+                similarNamesDatabase.getDatabasePort(),
+                similarNamesDatabase.getUsername(),
+                similarNamesDatabase.getPassword(),
+                similarNamesDatabase.getDatabaseName(),
+                SCHEMA_NAME,
+                pgTablePattern,
+                slotName);
+    }
+
+    /** Collects specified number of rows from the iterator. */
+    private List<String> collectRows(CloseableIterator<Row> iterator, int expectedCount) {
+        List<String> rows = new ArrayList<>();
+        int count = 0;
+        while (count < expectedCount && iterator.hasNext()) {
+            rows.add(iterator.next().toString());
+            count++;
+        }
+        return rows;
+    }
+
+    /** Asserts that actual rows match expected rows (order-insensitive). */
+    private void assertRowsEquals(List<String> actual, List<String> expected) {
+        assertThat(actual.stream().sorted().collect(Collectors.toList()))
+                .containsExactlyInAnyOrderElementsOf(
+                        expected.stream().sorted().collect(Collectors.toList()));
+    }
+
+    /** Executes DML operations on target and similar-named tables for streaming phase testing. */
+    private void executeDmlOperations() throws Exception {
+        try (Connection conn =
+                        getJdbcConnection(
+                                POSTGRES_CONTAINER, similarNamesDatabase.getDatabaseName());
+                Statement stmt = conn.createStatement()) {
+            // Insert into target table
+            stmt.execute(
+                    "INSERT INTO similar_names.ndi_pg_user_sink_1 VALUES (4, 'user_4', 'Wuhan')");
+            // Insert into similar-named table (should NOT be captured when only listening to
+            // target)
+            stmt.execute(
+                    "INSERT INTO similar_names.ndi_pg_userbsink_1 VALUES (104, 'userb_4', 'Nanjing')");
+            // Update target table
+            stmt.execute(
+                    "UPDATE similar_names.ndi_pg_user_sink_1 SET address = 'Suzhou' WHERE id = 1");
+        }
+    }
+
+    /** Executes DML operations on user_test_data table for '%' wildcard testing. */
+    private void executeDmlOperationsOnTestDataTable() throws Exception {
+        try (Connection conn =
+                        getJdbcConnection(
+                                POSTGRES_CONTAINER, similarNamesDatabase.getDatabaseName());
+                Statement stmt = conn.createStatement()) {
+            // Insert into user_test_data table (should NOT be captured when only listening to
+            // user%data)
+            stmt.execute(
+                    "INSERT INTO similar_names.user_test_data VALUES (304, 'test_4', 'Harbin')");
+            // Update user_test_data table
+            stmt.execute(
+                    "UPDATE similar_names.user_test_data SET address = 'Changchun' WHERE id = 301");
+        }
+    }
+
+    /** Properly closes resources and waits for job termination. */
+    private void closeResourcesAndWaitForJobTermination(
+            CloseableIterator<Row> iterator, TableResult result) throws Exception {
+        iterator.close();
+        result.getJobClient()
+                .ifPresent(
+                        client -> {
+                            client.cancel();
+                            try {
+                                client.getJobExecutionResult().get(30, TimeUnit.SECONDS);
+                            } catch (Exception e) {
+                                // Job cancelled, expected
+                            }
+                        });
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/similar_names.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/similar_names.sql
new file mode 100644
index 000000000..66ae3fec0
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/similar_names.sql
@@ -0,0 +1,84 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+-- 
+--      http://www.apache.org/licenses/LICENSE-2.0
+-- 
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- FLINK-38965: Test case for similar table names with underscore or percent characters
+-- This tests the fix for PostgreSQL LIKE wildcard matching issue:
+-- - underscore '_' matches any single character
+-- - percent '%' matches any sequence of characters
+-- For example, 'user_sink' may match 'userbsink' (due to '_')
+-- and 'user%sink' may match 'user_test_sink' (due to '%')
+
+DROP SCHEMA IF EXISTS similar_names CASCADE;
+CREATE SCHEMA similar_names;
+SET search_path TO similar_names;
+
+-- Table 1: ndi_pg_user_sink_1 (target table)
+CREATE TABLE ndi_pg_user_sink_1 (
+  id INTEGER NOT NULL PRIMARY KEY,
+  name VARCHAR(255) NOT NULL,
+  address VARCHAR(1024)
+);
+ALTER TABLE ndi_pg_user_sink_1 REPLICA IDENTITY FULL;
+
+INSERT INTO ndi_pg_user_sink_1
+VALUES (1, 'user_1', 'Shanghai'),
+       (2, 'user_2', 'Beijing'),
+       (3, 'user_3', 'Hangzhou');
+
+-- Table 2: ndi_pg_userbsink_1 (similar name - only difference is 'b' instead of '_')
+-- This table name would match the LIKE pattern for 'ndi_pg_user_sink_1' 
+-- because '_' acts as a wildcard
+CREATE TABLE ndi_pg_userbsink_1 (
+  id INTEGER NOT NULL PRIMARY KEY,
+  name VARCHAR(255) NOT NULL,
+  address VARCHAR(1024)
+);
+ALTER TABLE ndi_pg_userbsink_1 REPLICA IDENTITY FULL;
+
+INSERT INTO ndi_pg_userbsink_1
+VALUES (101, 'userb_1', 'Guangzhou'),
+       (102, 'userb_2', 'Shenzhen'),
+       (103, 'userb_3', 'Chengdu');
+
+-- Table 3: user%data (tests '%' wildcard scenario)
+-- The table name contains '%' character which acts as a wildcard in LIKE pattern.
+-- When querying for table 'user%data', the LIKE pattern may also match
+-- 'user_test_data' because '%' matches any sequence of characters.
+CREATE TABLE "user%data" (
+  id INTEGER NOT NULL PRIMARY KEY,
+  name VARCHAR(255) NOT NULL,
+  address VARCHAR(1024)
+);
+ALTER TABLE "user%data" REPLICA IDENTITY FULL;
+
+INSERT INTO "user%data"
+VALUES (201, 'percent_1', 'Tianjin'),
+       (202, 'percent_2', 'Dalian'),
+       (203, 'percent_3', 'Qingdao');
+
+-- Table 4: user_test_data (similar to 'user%data' when % is treated as wildcard)
+-- This table name would match the LIKE pattern for 'user%data'
+-- because '%' matches '_test_' sequence.
+CREATE TABLE user_test_data (
+  id INTEGER NOT NULL PRIMARY KEY,
+  name VARCHAR(255) NOT NULL,
+  address VARCHAR(1024)
+);
+ALTER TABLE user_test_data REPLICA IDENTITY FULL;
+
+INSERT INTO user_test_data
+VALUES (301, 'test_1', 'Harbin'),
+       (302, 'test_2', 'Changchun'),
+       (303, 'test_3', 'Shenyang');
