This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push: new 6788ff5bf [FLINK-38188][pipeline-connector][postgres] Fix database name validation logic in PostgresDataSourceFactory (#4075) 6788ff5bf is described below commit 6788ff5bf33585905714e00d1e5a4600ee509709 Author: Lanny Boarts <top...@live.cn> AuthorDate: Wed Aug 13 13:02:15 2025 +0800 [FLINK-38188][pipeline-connector][postgres] Fix database name validation logic in PostgresDataSourceFactory (#4075) --------- Co-authored-by: lvyanquan <lvyanquan....@alibaba-inc.com> --- .../factory/PostgresDataSourceFactory.java | 26 +++++------- .../factory/PostgresDataSourceFactoryTest.java | 48 ++++++++++++++++++++++ .../cdc/pipeline/tests/PostgresE2eITCase.java | 3 +- 3 files changed, 61 insertions(+), 16 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java index 138c5db01..61c9766c7 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java @@ -369,23 +369,19 @@ public class PostgresDataSourceFactory implements DataSourceFactory { String.format( "Tables format must db.schema.table, can not 'tables' = %s", TABLES.key())); - if (tableNameParts.length == 3) { - String currentDbName = tableNameParts[0]; + String currentDbName = tableNameParts[0]; + checkState( + isValidPostgresDbName(currentDbName), + String.format("%s is not a valid PostgreSQL database name", 
currentDbName)); + if (dbName == null) { + dbName = currentDbName; + } else { checkState( - isValidPostgresDbName(currentDbName), - String.format( - "The value of option %s does not conform to PostgresSQL database name naming conventions", - TABLES.key())); - if (dbName == null) { - dbName = currentDbName; - } else { - checkState( - !dbName.equals(currentDbName), - String.format( - "The value of option %s all table names must have the same database name", - TABLES.key())); - } + dbName.equals(currentDbName), + "The value of option `%s` is `%s`, but not all table names have the same database name", + TABLES.key(), + String.join(",", tableNames)); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java index 566667612..473c2d61c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java @@ -257,6 +257,54 @@ public class PostgresDataSourceFactoryTest extends PostgresTestBase { .isEqualTo(Arrays.asList("inventory.products")); } + @Test + public void testTableValidationWithDifferentDatabases() { + Map<String, String> options = new HashMap<>(); + options.put(HOSTNAME.key(), POSTGRES_CONTAINER.getHost()); + options.put( + PG_PORT.key(), String.valueOf(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT))); + options.put(USERNAME.key(), TEST_USER); + options.put(PASSWORD.key(), TEST_PASSWORD); + options.put( + TABLES.key(), + 
"aia_test.public.aia_t_icc_jjdb,different_db.public.aia_t_icc_jjdb_extend"); + options.put(SLOT_NAME.key(), slotName); + + PostgresDataSourceFactory factory = new PostgresDataSourceFactory(); + Factory.Context context = new MockContext(Configuration.fromMap(options)); + + assertThatThrownBy(() -> factory.createDataSource(context)) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining( + "The value of option `tables` is `aia_test.public.aia_t_icc_jjdb,different_db.public.aia_t_icc_jjdb_extend`, but not all table names have the same database name"); + } + + @Test + public void testTableValidationWithOriginalBugScenario() { + Map<String, String> options = new HashMap<>(); + options.put(HOSTNAME.key(), POSTGRES_CONTAINER.getHost()); + options.put( + PG_PORT.key(), String.valueOf(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT))); + options.put(USERNAME.key(), TEST_USER); + options.put(PASSWORD.key(), TEST_PASSWORD); + String tables = + POSTGRES_CONTAINER.getDatabaseName() + + ".public.aia_t_icc_jjdb," + + POSTGRES_CONTAINER.getDatabaseName() + + ".public.aia_t_icc_jjdb_\\\\d{6}," + + POSTGRES_CONTAINER.getDatabaseName() + + ".public.aia_t_icc_jjdb_extend"; + options.put(TABLES.key(), tables); + options.put(SLOT_NAME.key(), slotName); + + PostgresDataSourceFactory factory = new PostgresDataSourceFactory(); + Factory.Context context = new MockContext(Configuration.fromMap(options)); + + assertThatThrownBy(() -> factory.createDataSource(context)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Cannot find any table by the option 'tables'"); + } + class MockContext implements Factory.Context { Configuration factoryConfiguration; diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java index 815c1db8b..e4b33b3ff 100644 --- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java @@ -111,7 +111,7 @@ public class PostgresE2eITCase extends PipelineTestEnvironment { + " port: %d\n" + " username: %s\n" + " password: %s\n" - + " tables: %s.inventory.\\.*\n" + + " tables: %s.inventory.products,%s.inventory.customers\n" + " slot.name: %s\n" + " scan.startup.mode: initial\n" + " server-time-zone: UTC\n" @@ -127,6 +127,7 @@ public class PostgresE2eITCase extends PipelineTestEnvironment { POSTGRES_TEST_USER, POSTGRES_TEST_PASSWORD, postgresInventoryDatabase.getDatabaseName(), + postgresInventoryDatabase.getDatabaseName(), slotName, parallelism); Path postgresCdcJar = TestUtils.getResource("postgres-cdc-pipeline-connector.jar");