This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 1c5974448 [FLINK-38512][postgres] Update database name validation to allow hyphens (#4258)
1c5974448 is described below
commit 1c597444886ad3dde268a8adc1f5a0871c768360
Author: Jia Fan <[email protected]>
AuthorDate: Mon Feb 9 21:06:47 2026 +0800
[FLINK-38512][postgres] Update database name validation to allow hyphens (#4258)
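
For reference, a minimal standalone sketch (hypothetical, not part of this commit) of what the relaxed pattern accepts: names may now contain hyphens after the first character, while the leading character and the 63-character limit remain restricted as before.

    // Sketch only: mirrors the regex introduced in PostgresDataSourceFactory below.
    import java.util.Arrays;

    public class DbNameValidationSketch {
        public static void main(String[] args) {
            String pattern = "[a-zA-Z_$][a-zA-Z0-9_$-]*";
            // Hyphens are now allowed, but a name still must not start with one;
            // the 63-character length check is performed separately.
            Arrays.asList("test-db-with-hyphen", "flink_cdc", "$scratch", "-bad-start")
                    .forEach(name -> System.out.println(name + " -> " + name.matches(pattern)));
            // Prints: true, true, true, false
        }
    }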
---
.../factory/PostgresDataSourceFactory.java | 8 +-
.../source/PostgresPipelineITCaseTest.java | 145 +++++++++++++++++++++
2 files changed, 150 insertions(+), 3 deletions(-)
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
index f8d249c4d..578b6e0f8 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
@@ -404,12 +404,14 @@ public class PostgresDataSourceFactory implements DataSourceFactory {
private boolean isValidPostgresDbName(String dbName) {
// PostgreSQL database name conventions:
// 1. Length does not exceed 63 characters
- // 2. Can contain letters, numbers, underscores, and dollar signs
- // 3. Cannot start with a dollar sign
+ // 2. Can contain letters, numbers, underscores, dollar signs, and hyphens
+ // 3. Must start with a letter, underscore, or dollar sign (cannot start with a hyphen)
+ // Note: While SQL identifiers have strict rules, database names created
+ // via createdb command can contain hyphens (e.g., createdb foo-bar)
if (dbName == null || dbName.length() > 63) {
return false;
}
- if (!dbName.matches("[a-zA-Z_$][a-zA-Z0-9_$]*")) {
+ if (!dbName.matches("[a-zA-Z_$][a-zA-Z0-9_$-]*")) {
return false;
}
return true;
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
index b2aef9eed..40b87f5d4 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.cdc.connectors.postgres.source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
@@ -68,6 +69,7 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -583,6 +585,149 @@ public class PostgresPipelineITCaseTest extends PostgresTestBase {
.isEqualTo(String.format("Replication slot \"%s\" does not exist", slotName));
}
+ @Test
+ public void testDatabaseNameWithHyphenEndToEnd() throws Exception {
+ // Create a real database with hyphen to verify full CDC sync works
+ // This test verifies the fix for FLINK-38512
+ String hyphenDbName = "test-db-with-hyphen";
+
+ // Create the database with hyphen (need to quote the name)
+ try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+ Statement statement = connection.createStatement()) {
+ // Drop if exists and create new database with hyphen in name
+ statement.execute("DROP DATABASE IF EXISTS \"" + hyphenDbName +
"\"");
+ statement.execute("CREATE DATABASE \"" + hyphenDbName + "\"");
+ }
+
+ // Connect to the new database and create a table with data
+ String jdbcUrl =
+ String.format(
+ "jdbc:postgresql://%s:%d/%s",
+ POSTGRES_CONTAINER.getHost(),
+ POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT),
+ hyphenDbName);
+ try (Connection connection =
+ java.sql.DriverManager.getConnection(jdbcUrl, TEST_USER, TEST_PASSWORD);
+ Statement statement = connection.createStatement()) {
+ statement.execute("CREATE TABLE test_table (id INT PRIMARY KEY,
name VARCHAR(100))");
+ statement.execute(
+ "INSERT INTO test_table VALUES (1, 'test1'), (2, 'test2'),
(3, 'test3')");
+ }
+
+ // Create PostgresDataSource using PostgresSourceConfigFactory
+ PostgresSourceConfigFactory configFactory =
+ (PostgresSourceConfigFactory)
+ new PostgresSourceConfigFactory()
+ .hostname(POSTGRES_CONTAINER.getHost())
+ .port(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+ .username(TEST_USER)
+ .password(TEST_PASSWORD)
+ .databaseList(hyphenDbName)
+ .tableList("public.test_table")
+ .startupOptions(StartupOptions.initial())
+ .serverTimeZone("UTC");
+ configFactory.database(hyphenDbName);
+ configFactory.slotName(slotName);
+ configFactory.decodingPluginName("pgoutput");
+
+ PostgresDataSource dataSource = new PostgresDataSource(configFactory);
+
+ // Verify the configuration works
+ assertThat(dataSource.getPostgresSourceConfig().getTableList())
+ .isEqualTo(Arrays.asList("public.test_table"));
+ assertThat(dataSource.getPostgresSourceConfig().getDatabaseList()).contains(hyphenDbName);
+
+ // Now actually read data using the Flink streaming API
+ StreamExecutionEnvironment testEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ testEnv.setParallelism(1);
+ testEnv.enableCheckpointing(1000);
+ testEnv.setRestartStrategy(RestartStrategies.noRestart());
+
+ FlinkSourceProvider sourceProvider =
+ (FlinkSourceProvider) dataSource.getEventSourceProvider();
+
+ CloseableIterator<Event> events =
+ testEnv.fromSource(
+ sourceProvider.getSource(),
+ WatermarkStrategy.noWatermarks(),
+ PostgresDataSourceFactory.IDENTIFIER,
+ new EventTypeInfo())
+ .executeAndCollect();
+
+ // Collect events and verify data
+ List<Event> collectedEvents = new ArrayList<>();
+ int expectedDataCount = 3; // We inserted 3 rows
+ int dataCount = 0;
+ int maxEvents = 10; // Safety limit
+
+ while (events.hasNext() && collectedEvents.size() < maxEvents) {
+ Event event = events.next();
+ collectedEvents.add(event);
+ if (event instanceof DataChangeEvent) {
+ dataCount++;
+ if (dataCount >= expectedDataCount) {
+ break;
+ }
+ }
+ }
+ events.close();
+
+ // Verify we received CreateTableEvent and DataChangeEvents
+ assertThat(collectedEvents).isNotEmpty();
+
+ // Check for CreateTableEvent
+ long createTableEventCount =
+ collectedEvents.stream().filter(e -> e instanceof CreateTableEvent).count();
+ assertThat(createTableEventCount).isGreaterThanOrEqualTo(1);
+
+ // Check for DataChangeEvents (INSERT events from snapshot)
+ List<DataChangeEvent> dataChangeEvents =
+ collectedEvents.stream()
+ .filter(e -> e instanceof DataChangeEvent)
+ .map(e -> (DataChangeEvent) e)
+ .collect(Collectors.toList());
+
+ assertThat(dataChangeEvents).hasSize(expectedDataCount);
+
+ // Verify the table ID in events
+ for (DataChangeEvent dce : dataChangeEvents) {
+ assertThat(dce.tableId().getSchemaName()).isEqualTo("public");
+ assertThat(dce.tableId().getTableName()).isEqualTo("test_table");
+ }
+
+ // Verify the data content - we should have 3 INSERT events with ids 1, 2, 3
+ List<Integer> actualIds =
+ dataChangeEvents.stream()
+ .map(
+ dce -> {
+ RecordData after = dce.after();
+ return after.getInt(0); // id column
+ })
+ .sorted()
+ .collect(Collectors.toList());
+ assertThat(actualIds).containsExactly(1, 2, 3);
+
+ // Cleanup - first drop replication slot, then terminate connections and drop database
+ try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+ Statement statement = connection.createStatement()) {
+ // Drop replication slot first (it was created during CDC connection)
+ try {
+ statement.execute(String.format("SELECT pg_drop_replication_slot('%s')", slotName));
+ } catch (SQLException e) {
+ // Ignore if slot doesn't exist
+ LOG.warn("Failed to drop replication slot: {}",
e.getMessage());
+ }
+ // Terminate all connections to the database
+ statement.execute(
+ "SELECT pg_terminate_backend(pid) FROM pg_stat_activity
WHERE datname = '"
+ + hyphenDbName
+ + "'");
+ // Small delay to ensure connections are terminated
+ Thread.sleep(500);
+ statement.execute("DROP DATABASE IF EXISTS \"" + hyphenDbName +
"\"");
+ }
+ }
+
private static <T> List<T> fetchResultsExcept(Iterator<T> iter, int size, T sideEvent) {
List<T> result = new ArrayList<>(size);
List<T> sideResults = new ArrayList<>();