This is an automated email from the ASF dual-hosted git repository.

JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ce88ef41c9d [test](cdc) Add case sensitivity ITCases for MySQL and PG 
(#65033)
ce88ef41c9d is described below

commit ce88ef41c9dfdfdc8f0a60977ff2dba165565ab5
Author: wudi <[email protected]>
AuthorDate: Thu Jul 2 10:26:11 2026 +0800

    [test](cdc) Add case sensitivity ITCases for MySQL and PG (#65033)
    
    Add integration tests to cover table identifier case handling:
    
    - **MySqlCaseSensitiveITCase**: MySQL 8.0 with default
    lower_case_table_names=0, mixed-case table name T_Users
    - **PostgresCaseFoldingITCase**: PG unquoted identifier folding + quoted
    identifier preservation
    - **ConfigUtilTest**: verify
    getTableList/parseAllExcludeColumns/parseAllTargetTableMappings preserve
    original case
---
 .github/workflows/build-extension.yml              |   2 +-
 .../cdcclient/itcase/MySqlCaseSensitiveITCase.java | 158 +++++++++++++++++
 .../itcase/PostgresCaseFoldingITCase.java          | 192 +++++++++++++++++++++
 .../doris/cdcclient/utils/ConfigUtilTest.java      |  40 +++++
 4 files changed, 391 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/build-extension.yml 
b/.github/workflows/build-extension.yml
index 735ce2beeba..dd14e02cd83 100644
--- a/.github/workflows/build-extension.yml
+++ b/.github/workflows/build-extension.yml
@@ -147,7 +147,7 @@ jobs:
 
       - name: Run cdc client integration tests
         run: |
-          cd fe && mvn -Pflatten install -pl fe-common -Dskip.doc=true 
-DskipTests -Dmaven.build.cache.enabled=false
+          cd fe && mvn -Pflatten install -pl fe-common -am -Dskip.doc=true 
-DskipTests -Dmaven.build.cache.enabled=false
           cd ../fs_brokers/cdc_client && mvn -B test failsafe:integration-test 
failsafe:verify
   # build-docs:
   #   name: Build Documents
diff --git 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/MySqlCaseSensitiveITCase.java
 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/MySqlCaseSensitiveITCase.java
new file mode 100644
index 00000000000..58e65fd219d
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/MySqlCaseSensitiveITCase.java
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.itcase;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.doris.cdcclient.common.Env;
+import org.apache.doris.cdcclient.itcase.CdcClientReadHarness.SnapshotResult;
+import org.apache.doris.job.cdc.split.SnapshotSplit;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Verify that the cdc_client read path works with mixed-case table names when
+ * {@code lower_case_table_names=0} (the default on Linux).
+ *
+ * <p>MySQL 8.0 on Linux stores table names in their original case under this 
mode. Debezium
+ * returns the stored name as-is, so the cdc_client cache layers 
(exclude-columns, target-table
+ * mappings) must match the configured name exactly.
+ */
+@Testcontainers
+class MySqlCaseSensitiveITCase {
+
+    private static final String USER = "cdc";
+    private static final String PASSWORD = "123456";
+    private static final String ROOT_USER = "root";
+    private static final String ROOT_PASSWORD = "123456";
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+    private static final AtomicLong JOB_ID_SEQ = new AtomicLong(500_000);
+
+    @Container
+    static final MySQLContainer<?> MYSQL =
+            new MySQLContainer<>(DockerImageName.parse("mysql:8.0"))
+                    .withConfigurationOverride("docker/server-with-binlog")
+                    .withDatabaseName("cdc_test")
+                    .withUsername(USER)
+                    .withPassword(PASSWORD)
+                    .withEnv("MYSQL_ROOT_PASSWORD", ROOT_PASSWORD);
+
+    private String jobId;
+    private String database;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        jobId = String.valueOf(JOB_ID_SEQ.incrementAndGet());
+        database = "case_db_" + jobId;
+        try (Connection conn = rootConnection("");
+                Statement st = conn.createStatement()) {
+            st.execute("CREATE DATABASE " + database);
+            st.execute("USE " + database);
+            st.execute(
+                    "CREATE TABLE T_Users (id INT NOT NULL, UserName 
VARCHAR(50), PRIMARY KEY (id))");
+            st.execute("INSERT INTO T_Users VALUES (1,'Alice'), (2,'Bob'), 
(3,'Carol')");
+        }
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        Env.getCurrentEnv().close(jobId);
+        try (Connection conn = rootConnection("");
+                Statement st = conn.createStatement()) {
+            st.execute("DROP DATABASE IF EXISTS " + database);
+        }
+    }
+
+    @Test
+    void readsSnapshotThenBinlogWithMixedCaseTableName() throws Exception {
+        try (CdcClientReadHarness harness =
+                CdcClientReadHarness.mysql(
+                        jobId,
+                        MYSQL.getHost(),
+                        MYSQL.getMappedPort(MySQLContainer.MYSQL_PORT),
+                        ROOT_USER,
+                        ROOT_PASSWORD,
+                        database,
+                        "T_Users",
+                        "initial")) {
+
+            List<SnapshotSplit> splits = 
harness.fetchAllSnapshotSplits("T_Users");
+            SnapshotResult snapshot = harness.readSnapshot(splits);
+
+            Map<Integer, String> names = namesById(snapshot.records());
+            assertThat(names).containsOnlyKeys(1, 2, 3);
+            assertThat(names.get(1)).isEqualTo("Alice");
+            assertThat(names.get(2)).isEqualTo("Bob");
+            assertThat(names.get(3)).isEqualTo("Carol");
+
+            insert("T_Users", 4, "Dave");
+
+            List<String> binlog =
+                    harness.readBinlogUntil(snapshot, splits, 1, 
Duration.ofSeconds(60));
+            assertThat(binlog).hasSize(1);
+            JsonNode row = MAPPER.readTree(binlog.get(0));
+            assertThat(row.get("id").asInt()).isEqualTo(4);
+            assertThat(row.get("UserName").asText()).isEqualTo("Dave");
+            assertThat(row.get("__DORIS_DELETE_SIGN__").asInt()).isZero();
+        }
+    }
+
+    private Map<Integer, String> namesById(List<String> records) throws 
Exception {
+        Map<Integer, String> result = new HashMap<>();
+        for (String record : records) {
+            JsonNode node = MAPPER.readTree(record);
+            result.put(node.get("id").asInt(), node.get("UserName").asText());
+        }
+        return result;
+    }
+
+    private void insert(String table, int id, String name) throws Exception {
+        try (Connection conn = rootConnection(database);
+                Statement st = conn.createStatement()) {
+            st.execute(String.format("INSERT INTO %s VALUES (%d, '%s')", 
table, id, name));
+        }
+    }
+
+    private Connection rootConnection(String db) throws Exception {
+        String url =
+                "jdbc:mysql://"
+                        + MYSQL.getHost()
+                        + ":"
+                        + MYSQL.getMappedPort(MySQLContainer.MYSQL_PORT)
+                        + "/"
+                        + db;
+        return DriverManager.getConnection(url, ROOT_USER, ROOT_PASSWORD);
+    }
+}
diff --git 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/PostgresCaseFoldingITCase.java
 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/PostgresCaseFoldingITCase.java
new file mode 100644
index 00000000000..620a3792944
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/PostgresCaseFoldingITCase.java
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.itcase;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.doris.cdcclient.common.Env;
+import org.apache.doris.cdcclient.itcase.CdcClientReadHarness.SnapshotResult;
+import org.apache.doris.job.cdc.split.SnapshotSplit;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Verify that the cdc_client read path handles PostgreSQL's identifier 
case-folding correctly.
+ *
+ * <p>PostgreSQL folds unquoted identifiers to lowercase (per SQL standard). 
Quoted identifiers
+ * preserve the declared case. The cdc_client must use the stored name in its 
config to match
+ * what Debezium returns — which is what PG actually stores.
+ */
+@Testcontainers
+class PostgresCaseFoldingITCase {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+    private static final AtomicLong JOB_ID_SEQ = new AtomicLong(520_000);
+
+    @Container
+    static final PostgreSQLContainer<?> POSTGRES =
+            new PostgreSQLContainer<>(
+                            DockerImageName.parse("postgres:14")
+                                    .asCompatibleSubstituteFor("postgres"))
+                    .withCommand("postgres", "-c", "wal_level=logical");
+
+    private String jobId;
+
+    @BeforeEach
+    void setUp() {
+        jobId = String.valueOf(JOB_ID_SEQ.incrementAndGet());
+    }
+
+    @AfterEach
+    void tearDown() {
+        Env.getCurrentEnv().close(jobId);
+    }
+
+    @Test
+    void unquotedTableNameIsFoldedToLowercase() throws Exception {
+        try (Connection conn = connect();
+                Statement st = conn.createStatement()) {
+            st.execute("DROP TABLE IF EXISTS T_Users");
+            // Unquoted identifier: PG folds to lowercase t_users.
+            st.execute(
+                    "CREATE TABLE T_Users (id INT PRIMARY KEY, name 
VARCHAR(50))");
+            st.execute("INSERT INTO T_Users VALUES (1,'Alice'), (2,'Bob')");
+        }
+
+        try (CdcClientReadHarness harness =
+                CdcClientReadHarness.postgres(
+                        jobId,
+                        POSTGRES.getHost(),
+                        
POSTGRES.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT),
+                        POSTGRES.getUsername(),
+                        POSTGRES.getPassword(),
+                        POSTGRES.getDatabaseName(),
+                        "public",
+                        "t_users",
+                        "initial")) {
+
+            List<SnapshotSplit> splits = 
harness.fetchAllSnapshotSplits("t_users");
+            SnapshotResult snapshot = harness.readSnapshot(splits);
+
+            Map<Integer, String> names = namesById(snapshot.records());
+            assertThat(names).containsOnlyKeys(1, 2);
+            assertThat(names.get(1)).isEqualTo("Alice");
+            assertThat(names.get(2)).isEqualTo("Bob");
+
+            try (Connection conn = connect();
+                    Statement st = conn.createStatement()) {
+                st.execute("INSERT INTO T_Users VALUES (3,'Carol')");
+            }
+
+            List<String> binlog =
+                    harness.readBinlogUntil(snapshot, splits, 1, 
Duration.ofSeconds(60));
+            assertThat(binlog).hasSize(1);
+            JsonNode row = MAPPER.readTree(binlog.get(0));
+            assertThat(row.get("id").asInt()).isEqualTo(3);
+            assertThat(row.get("name").asText()).isEqualTo("Carol");
+        } finally {
+            try (Connection conn = connect();
+                    Statement st = conn.createStatement()) {
+                st.execute("DROP TABLE IF EXISTS t_users");
+            }
+        }
+    }
+
+    @Test
+    void quotedTableNamePreservesGivenCase() throws Exception {
+        try (Connection conn = connect();
+                Statement st = conn.createStatement()) {
+            st.execute("DROP TABLE IF EXISTS \"T_Users\"");
+            // Quoted identifier: PG preserves T_Users exactly.
+            st.execute(
+                    "CREATE TABLE \"T_Users\" (id INT PRIMARY KEY, name 
VARCHAR(50))");
+            st.execute("INSERT INTO \"T_Users\" VALUES (1,'Alice'), 
(2,'Bob')");
+        }
+
+        try (CdcClientReadHarness harness =
+                CdcClientReadHarness.postgres(
+                        jobId,
+                        POSTGRES.getHost(),
+                        
POSTGRES.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT),
+                        POSTGRES.getUsername(),
+                        POSTGRES.getPassword(),
+                        POSTGRES.getDatabaseName(),
+                        "public",
+                        "T_Users",
+                        "initial")) {
+
+            List<SnapshotSplit> splits = 
harness.fetchAllSnapshotSplits("T_Users");
+            SnapshotResult snapshot = harness.readSnapshot(splits);
+
+            Map<Integer, String> names = namesById(snapshot.records());
+            assertThat(names).containsOnlyKeys(1, 2);
+            assertThat(names.get(1)).isEqualTo("Alice");
+            assertThat(names.get(2)).isEqualTo("Bob");
+
+            try (Connection conn = connect();
+                    Statement st = conn.createStatement()) {
+                st.execute("INSERT INTO \"T_Users\" VALUES (3,'Carol')");
+            }
+
+            List<String> binlog =
+                    harness.readBinlogUntil(snapshot, splits, 1, 
Duration.ofSeconds(60));
+            assertThat(binlog).hasSize(1);
+            JsonNode row = MAPPER.readTree(binlog.get(0));
+            assertThat(row.get("id").asInt()).isEqualTo(3);
+            assertThat(row.get("name").asText()).isEqualTo("Carol");
+        } finally {
+            try (Connection conn = connect();
+                    Statement st = conn.createStatement()) {
+                st.execute("DROP TABLE IF EXISTS \"T_Users\"");
+            }
+        }
+    }
+
+    private Map<Integer, String> namesById(List<String> records) throws 
Exception {
+        Map<Integer, String> result = new HashMap<>();
+        for (String record : records) {
+            JsonNode node = MAPPER.readTree(record);
+            result.put(node.get("id").asInt(), node.get("name").asText());
+        }
+        return result;
+    }
+
+    private Connection connect() throws Exception {
+        return DriverManager.getConnection(
+                POSTGRES.getJdbcUrl(),
+                POSTGRES.getUsername(),
+                POSTGRES.getPassword());
+    }
+}
diff --git 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
index cf70b6b075c..c1a80da6a22 100644
--- 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
+++ 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
@@ -270,6 +270,21 @@ class ConfigUtilTest {
         assertEquals(Set.of("c"), all.get("t2"));
     }
 
+    @Test
+    void parseAllExcludeColumns_preservesOriginalCase() {
+        // Table name in config key is preserved as-is; callers must use the
+        // same case that Debezium returns (which is the PG/MySQL stored name).
+        Map<String, String> config = new HashMap<>();
+        config.put("table.MyTable.exclude_columns", "a,b");
+        config.put("table.T_User.exclude_columns", "c");
+
+        Map<String, Set<String>> all = 
ConfigUtil.parseAllExcludeColumns(config);
+
+        assertEquals(2, all.size());
+        assertEquals(Set.of("a", "b"), all.get("MyTable"));
+        assertEquals(Set.of("c"), all.get("T_User"));
+    }
+
     @Test
     void parseAllTargetTableMappings_skipsEmptyValues() {
         Map<String, String> config = new HashMap<>();
@@ -282,6 +297,31 @@ class ConfigUtilTest {
         assertEquals("dst", mappings.get("src"));
     }
 
+    @Test
+    void parseAllTargetTableMappings_preservesOriginalCase() {
+        Map<String, String> config = new HashMap<>();
+        config.put("table.MyTable.target_table", "DstTable");
+        config.put("table.Src.target_table", "Dst2");
+
+        Map<String, String> mappings = 
ConfigUtil.parseAllTargetTableMappings(config);
+
+        assertEquals(2, mappings.size());
+        assertEquals("DstTable", mappings.get("MyTable"));
+        assertEquals("Dst2", mappings.get("Src"));
+    }
+
+    // ─── getTableList case handling 
───────────────────────────────────────────
+
+    @Test
+    void tableListPreservesOriginalCase() {
+        // include_tables values are passed through as-is; Flink CDC's (?i) 
regex
+        // handles case-insensitive matching downstream.
+        Map<String, String> config = new HashMap<>();
+        config.put(DataSourceConfigKeys.INCLUDE_TABLES, "MyTable, T_User");
+        String[] result = ConfigUtil.getTableList("db", config);
+        assertArrayEquals(new String[] {"db.MyTable", "db.T_User"}, result);
+    }
+
     // ─── toStringMap 
──────────────────────────────────────────────────────────
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to