This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 6788ff5bf [FLINK-38188][pipeline-connector][postgres]  Fix database name validation logic in PostgresDataSourceFactory (#4075)
6788ff5bf is described below

commit 6788ff5bf33585905714e00d1e5a4600ee509709
Author: Lanny Boarts <top...@live.cn>
AuthorDate: Wed Aug 13 13:02:15 2025 +0800

    [FLINK-38188][pipeline-connector][postgres]  Fix database name validation logic in PostgresDataSourceFactory (#4075)
    
    ---------
    
    Co-authored-by: lvyanquan <lvyanquan....@alibaba-inc.com>
---
 .../factory/PostgresDataSourceFactory.java         | 26 +++++-------
 .../factory/PostgresDataSourceFactoryTest.java     | 48 ++++++++++++++++++++++
 .../cdc/pipeline/tests/PostgresE2eITCase.java      |  3 +-
 3 files changed, 61 insertions(+), 16 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
index 138c5db01..61c9766c7 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
@@ -369,23 +369,19 @@ public class PostgresDataSourceFactory implements 
DataSourceFactory {
                     String.format(
                             "Tables format must db.schema.table, can not 
'tables' = %s",
                             TABLES.key()));
-            if (tableNameParts.length == 3) {
-                String currentDbName = tableNameParts[0];
+            String currentDbName = tableNameParts[0];
 
+            checkState(
+                    isValidPostgresDbName(currentDbName),
+                    String.format("%s is not a valid PostgreSQL database 
name", currentDbName));
+            if (dbName == null) {
+                dbName = currentDbName;
+            } else {
                 checkState(
-                        isValidPostgresDbName(currentDbName),
-                        String.format(
-                                "The value of option %s does not conform to 
PostgresSQL database name naming conventions",
-                                TABLES.key()));
-                if (dbName == null) {
-                    dbName = currentDbName;
-                } else {
-                    checkState(
-                            !dbName.equals(currentDbName),
-                            String.format(
-                                    "The value of option %s all table names 
must have the same database name",
-                                    TABLES.key()));
-                }
+                        dbName.equals(currentDbName),
+                        "The value of option `%s` is `%s`, but not all table 
names have the same database name",
+                        TABLES.key(),
+                        String.join(",", tableNames));
             }
         }
 
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java
index 566667612..473c2d61c 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java
@@ -257,6 +257,54 @@ public class PostgresDataSourceFactoryTest extends 
PostgresTestBase {
                 .isEqualTo(Arrays.asList("inventory.products"));
     }
 
+    @Test
+    public void testTableValidationWithDifferentDatabases() {
+        Map<String, String> options = new HashMap<>();
+        options.put(HOSTNAME.key(), POSTGRES_CONTAINER.getHost());
+        options.put(
+                PG_PORT.key(), 
String.valueOf(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT)));
+        options.put(USERNAME.key(), TEST_USER);
+        options.put(PASSWORD.key(), TEST_PASSWORD);
+        options.put(
+                TABLES.key(),
+                
"aia_test.public.aia_t_icc_jjdb,different_db.public.aia_t_icc_jjdb_extend");
+        options.put(SLOT_NAME.key(), slotName);
+
+        PostgresDataSourceFactory factory = new PostgresDataSourceFactory();
+        Factory.Context context = new 
MockContext(Configuration.fromMap(options));
+
+        assertThatThrownBy(() -> factory.createDataSource(context))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining(
+                        "The value of option `tables` is 
`aia_test.public.aia_t_icc_jjdb,different_db.public.aia_t_icc_jjdb_extend`, but 
not all table names have the same database name");
+    }
+
+    @Test
+    public void testTableValidationWithOriginalBugScenario() {
+        Map<String, String> options = new HashMap<>();
+        options.put(HOSTNAME.key(), POSTGRES_CONTAINER.getHost());
+        options.put(
+                PG_PORT.key(), 
String.valueOf(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT)));
+        options.put(USERNAME.key(), TEST_USER);
+        options.put(PASSWORD.key(), TEST_PASSWORD);
+        String tables =
+                POSTGRES_CONTAINER.getDatabaseName()
+                        + ".public.aia_t_icc_jjdb,"
+                        + POSTGRES_CONTAINER.getDatabaseName()
+                        + ".public.aia_t_icc_jjdb_\\\\d{6},"
+                        + POSTGRES_CONTAINER.getDatabaseName()
+                        + ".public.aia_t_icc_jjdb_extend";
+        options.put(TABLES.key(), tables);
+        options.put(SLOT_NAME.key(), slotName);
+
+        PostgresDataSourceFactory factory = new PostgresDataSourceFactory();
+        Factory.Context context = new 
MockContext(Configuration.fromMap(options));
+
+        assertThatThrownBy(() -> factory.createDataSource(context))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Cannot find any table by the option 
'tables'");
+    }
+
     class MockContext implements Factory.Context {
 
         Configuration factoryConfiguration;
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java
index 815c1db8b..e4b33b3ff 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java
@@ -111,7 +111,7 @@ public class PostgresE2eITCase extends 
PipelineTestEnvironment {
                                 + "  port: %d\n"
                                 + "  username: %s\n"
                                 + "  password: %s\n"
-                                + "  tables: %s.inventory.\\.*\n"
+                                + "  tables: 
%s.inventory.products,%s.inventory.customers\n"
                                 + "  slot.name: %s\n"
                                 + "  scan.startup.mode: initial\n"
                                 + "  server-time-zone: UTC\n"
@@ -127,6 +127,7 @@ public class PostgresE2eITCase extends 
PipelineTestEnvironment {
                         POSTGRES_TEST_USER,
                         POSTGRES_TEST_PASSWORD,
                         postgresInventoryDatabase.getDatabaseName(),
+                        postgresInventoryDatabase.getDatabaseName(),
                         slotName,
                         parallelism);
         Path postgresCdcJar = 
TestUtils.getResource("postgres-cdc-pipeline-connector.jar");

Reply via email to