This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c5974448 [FLINK-38512][postgres] Update database name validation to allow hyphens (#4258)
1c5974448 is described below

commit 1c597444886ad3dde268a8adc1f5a0871c768360
Author: Jia Fan <[email protected]>
AuthorDate: Mon Feb 9 21:06:47 2026 +0800

    [FLINK-38512][postgres] Update database name validation to allow hyphens (#4258)
---
 .../factory/PostgresDataSourceFactory.java         |   8 +-
 .../source/PostgresPipelineITCaseTest.java         | 145 +++++++++++++++++++++
 2 files changed, 150 insertions(+), 3 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
index f8d249c4d..578b6e0f8 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
@@ -404,12 +404,14 @@ public class PostgresDataSourceFactory implements DataSourceFactory {
     private boolean isValidPostgresDbName(String dbName) {
         // PostgreSQL database name conventions:
         // 1. Length does not exceed 63 characters
-        // 2. Can contain letters, numbers, underscores, and dollar signs
-        // 3. Cannot start with a dollar sign
+        // 2. Can contain letters, numbers, underscores, dollar signs, and hyphens
+        // 3. Must start with a letter, underscore, or dollar sign (cannot start with a hyphen)
+        // Note: While SQL identifiers have strict rules, database names created
+        // via createdb command can contain hyphens (e.g., createdb foo-bar)
         if (dbName == null || dbName.length() > 63) {
             return false;
         }
-        if (!dbName.matches("[a-zA-Z_$][a-zA-Z0-9_$]*")) {
+        if (!dbName.matches("[a-zA-Z_$][a-zA-Z0-9_$-]*")) {
             return false;
         }
         return true;
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
index b2aef9eed..40b87f5d4 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.cdc.connectors.postgres.source;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
@@ -68,6 +69,7 @@ import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -583,6 +585,149 @@ public class PostgresPipelineITCaseTest extends PostgresTestBase {
                 .isEqualTo(String.format("Replication slot \"%s\" does not 
exist", slotName));
     }
 
+    @Test
+    public void testDatabaseNameWithHyphenEndToEnd() throws Exception {
+        // Create a real database with hyphen to verify full CDC sync works
+        // This test verifies the fix for FLINK-38512
+        String hyphenDbName = "test-db-with-hyphen";
+
+        // Create the database with hyphen (need to quote the name)
+        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+                Statement statement = connection.createStatement()) {
+            // Drop if exists and create new database with hyphen in name
+            statement.execute("DROP DATABASE IF EXISTS \"" + hyphenDbName + 
"\"");
+            statement.execute("CREATE DATABASE \"" + hyphenDbName + "\"");
+        }
+
+        // Connect to the new database and create a table with data
+        String jdbcUrl =
+                String.format(
+                        "jdbc:postgresql://%s:%d/%s",
+                        POSTGRES_CONTAINER.getHost(),
+                        POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT),
+                        hyphenDbName);
+        try (Connection connection =
+                        java.sql.DriverManager.getConnection(jdbcUrl, 
TEST_USER, TEST_PASSWORD);
+                Statement statement = connection.createStatement()) {
+            statement.execute("CREATE TABLE test_table (id INT PRIMARY KEY, 
name VARCHAR(100))");
+            statement.execute(
+                    "INSERT INTO test_table VALUES (1, 'test1'), (2, 'test2'), 
(3, 'test3')");
+        }
+
+        // Create PostgresDataSource using PostgresSourceConfigFactory
+        PostgresSourceConfigFactory configFactory =
+                (PostgresSourceConfigFactory)
+                        new PostgresSourceConfigFactory()
+                                .hostname(POSTGRES_CONTAINER.getHost())
+                                
.port(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+                                .username(TEST_USER)
+                                .password(TEST_PASSWORD)
+                                .databaseList(hyphenDbName)
+                                .tableList("public.test_table")
+                                .startupOptions(StartupOptions.initial())
+                                .serverTimeZone("UTC");
+        configFactory.database(hyphenDbName);
+        configFactory.slotName(slotName);
+        configFactory.decodingPluginName("pgoutput");
+
+        PostgresDataSource dataSource = new PostgresDataSource(configFactory);
+
+        // Verify the configuration works
+        assertThat(dataSource.getPostgresSourceConfig().getTableList())
+                .isEqualTo(Arrays.asList("public.test_table"));
+        
assertThat(dataSource.getPostgresSourceConfig().getDatabaseList()).contains(hyphenDbName);
+
+        // Now actually read data using the Flink streaming API
+        StreamExecutionEnvironment testEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        testEnv.setParallelism(1);
+        testEnv.enableCheckpointing(1000);
+        testEnv.setRestartStrategy(RestartStrategies.noRestart());
+
+        FlinkSourceProvider sourceProvider =
+                (FlinkSourceProvider) dataSource.getEventSourceProvider();
+
+        CloseableIterator<Event> events =
+                testEnv.fromSource(
+                                sourceProvider.getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                PostgresDataSourceFactory.IDENTIFIER,
+                                new EventTypeInfo())
+                        .executeAndCollect();
+
+        // Collect events and verify data
+        List<Event> collectedEvents = new ArrayList<>();
+        int expectedDataCount = 3; // We inserted 3 rows
+        int dataCount = 0;
+        int maxEvents = 10; // Safety limit
+
+        while (events.hasNext() && collectedEvents.size() < maxEvents) {
+            Event event = events.next();
+            collectedEvents.add(event);
+            if (event instanceof DataChangeEvent) {
+                dataCount++;
+                if (dataCount >= expectedDataCount) {
+                    break;
+                }
+            }
+        }
+        events.close();
+
+        // Verify we received CreateTableEvent and DataChangeEvents
+        assertThat(collectedEvents).isNotEmpty();
+
+        // Check for CreateTableEvent
+        long createTableEventCount =
+                collectedEvents.stream().filter(e -> e instanceof 
CreateTableEvent).count();
+        assertThat(createTableEventCount).isGreaterThanOrEqualTo(1);
+
+        // Check for DataChangeEvents (INSERT events from snapshot)
+        List<DataChangeEvent> dataChangeEvents =
+                collectedEvents.stream()
+                        .filter(e -> e instanceof DataChangeEvent)
+                        .map(e -> (DataChangeEvent) e)
+                        .collect(Collectors.toList());
+
+        assertThat(dataChangeEvents).hasSize(expectedDataCount);
+
+        // Verify the table ID in events
+        for (DataChangeEvent dce : dataChangeEvents) {
+            assertThat(dce.tableId().getSchemaName()).isEqualTo("public");
+            assertThat(dce.tableId().getTableName()).isEqualTo("test_table");
+        }
+
+        // Verify the data content - we should have 3 INSERT events with ids 1, 2, 3
+        List<Integer> actualIds =
+                dataChangeEvents.stream()
+                        .map(
+                                dce -> {
+                                    RecordData after = dce.after();
+                                    return after.getInt(0); // id column
+                                })
+                        .sorted()
+                        .collect(Collectors.toList());
+        assertThat(actualIds).containsExactly(1, 2, 3);
+
+        // Cleanup - first drop replication slot, then terminate connections and drop database
+        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+                Statement statement = connection.createStatement()) {
+            // Drop replication slot first (it was created during CDC connection)
+            try {
+                statement.execute(String.format("SELECT 
pg_drop_replication_slot('%s')", slotName));
+            } catch (SQLException e) {
+                // Ignore if slot doesn't exist
+                LOG.warn("Failed to drop replication slot: {}", 
e.getMessage());
+            }
+            // Terminate all connections to the database
+            statement.execute(
+                    "SELECT pg_terminate_backend(pid) FROM pg_stat_activity 
WHERE datname = '"
+                            + hyphenDbName
+                            + "'");
+            // Small delay to ensure connections are terminated
+            Thread.sleep(500);
+            statement.execute("DROP DATABASE IF EXISTS \"" + hyphenDbName + 
"\"");
+        }
+    }
+
     private static <T> List<T> fetchResultsExcept(Iterator<T> iter, int size, 
T sideEvent) {
         List<T> result = new ArrayList<>(size);
         List<T> sideResults = new ArrayList<>();

Reply via email to