This is an automated email from the ASF dual-hosted git repository.
JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ce88ef41c9d [test](cdc) Add case sensitivity ITCases for MySQL and PG
(#65033)
ce88ef41c9d is described below
commit ce88ef41c9dfdfdc8f0a60977ff2dba165565ab5
Author: wudi <[email protected]>
AuthorDate: Thu Jul 2 10:26:11 2026 +0800
[test](cdc) Add case sensitivity ITCases for MySQL and PG (#65033)
Add integration tests to cover table identifier case handling:
- **MySqlCaseSensitiveITCase**: MySQL 8.0 with default
lower_case_table_names=0, mixed-case table name T_Users
- **PostgresCaseFoldingITCase**: PG unquoted identifier folding + quoted
identifier preservation
- **ConfigUtilTest**: verify
getTableList/parseAllExcludeColumns/parseAllTargetTableMappings preserve
original case
---
.github/workflows/build-extension.yml | 2 +-
.../cdcclient/itcase/MySqlCaseSensitiveITCase.java | 158 +++++++++++++++++
.../itcase/PostgresCaseFoldingITCase.java | 192 +++++++++++++++++++++
.../doris/cdcclient/utils/ConfigUtilTest.java | 40 +++++
4 files changed, 391 insertions(+), 1 deletion(-)
diff --git a/.github/workflows/build-extension.yml
b/.github/workflows/build-extension.yml
index 735ce2beeba..dd14e02cd83 100644
--- a/.github/workflows/build-extension.yml
+++ b/.github/workflows/build-extension.yml
@@ -147,7 +147,7 @@ jobs:
- name: Run cdc client integration tests
run: |
- cd fe && mvn -Pflatten install -pl fe-common -Dskip.doc=true -DskipTests -Dmaven.build.cache.enabled=false
+ cd fe && mvn -Pflatten install -pl fe-common -am -Dskip.doc=true -DskipTests -Dmaven.build.cache.enabled=false
cd ../fs_brokers/cdc_client && mvn -B test failsafe:integration-test failsafe:verify
# build-docs:
# name: Build Documents
diff --git
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/MySqlCaseSensitiveITCase.java
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/MySqlCaseSensitiveITCase.java
new file mode 100644
index 00000000000..58e65fd219d
--- /dev/null
+++
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/MySqlCaseSensitiveITCase.java
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.itcase;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.doris.cdcclient.common.Env;
+import org.apache.doris.cdcclient.itcase.CdcClientReadHarness.SnapshotResult;
+import org.apache.doris.job.cdc.split.SnapshotSplit;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Verify that the cdc_client read path works with mixed-case table names when
+ * {@code lower_case_table_names=0} (the default on Linux).
+ *
+ * <p>MySQL 8.0 on Linux stores table names in their original case under this mode. Debezium
+ * returns the stored name as-is, so the cdc_client cache layers (exclude-columns, target-table
+ * mappings) must match the configured name exactly.
+ */
+@Testcontainers
+class MySqlCaseSensitiveITCase {
+
+    private static final String USER = "cdc";
+    private static final String PASSWORD = "123456";
+    private static final String ROOT_USER = "root";
+    private static final String ROOT_PASSWORD = "123456";
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+    private static final AtomicLong JOB_ID_SEQ = new AtomicLong(500_000);
+
+    @Container
+    static final MySQLContainer<?> MYSQL =
+            new MySQLContainer<>(DockerImageName.parse("mysql:8.0"))
+                    .withConfigurationOverride("docker/server-with-binlog")
+                    .withDatabaseName("cdc_test")
+                    .withUsername(USER)
+                    .withPassword(PASSWORD)
+                    .withEnv("MYSQL_ROOT_PASSWORD", ROOT_PASSWORD);
+
+    private String jobId;
+    private String database;
+
+    /** Creates a fresh per-test database containing the mixed-case table {@code T_Users}. */
+    @BeforeEach
+    void setUp() throws Exception {
+        jobId = String.valueOf(JOB_ID_SEQ.incrementAndGet());
+        database = "case_db_" + jobId;
+        try (Connection conn = rootConnection("");
+                Statement st = conn.createStatement()) {
+            st.execute("CREATE DATABASE " + database);
+            st.execute("USE " + database);
+            st.execute(
+                    "CREATE TABLE T_Users (id INT NOT NULL, UserName VARCHAR(50), PRIMARY KEY (id))");
+            st.execute("INSERT INTO T_Users VALUES (1,'Alice'), (2,'Bob'), (3,'Carol')");
+        }
+    }
+
+    /** Closes the job, then drops the per-test database even if closing the job throws. */
+    @AfterEach
+    void tearDown() throws Exception {
+        try {
+            Env.getCurrentEnv().close(jobId);
+        } finally {
+            try (Connection conn = rootConnection("");
+                    Statement st = conn.createStatement()) {
+                st.execute("DROP DATABASE IF EXISTS " + database);
+            }
+        }
+    }
+
+    /** Reads the {@code T_Users} snapshot, then expects one binlog event for the inserted row. */
+    @Test
+    void readsSnapshotThenBinlogWithMixedCaseTableName() throws Exception {
+        try (CdcClientReadHarness harness =
+                CdcClientReadHarness.mysql(
+                        jobId,
+                        MYSQL.getHost(),
+                        MYSQL.getMappedPort(MySQLContainer.MYSQL_PORT),
+                        ROOT_USER,
+                        ROOT_PASSWORD,
+                        database,
+                        "T_Users",
+                        "initial")) {
+
+            List<SnapshotSplit> splits = harness.fetchAllSnapshotSplits("T_Users");
+            SnapshotResult snapshot = harness.readSnapshot(splits);
+
+            Map<Integer, String> names = namesById(snapshot.records());
+            assertThat(names).isEqualTo(Map.of(1, "Alice", 2, "Bob", 3, "Carol"));
+
+            insert("T_Users", 4, "Dave");
+            List<String> binlog =
+                    harness.readBinlogUntil(snapshot, splits, 1, Duration.ofSeconds(60));
+            assertThat(binlog).hasSize(1);
+            JsonNode row = MAPPER.readTree(binlog.get(0));
+            assertThat(row.get("id").asInt()).isEqualTo(4);
+            assertThat(row.get("UserName").asText()).isEqualTo("Dave");
+            assertThat(row.get("__DORIS_DELETE_SIGN__").asInt()).isZero();
+        }
+    }
+
+    /** Parses each JSON record and indexes its {@code UserName} by {@code id}. */
+    private Map<Integer, String> namesById(List<String> records) throws Exception {
+        Map<Integer, String> result = new HashMap<>();
+        for (String record : records) {
+            JsonNode node = MAPPER.readTree(record);
+            result.put(node.get("id").asInt(), node.get("UserName").asText());
+        }
+        return result;
+    }
+
+    /** Inserts one {@code (id, name)} row into {@code table} of the per-test database as root. */
+    private void insert(String table, int id, String name) throws Exception {
+        try (Connection conn = rootConnection(database);
+                Statement st = conn.createStatement()) {
+            st.execute(String.format("INSERT INTO %s VALUES (%d, '%s')", table, id, name));
+        }
+    }
+
+    /** Opens a root JDBC connection to the container, appending {@code db} to the URL path. */
+    private Connection rootConnection(String db) throws Exception {
+        String url = "jdbc:mysql://" + MYSQL.getHost() + ":"
+                + MYSQL.getMappedPort(MySQLContainer.MYSQL_PORT) + "/" + db;
+        return DriverManager.getConnection(url, ROOT_USER, ROOT_PASSWORD);
+    }
+}
diff --git
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/PostgresCaseFoldingITCase.java
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/PostgresCaseFoldingITCase.java
new file mode 100644
index 00000000000..620a3792944
--- /dev/null
+++
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/PostgresCaseFoldingITCase.java
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.itcase;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.doris.cdcclient.common.Env;
+import org.apache.doris.cdcclient.itcase.CdcClientReadHarness.SnapshotResult;
+import org.apache.doris.job.cdc.split.SnapshotSplit;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Verify that the cdc_client read path handles PostgreSQL's identifier case-folding correctly.
+ *
+ * <p>PostgreSQL folds unquoted identifiers to lowercase (the SQL standard folds to uppercase).
+ * Quoted identifiers preserve the declared case. The cdc_client must use the stored name in its
+ * config to match what Debezium returns — which is what PG actually stores.
+ */
+@Testcontainers
+class PostgresCaseFoldingITCase {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+    private static final AtomicLong JOB_ID_SEQ = new AtomicLong(520_000);
+
+    @Container
+    static final PostgreSQLContainer<?> POSTGRES =
+            new PostgreSQLContainer<>(
+                            DockerImageName.parse("postgres:14")
+                                    .asCompatibleSubstituteFor("postgres"))
+                    .withCommand("postgres", "-c", "wal_level=logical");
+
+    private String jobId;
+
+    /** Gives each test a fresh job id taken from the shared sequence. */
+    @BeforeEach
+    void setUp() {
+        jobId = String.valueOf(JOB_ID_SEQ.incrementAndGet());
+    }
+
+    /** Asks {@link Env} to close the job id used by the test that just ran. */
+    @AfterEach
+    void tearDown() {
+        Env.getCurrentEnv().close(jobId);
+    }
+
+    /**
+     * Unquoted DDL: the job is configured with the folded name {@code t_users}.
+     */
+    @Test
+    void unquotedTableNameIsFoldedToLowercase() throws Exception {
+        // Unquoted identifier: PG folds to lowercase t_users.
+        executeSql(
+                "DROP TABLE IF EXISTS T_Users",
+                "CREATE TABLE T_Users (id INT PRIMARY KEY, name VARCHAR(50))",
+                "INSERT INTO T_Users VALUES (1,'Alice'), (2,'Bob')");
+
+        try (CdcClientReadHarness harness =
+                CdcClientReadHarness.postgres(
+                        jobId,
+                        POSTGRES.getHost(),
+                        POSTGRES.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT),
+                        POSTGRES.getUsername(),
+                        POSTGRES.getPassword(),
+                        POSTGRES.getDatabaseName(),
+                        "public",
+                        "t_users",
+                        "initial")) {
+
+            List<SnapshotSplit> splits = harness.fetchAllSnapshotSplits("t_users");
+            SnapshotResult snapshot = harness.readSnapshot(splits);
+
+            Map<Integer, String> names = namesById(snapshot.records());
+            assertThat(names).containsOnlyKeys(1, 2);
+            assertThat(names.get(1)).isEqualTo("Alice");
+            assertThat(names.get(2)).isEqualTo("Bob");
+
+            executeSql("INSERT INTO T_Users VALUES (3,'Carol')");
+
+            List<String> binlog =
+                    harness.readBinlogUntil(snapshot, splits, 1, Duration.ofSeconds(60));
+            assertThat(binlog).hasSize(1);
+            JsonNode row = MAPPER.readTree(binlog.get(0));
+            assertThat(row.get("id").asInt()).isEqualTo(3);
+            assertThat(row.get("name").asText()).isEqualTo("Carol");
+        } finally {
+            executeSql("DROP TABLE IF EXISTS t_users");
+        }
+    }
+
+    /**
+     * Quoted DDL: the job is configured with the exact declared name {@code T_Users}.
+     */
+    @Test
+    void quotedTableNamePreservesGivenCase() throws Exception {
+        // Quoted identifier: PG preserves T_Users exactly.
+        executeSql(
+                "DROP TABLE IF EXISTS \"T_Users\"",
+                "CREATE TABLE \"T_Users\" (id INT PRIMARY KEY, name VARCHAR(50))",
+                "INSERT INTO \"T_Users\" VALUES (1,'Alice'), (2,'Bob')");
+
+        try (CdcClientReadHarness harness =
+                CdcClientReadHarness.postgres(
+                        jobId,
+                        POSTGRES.getHost(),
+                        POSTGRES.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT),
+                        POSTGRES.getUsername(),
+                        POSTGRES.getPassword(),
+                        POSTGRES.getDatabaseName(),
+                        "public",
+                        "T_Users",
+                        "initial")) {
+
+            List<SnapshotSplit> splits = harness.fetchAllSnapshotSplits("T_Users");
+            SnapshotResult snapshot = harness.readSnapshot(splits);
+
+            Map<Integer, String> names = namesById(snapshot.records());
+            assertThat(names).containsOnlyKeys(1, 2);
+            assertThat(names.get(1)).isEqualTo("Alice");
+            assertThat(names.get(2)).isEqualTo("Bob");
+
+            executeSql("INSERT INTO \"T_Users\" VALUES (3,'Carol')");
+
+            List<String> binlog =
+                    harness.readBinlogUntil(snapshot, splits, 1, Duration.ofSeconds(60));
+            assertThat(binlog).hasSize(1);
+            JsonNode row = MAPPER.readTree(binlog.get(0));
+            assertThat(row.get("id").asInt()).isEqualTo(3);
+            assertThat(row.get("name").asText()).isEqualTo("Carol");
+        } finally {
+            executeSql("DROP TABLE IF EXISTS \"T_Users\"");
+        }
+    }
+
+    /** Parses each JSON record and indexes its {@code name} by {@code id}. */
+    private Map<Integer, String> namesById(List<String> records) throws Exception {
+        Map<Integer, String> result = new HashMap<>();
+        for (String record : records) {
+            JsonNode node = MAPPER.readTree(record);
+            result.put(node.get("id").asInt(), node.get("name").asText());
+        }
+        return result;
+    }
+
+    /** Runs the given statements in order on one new connection to the container. */
+    private void executeSql(String... statements) throws Exception {
+        try (Connection conn = connect();
+                Statement st = conn.createStatement()) {
+            for (String sql : statements) {
+                st.execute(sql);
+            }
+        }
+    }
+
+    /** Opens a JDBC connection using the container's own URL and credentials. */
+    private Connection connect() throws Exception {
+        return DriverManager.getConnection(
+                POSTGRES.getJdbcUrl(), POSTGRES.getUsername(), POSTGRES.getPassword());
+    }
+}
diff --git
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
index cf70b6b075c..c1a80da6a22 100644
---
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
+++
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
@@ -270,6 +270,21 @@ class ConfigUtilTest {
assertEquals(Set.of("c"), all.get("t2"));
}
+    @Test
+    void parseAllExcludeColumns_preservesOriginalCase() {
+        // Table name in config key is preserved as-is; callers must use the
+        // same case that Debezium returns (which is the PG/MySQL stored name).
+        Map<String, String> config = new HashMap<>();
+        config.put("table.MyTable.exclude_columns", "a,b");
+        config.put("table.T_User.exclude_columns", "c");
+
+        Map<String, Set<String>> all = ConfigUtil.parseAllExcludeColumns(config);
+        // Compare the key set itself so that a case-folded key cannot slip through.
+        assertEquals(Set.of("MyTable", "T_User"), all.keySet());
+        assertEquals(Set.of("a", "b"), all.get("MyTable"));
+        assertEquals(Set.of("c"), all.get("T_User"));
+    }
+
@Test
void parseAllTargetTableMappings_skipsEmptyValues() {
Map<String, String> config = new HashMap<>();
@@ -282,6 +297,31 @@ class ConfigUtilTest {
assertEquals("dst", mappings.get("src"));
}
+    @Test
+    void parseAllTargetTableMappings_preservesOriginalCase() {
+        // Mixed-case names on both sides: the source table inside the config key
+        // and the target table in the config value.
+        Map<String, String> props = new HashMap<>();
+        props.put("table.MyTable.target_table", "DstTable");
+        props.put("table.Src.target_table", "Dst2");
+
+        Map<String, String> parsed = ConfigUtil.parseAllTargetTableMappings(props);
+
+        assertEquals(Map.of("MyTable", "DstTable", "Src", "Dst2"), parsed);
+    }
+
+    // ─── getTableList case handling ───────────────────────────────────────────
+
+    @Test
+    void tableListPreservesOriginalCase() {
+        // include_tables values are passed through as-is; Flink CDC's (?i) regex
+        // handles case-insensitive matching downstream.
+        Map<String, String> config = new HashMap<>();
+        config.put(DataSourceConfigKeys.INCLUDE_TABLES, "MyTable, T_User");
+        String[] result = ConfigUtil.getTableList("db", config);
+        assertArrayEquals(new String[] {"db.MyTable", "db.T_User"}, result);
+    }
+
// ─── toStringMap
──────────────────────────────────────────────────────────
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]