This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new c13c8b920e8 branch-4.1: [improve](streaming-job) support SSL and align 
MySQL CDC source with PG #62700 (#63044)
c13c8b920e8 is described below

commit c13c8b920e8a925f8fa70a3320f89948d4a259d2
Author: wudi <[email protected]>
AuthorDate: Thu May 7 14:47:49 2026 +0800

    branch-4.1: [improve](streaming-job) support SSL and align MySQL CDC source 
with PG #62700 (#63044)
    
    Cherry-picked from #62700
---
 .licenserc.yaml                                    |   1 +
 .../docker-compose/mysql/certs/root.crt            |  19 +++
 .../docker-compose/mysql/certs/server.crt          |  18 ++
 .../docker-compose/mysql/certs/server.key          |  28 +++
 .../docker-compose/mysql/mysql-5.7.yaml.tpl        |  19 ++-
 .../apache/doris/job/cdc/DataSourceConfigKeys.java |   4 +
 .../streaming/DataSourceConfigValidator.java       |  19 +++
 .../streaming/DataSourceConfigValidatorTest.java   |  81 +++++++++
 .../source/reader/mysql/MySqlSourceReader.java     |  48 ++++++
 .../apache/doris/cdcclient/utils/SmallFileMgr.java |  93 ++++++++++
 .../source/reader/mysql/MySqlSourceReaderTest.java |  62 +++++++
 .../doris/cdcclient/utils/SmallFileMgrTest.java    | 119 +++++++++++++
 .../cdc/test_streaming_mysql_job_col_filter.out    |   9 +
 .../cdc/test_streaming_mysql_job_ssl.out           |   9 +
 .../cdc/test_streaming_mysql_job_table_mapping.out |  19 +++
 .../cdc/test_streaming_mysql_job_col_filter.groovy | 175 +++++++++++++++++++
 .../cdc/test_streaming_mysql_job_ssl.groovy        | 158 +++++++++++++++++
 .../test_streaming_mysql_job_table_mapping.groovy  | 190 +++++++++++++++++++++
 18 files changed, 1069 insertions(+), 2 deletions(-)

diff --git a/.licenserc.yaml b/.licenserc.yaml
index 5cd7c30afa8..5bdfa221eb5 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -95,6 +95,7 @@ header:
     - "docker/thirdparties/docker-compose/kerberos/sql/**"
     - "docker/thirdparties/docker-compose/iceberg/spark-defaults.conf.tpl"
     - "docker/thirdparties/docker-compose/postgresql/certs/**"
+    - "docker/thirdparties/docker-compose/mysql/certs/**"
     - "conf/mysql_ssl_default_certificate/*"
     - "conf/mysql_ssl_default_certificate/client_certificate/ca.pem"
     - "conf/mysql_ssl_default_certificate/client_certificate/client-cert.pem"
diff --git a/docker/thirdparties/docker-compose/mysql/certs/root.crt 
b/docker/thirdparties/docker-compose/mysql/certs/root.crt
new file mode 100644
index 00000000000..94095b762c6
--- /dev/null
+++ b/docker/thirdparties/docker-compose/mysql/certs/root.crt
@@ -0,0 +1,19 @@
+-----BEGIN CERTIFICATE-----
+MIIDDzCCAfegAwIBAgIULswy9ovSHXeKSxoEen2Y3xEZqBgwDQYJKoZIhvcNAQEL
+BQAwFzEVMBMGA1UEAwwMTG9jYWwtRGV2LUNBMB4XDTI2MDMwMzA4MjMxM1oXDTM2
+MDIyOTA4MjMxM1owFzEVMBMGA1UEAwwMTG9jYWwtRGV2LUNBMIIBIjANBgkqhkiG
+9w0BAQEFAAOCAQ8AMIIBCgKCAQEAsVFJhj3Y7zamNZiq9SefnnKAKaOXXUbXo/Fq
+V6VNzMSkZuwDfRo/RKjvVaUru/JSd7QoV5zGyUYb+oHx/R233R1M0sd23+eR1mRQ
+w771DmXthbdpIPBEwlmh0LMsiH9cJ7R2iRigCzfd2/SbJC3cvX6CtzyNqSkZboVO
+fswkotF4ZaJgOiBile4A/zWWqeA07QVd8tusdxaoOJv0E/pjcLi5peGXtQA6SSj4
+tp20K/tlrRS1Zc0dKgxU7YohxNBwW4QF0uOVR/QBmfzEpMdxKlwcEnHubPAemgt1
+bp9g9Buwo7oWMvDJuS40xMPOlDhshrzNM8CoWIihgndMPG/LsQIDAQABo1MwUTAd
+BgNVHQ4EFgQUHBKhmdKPD+b1xDjzzkQVaVETSfUwHwYDVR0jBBgwFoAUHBKhmdKP
+D+b1xDjzzkQVaVETSfUwDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOC
+AQEAnueVOIAk/XLQx3msDY58Reo+D1f/AUy/WTPzxeXCxXLScrjFCLXjrIDzgslN
+WnP7E5xNJxdrWgskS36IJxVg0+cUfy5kQYYfmWo1vOYdW/AMNBdQwmK5ve3r3Z/3
+dE2cV4uvL6n0iZZMxnsL5KXwLeSQeTtJepvWi27Z0t8P23lJHJKfl/Ek49ILIDgB
+zZIMKPgm6w7/U3jUWMUyQ+iI/XiEPrnn4url1FNViC8ucoIm8EU4ZE01j1mbZO8M
+JSa6InQEIx/1P675qYtuKWF75Tq/qU7+uX7/07AiTyYSrHMT+024TfbRCi1PF/Ka
+cx+pSJLima+3GHhK2Rj437yx1Q==
+-----END CERTIFICATE-----
diff --git a/docker/thirdparties/docker-compose/mysql/certs/server.crt 
b/docker/thirdparties/docker-compose/mysql/certs/server.crt
new file mode 100644
index 00000000000..e8aaecb71ec
--- /dev/null
+++ b/docker/thirdparties/docker-compose/mysql/certs/server.crt
@@ -0,0 +1,18 @@
+-----BEGIN CERTIFICATE-----
+MIIC+zCCAeOgAwIBAgIUG/9rYO8McYBH83YOe4nzMcb4YCAwDQYJKoZIhvcNAQEL
+BQAwFzEVMBMGA1UEAwwMTG9jYWwtRGV2LUNBMB4XDTI2MDMwMzA4MjMyNloXDTM2
+MDIyOTA4MjMyNlowFDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0B
+AQEFAAOCAQ8AMIIBCgKCAQEAtHYYwevcMqMPbCAaQlrX7qJtRXf/j+WfGFbM4/PZ
+Y6cjSsrqUgwHduMyE4yce9vWWygLJM/S9aBI3jsvhAdLVaFIXhOU4jMuyk+1RJvu
+k+iUJ3wabo2Zv6605wUU7wS0FCfJJMxG/zz5FYtX8kMw7sKJWLhB4C+oQlO+mSj4
+CKjg7mNZjgKz024/BW7FKhAaYYGI9GNmjIgvjSDXGOXzd2nM9XLoVNIkR8mgD69l
+yHHzhGUAdXDxaTr+026Z2uBrnip7ZjDIB65J/qrxSc8eK1ZhZzYdHBpLnP67zuWR
+iyKDNETpRa1SoWCk9/9+AGwygRcXC7h1GpMb46wce4/TtwIDAQABo0IwQDAdBgNV
+HQ4EFgQUEeFQVqK+A/H6R2iSiNW57cSilGcwHwYDVR0jBBgwFoAUHBKhmdKPD+b1
+xDjzzkQVaVETSfUwDQYJKoZIhvcNAQELBQADggEBAKpxfqPTPXL2+n/OW6F8cvwK
+aod3BOquIjIKm17+Uob0rcOnxssYNQa0g9pW2zgIlAS+QUZ1K46ygJWrLNKdpIzt
+mG2Hn6kUX9J7Xo+F5IldlX2bImi3b2/oI8IliLzawsofondCzL2BIfWLhE3LaISF
+iN8pfzjoHCZXfLm3oUzxaeltFqEP+cApig/hAO17FkMHY6sl9QII94MV2d9gVwVl
+pAi1ALOzOQKbsTCdRspoadPqmZ7AgbtS3RiVMmCZHwrtCvdIcaBuiPy5KiBFPCEX
+Cdia+GWqETKBNpornHeMQ7d/J2ilbFRs+mRAUtyeWK0ilcdOxbmMOzCsjIV8kgI=
+-----END CERTIFICATE-----
diff --git a/docker/thirdparties/docker-compose/mysql/certs/server.key 
b/docker/thirdparties/docker-compose/mysql/certs/server.key
new file mode 100644
index 00000000000..d9eb2a8f4ad
--- /dev/null
+++ b/docker/thirdparties/docker-compose/mysql/certs/server.key
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC0dhjB69wyow9s
+IBpCWtfuom1Fd/+P5Z8YVszj89ljpyNKyupSDAd24zITjJx729ZbKAskz9L1oEje
+Oy+EB0tVoUheE5TiMy7KT7VEm+6T6JQnfBpujZm/rrTnBRTvBLQUJ8kkzEb/PPkV
+i1fyQzDuwolYuEHgL6hCU76ZKPgIqODuY1mOArPTbj8FbsUqEBphgYj0Y2aMiC+N
+INcY5fN3acz1cuhU0iRHyaAPr2XIcfOEZQB1cPFpOv7Tbpna4GueKntmMMgHrkn+
+qvFJzx4rVmFnNh0cGkuc/rvO5ZGLIoM0ROlFrVKhYKT3/34AbDKBFxcLuHUakxvj
+rBx7j9O3AgMBAAECggEABZ+8uxdWnQYl+4xlV5E0gmTx3dh8Qd351UfFsW0demDr
+lU1SI3I4I/Lelv8lyrLXZzjcwPfmezfec6RnF37p7ijSPgrIG2PLplCqJsy6BzK1
+ycH/yaYm6sIFSBqdF+ZO5QOaGOWZpA9lgsYHNVt/jdvJCq/50ZhJZO2fvfi9dr4I
+vLjcCX57t+V9n68zHCdw8pTw3eSvO34wv8FXXQyofYi6+swoV/NhGFS1xMlc2USO
+KQ0Do/Y8Dxr/5HawoiMTzO/o4M0Bdmb237fW4D0yVqaevjVWKe/wq2q3VZyBatB2
+XDMkL1ZaWiRsRZHoliiIh3K3gQ2jmtsMXjzv+IKdvQKBgQDgPsk7y5Ms5rjArL8g
+qCP2o8a/IvxzCwxcvK59nfmWFuFeJsxE3uvp89UriqC6yGD5yxAmjDKvHOFtV+CE
+KjCnMgt/jU6BpkaHzTRR8Gtt/RkILZTZiKoNdEgOTeBjHKCoOUoM7Dc78nW7Dp0F
+QoLdAe0g0pSRy5iFcWBiX7UP5QKBgQDOBBRfnaU6fICVH0SmqBoKVSCDm+saYMAW
+99mypm2xViP4VQOa1QjNRiEN9kllxD4I+S48kALSCpif+A/IE89bNgFNEOvTYbkW
++mvjoFLQtN79Tc8/G0CEi+WhRWWpY9WnMuzj1r/pAbC8uOEKvJ+tYfKmHZN5kvoC
+k0e2yMCDawKBgFi6Hw9sxkgO5m0+LMW0Ib62IK6CHlc6uOJ8uaH0fsvXM8b4HPzn
+I3tHQkJfMKeXH1/W7AYElQ1apQuJqMlClEujbo9CjxyXePLEy/3b3fYAHgZxWqMU
+Aw0dxGD8iVtN+Xd2a4lfcZ9jmRexeYmaPoNJ/tRs3eIuJ6QtLxDdg5vNAoGBAIqU
+C/BVZrN01Dl7Ev7XzMxufrSIyRixRAUvK20Urmy/eOqupQIdkxIhvlJZ/P1LiD8Y
+/pUWeg83uXrBrjvzt2OvbCie3UMPVSWzxacUTSC+ydCx6lqUxk1inVBiEgRjd3BE
+vTx1VBo0XOJVqmtCflZusH41HuKEj0/0KiU13OmJAoGAYkxy/U6uHHn6xB3KriID
+bZgfYRlLv1bD4AYiOcjFke3/4MZJ2U4t/x6uzEjQZd/0waSeE3YY/MfEXufdHM99
+ZUlAHwLhjLcY58HgkyMkw4sRaHYxTQdOuxcnmzX1+sHKxKXlYoboLgh8Qf9A4DcR
+HZde9n1uVLVtlBRTjjL5O84=
+-----END PRIVATE KEY-----
diff --git a/docker/thirdparties/docker-compose/mysql/mysql-5.7.yaml.tpl 
b/docker/thirdparties/docker-compose/mysql/mysql-5.7.yaml.tpl
index 3ceeaa313e1..9fe7b1a38ef 100644
--- a/docker/thirdparties/docker-compose/mysql/mysql-5.7.yaml.tpl
+++ b/docker/thirdparties/docker-compose/mysql/mysql-5.7.yaml.tpl
@@ -20,7 +20,6 @@ version: "2.1"
 services:
   doris--mysql_57:
     image: mysql:5.7.36
-    command: --default-authentication-plugin=mysql_native_password
     restart: always
     environment:
       MYSQL_ROOT_PASSWORD: 123456
@@ -29,15 +28,31 @@ services:
       LANG: C.UTF-8
     ports:
       - ${DOCKER_MYSQL_57_EXTERNAL_PORT}:3306
+    entrypoint:
+      - bash
+      - -c
+      - |
+        chown mysql:mysql /etc/mysql/certs/*
+        chmod 600 /etc/mysql/certs/server.key
+        chmod 644 /etc/mysql/certs/server.crt /etc/mysql/certs/root.crt
+        exec docker-entrypoint.sh "$@"
+      - --
+    command:
+      - "mysqld"
+      - "--default-authentication-plugin=mysql_native_password"
+      - "--ssl-ca=/etc/mysql/certs/root.crt"
+      - "--ssl-cert=/etc/mysql/certs/server.crt"
+      - "--ssl-key=/etc/mysql/certs/server.key"
     healthcheck:
       test: mysqladmin ping -h 127.0.0.1 -u root 
--password=$$MYSQL_ROOT_PASSWORD && mysql -h 127.0.0.1 -u root 
--password=$$MYSQL_ROOT_PASSWORD -e "SELECT 1 FROM doris_test.deadline;"
       interval: 5s
       timeout: 60s
       retries: 120
     volumes:
-      - ./data/:/var/lib/mysql 
+      - ./data/:/var/lib/mysql
       - ./init:/docker-entrypoint-initdb.d
       - ./my.cnf:/etc/mysql/conf.d/my.cnf
+      - ./certs:/etc/mysql/certs
     networks:
       - doris--mysql_57
 
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
index d31794766ad..72322da2668 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
@@ -40,6 +40,10 @@ public class DataSourceConfigKeys {
     public static final String SNAPSHOT_PARALLELISM_DEFAULT = "1";
     public static final String SSL_MODE = "ssl_mode";
     public static final String SSL_ROOTCERT = "ssl_rootcert";
+    // PG-style spelling; MySQL normalizes to underscore form.
+    public static final String SSL_MODE_DISABLE = "disable";
+    public static final String SSL_MODE_REQUIRE = "require";
+    public static final String SSL_MODE_VERIFY_CA = "verify-ca";
 
     // PostgreSQL replication slot and publication config
     public static final String SLOT_NAME = "slot_name";
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
index c4aad00d33a..f37cbbff5dd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
@@ -56,6 +56,12 @@ public class DataSourceConfigValidator {
             DataSourceConfigKeys.PUBLICATION_NAME
     );
 
+    private static final Set<String> ALLOW_SSL_MODES = Sets.newHashSet(
+            DataSourceConfigKeys.SSL_MODE_DISABLE,
+            DataSourceConfigKeys.SSL_MODE_REQUIRE,
+            DataSourceConfigKeys.SSL_MODE_VERIFY_CA
+    );
+
     // Known suffixes for per-table config keys (format: 
"table.<tableName>.<suffix>")
     private static final Set<String> ALLOW_TABLE_LEVEL_SUFFIXES = 
Sets.newHashSet(
             DataSourceConfigKeys.TABLE_TARGET_TABLE_SUFFIX,
@@ -102,6 +108,16 @@ public class DataSourceConfigValidator {
                 throw new IllegalArgumentException("Invalid value for key '" + 
key + "': " + value);
             }
         }
+
+        // Cross-field: verify-ca must be paired with a CA cert; otherwise the 
reader will
+        // silently fall back to the JVM default truststore and likely fail to 
connect.
+        if 
(DataSourceConfigKeys.SSL_MODE_VERIFY_CA.equals(input.get(DataSourceConfigKeys.SSL_MODE))
+                && (input.get(DataSourceConfigKeys.SSL_ROOTCERT) == null
+                        || 
input.get(DataSourceConfigKeys.SSL_ROOTCERT).trim().isEmpty())) {
+            throw new IllegalArgumentException(
+                    "ssl_mode '" + DataSourceConfigKeys.SSL_MODE_VERIFY_CA
+                            + "' requires ssl_rootcert to be set");
+        }
     }
 
     public static void validateTarget(Map<String, String> input) throws 
IllegalArgumentException {
@@ -144,6 +160,9 @@ public class DataSourceConfigValidator {
             return value.length() <= PG_MAX_IDENTIFIER_LENGTH
                     && PG_IDENTIFIER_PATTERN.matcher(value).matches();
         }
+        if (key.equals(DataSourceConfigKeys.SSL_MODE) && 
!ALLOW_SSL_MODES.contains(value)) {
+            return false;
+        }
         return true;
     }
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java
index 2f71b7664a8..ce570f440a8 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java
@@ -30,6 +30,87 @@ public class DataSourceConfigValidatorTest {
 
     private static final int PG_MAX_IDENTIFIER_LENGTH = 63;
 
+    private static Map<String, String> sslModeInput(String value) {
+        Map<String, String> input = new HashMap<>();
+        input.put(DataSourceConfigKeys.SSL_MODE, value);
+        return input;
+    }
+
+    @Test
+    public void testSslModeLegalValues() {
+        DataSourceConfigValidator.validateSource(
+                sslModeInput(DataSourceConfigKeys.SSL_MODE_DISABLE), 
DataSourceType.MYSQL.name());
+        DataSourceConfigValidator.validateSource(
+                sslModeInput(DataSourceConfigKeys.SSL_MODE_REQUIRE), 
DataSourceType.MYSQL.name());
+        // verify-ca additionally requires ssl_rootcert; covered by 
testVerifyCaWithRootcertPasses.
+    }
+
+    @Test
+    public void testSslModeRejectsMysqlUnderscoreSpelling() {
+        assertReject(sslModeInput("verify_ca"));
+    }
+
+    @Test
+    public void testSslModeRejectsVerifyFull() {
+        assertReject(sslModeInput("verify-full"));
+    }
+
+    @Test
+    public void testSslModeRejectsPreferredAndAllow() {
+        assertReject(sslModeInput("preferred"));
+        assertReject(sslModeInput("prefer"));
+        assertReject(sslModeInput("allow"));
+    }
+
+    @Test
+    public void testSslModeRejectsUppercaseVariants() {
+        assertReject(sslModeInput("DISABLE"));
+        assertReject(sslModeInput("Verify-CA"));
+    }
+
+    @Test
+    public void testSslModeRejectsEmpty() {
+        assertReject(sslModeInput(""));
+    }
+
+    @Test
+    public void testSslModeOptional() {
+        // ssl_mode is not required; validateSource should pass when absent
+        Map<String, String> input = new HashMap<>();
+        input.put(DataSourceConfigKeys.JDBC_URL, "jdbc:mysql://host/db");
+        DataSourceConfigValidator.validateSource(input, 
DataSourceType.MYSQL.name());
+    }
+
+    @Test
+    public void testVerifyCaRequiresRootcert() {
+        Map<String, String> input = 
sslModeInput(DataSourceConfigKeys.SSL_MODE_VERIFY_CA);
+        assertReject(input);
+    }
+
+    @Test
+    public void testVerifyCaWithRootcertPasses() {
+        Map<String, String> input = 
sslModeInput(DataSourceConfigKeys.SSL_MODE_VERIFY_CA);
+        input.put(DataSourceConfigKeys.SSL_ROOTCERT, "FILE:ca.pem");
+        DataSourceConfigValidator.validateSource(input, 
DataSourceType.MYSQL.name());
+    }
+
+    @Test
+    public void testDisableWithoutRootcertPasses() {
+        DataSourceConfigValidator.validateSource(
+                sslModeInput(DataSourceConfigKeys.SSL_MODE_DISABLE), 
DataSourceType.MYSQL.name());
+        DataSourceConfigValidator.validateSource(
+                sslModeInput(DataSourceConfigKeys.SSL_MODE_REQUIRE), 
DataSourceType.MYSQL.name());
+    }
+
+    private static void assertReject(Map<String, String> input) {
+        try {
+            DataSourceConfigValidator.validateSource(input, 
DataSourceType.MYSQL.name());
+            Assert.fail("expected IllegalArgumentException for input: " + 
input);
+        } catch (IllegalArgumentException ignored) {
+            // expected
+        }
+    }
+
     @Test
     public void testSlotNameAndPublicationNameAllowed() {
         Map<String, String> props = new HashMap<>();
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index 9aa268ef09b..bf8ac56312b 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -25,6 +25,7 @@ import 
org.apache.doris.cdcclient.source.reader.SnapshotReaderContext;
 import org.apache.doris.cdcclient.source.reader.SplitReadResult;
 import org.apache.doris.cdcclient.source.reader.SplitRecords;
 import org.apache.doris.cdcclient.utils.ConfigUtil;
+import org.apache.doris.cdcclient.utils.SmallFileMgr;
 import org.apache.doris.job.cdc.DataSourceConfigKeys;
 import org.apache.doris.job.cdc.request.CompareOffsetRequest;
 import org.apache.doris.job.cdc.request.FetchTableSplitsRequest;
@@ -878,6 +879,28 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
         dbzProps.setProperty(
                 MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS.name(),
                 DEBEZIUM_HEARTBEAT_INTERVAL_MS + "");
+
+        if (cdcConfig.containsKey(DataSourceConfigKeys.SSL_MODE)) {
+            String normalized =
+                    
normalizeSslModeForMysql(cdcConfig.get(DataSourceConfigKeys.SSL_MODE));
+            dbzProps.put("database.ssl.mode", normalized);
+            // Flink CDC's forked MySqlConnection drops Debezium SSL props 
from the snapshot
+            // JDBC URL, so mirror to Connector/J native names.
+            jdbcProperteis.put("sslMode", normalized);
+        }
+        if (cdcConfig.containsKey(DataSourceConfigKeys.SSL_ROOTCERT)) {
+            String fileName = cdcConfig.get(DataSourceConfigKeys.SSL_ROOTCERT);
+            String truststorePath = 
SmallFileMgr.getPkcs12TruststorePath(fileName);
+            LOG.info("Using SSL truststore file path: {}", truststorePath);
+            dbzProps.put("database.ssl.truststore", truststorePath);
+            dbzProps.put("database.ssl.truststore.password", 
SmallFileMgr.TRUSTSTORE_PASSWORD);
+            jdbcProperteis.put("trustCertificateKeyStoreUrl", "file:" + 
truststorePath);
+            // Connector/J defaults keystore type to JKS; we generate PKCS12.
+            jdbcProperteis.put("trustCertificateKeyStoreType", "PKCS12");
+            jdbcProperteis.put(
+                    "trustCertificateKeyStorePassword", 
SmallFileMgr.TRUSTSTORE_PASSWORD);
+        }
+
         configFactory.debeziumProperties(dbzProps);
         
configFactory.heartbeatInterval(Duration.ofMillis(DEBEZIUM_HEARTBEAT_INTERVAL_MS));
 
@@ -1056,6 +1079,31 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
         return serializer.deserialize(config, element);
     }
 
+    /** Map Doris ssl_mode (PG-style) to Debezium MySQL's underscore spelling. 
*/
+    static String normalizeSslModeForMysql(String sslMode) {
+        if (sslMode == null) {
+            throw new IllegalArgumentException("ssl_mode must not be null");
+        }
+        switch (sslMode) {
+            case DataSourceConfigKeys.SSL_MODE_DISABLE:
+                return 
MySqlConnectorConfig.SecureConnectionMode.DISABLED.getValue();
+            case DataSourceConfigKeys.SSL_MODE_REQUIRE:
+                return 
MySqlConnectorConfig.SecureConnectionMode.REQUIRED.getValue();
+            case DataSourceConfigKeys.SSL_MODE_VERIFY_CA:
+                return 
MySqlConnectorConfig.SecureConnectionMode.VERIFY_CA.getValue();
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported ssl_mode for MySQL: '"
+                                + sslMode
+                                + "'. Allowed: "
+                                + String.join(
+                                        ", ",
+                                        DataSourceConfigKeys.SSL_MODE_DISABLE,
+                                        DataSourceConfigKeys.SSL_MODE_REQUIRE,
+                                        
DataSourceConfigKeys.SSL_MODE_VERIFY_CA));
+        }
+    }
+
     /**
      * Filtered record iterator that only returns data change records, 
filtering out watermark,
      * heartbeat and other events. This is a private static inner class that 
encapsulates record
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java
index 1e854097883..6f9616cadb7 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java
@@ -29,7 +29,15 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.security.KeyStore;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -47,9 +55,17 @@ public class SmallFileMgr {
 
     private static final String FILE_PREFIX = "FILE:";
 
+    private static final String PKCS12_SUFFIX = ".p12";
+
+    /** JCA-required placeholder; a public-CA-only truststore has no secret to 
protect. */
+    public static final String TRUSTSTORE_PASSWORD = "changeit";
+
     /** In-memory cache: "file_id:md5" -> absolute local file path */
     private static final Map<String, String> MEM_CACHE = new 
ConcurrentHashMap<>();
 
+    /** In-memory cache for PKCS12 truststores derived from PEM CA certs. */
+    private static final Map<String, String> PKCS12_CACHE = new 
ConcurrentHashMap<>();
+
     /**
      * Per-key locks to serialize concurrent downloads of the same file, 
preventing tmp file
      * corruption when multiple threads race on the same file_id:md5 key.
@@ -216,9 +232,86 @@ public class SmallFileMgr {
         }
     }
 
+    /**
+     * Resolve a FILE: reference to a PKCS12 truststore path, converting the 
PEM on first access.
+     * For connectors (e.g. Debezium MySQL) that require JKS/PKCS12 rather 
than raw PEM.
+     *
+     * @param filePath FILE reference, format: FILE:{file_id}:{md5}
+     * @return absolute local path to the PKCS12 truststore
+     */
+    public static String getPkcs12TruststorePath(String filePath) {
+        return pkcs12TruststorePath(getFilePath(filePath));
+    }
+
+    /** Package-private overload that accepts a custom local directory, used 
for testing. */
+    static String getPkcs12TruststorePath(
+            String feMasterAddress, String filePath, String clusterToken, 
String localDir) {
+        return pkcs12TruststorePath(getFilePath(feMasterAddress, filePath, 
clusterToken, localDir));
+    }
+
+    private static String pkcs12TruststorePath(String pemPath) {
+        String cached = PKCS12_CACHE.get(pemPath);
+        if (cached != null && new File(cached).exists()) {
+            return cached;
+        }
+        Object lock = DOWNLOAD_LOCKS.computeIfAbsent(pemPath + PKCS12_SUFFIX, 
k -> new Object());
+        synchronized (lock) {
+            String doubleChecked = PKCS12_CACHE.get(pemPath);
+            if (doubleChecked != null && new File(doubleChecked).exists()) {
+                return doubleChecked;
+            }
+            String p12Path = pemPath + PKCS12_SUFFIX;
+            if (!new File(p12Path).exists()) {
+                convertPemToPkcs12(pemPath, p12Path);
+            }
+            PKCS12_CACHE.put(pemPath, p12Path);
+            return p12Path;
+        }
+    }
+
+    private static void convertPemToPkcs12(String pemPath, String p12Path) {
+        Path tmpFile;
+        try {
+            Path p12 = Paths.get(p12Path);
+            tmpFile = Files.createTempFile(p12.getParent(), "p12-", ".tmp");
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to create tmp file for PKCS12 
truststore", e);
+        }
+        try {
+            KeyStore keyStore = KeyStore.getInstance("PKCS12");
+            keyStore.load(null);
+            CertificateFactory cf = CertificateFactory.getInstance("X.509");
+            try (InputStream in = new FileInputStream(pemPath)) {
+                // A CA PEM may contain a chain (intermediate + root); import 
each with a
+                // distinct alias, otherwise later entries overwrite earlier 
ones.
+                int i = 0;
+                for (Certificate cert : cf.generateCertificates(in)) {
+                    keyStore.setCertificateEntry("ca" + (i++), cert);
+                }
+            }
+            try (OutputStream os = Files.newOutputStream(tmpFile)) {
+                keyStore.store(os, TRUSTSTORE_PASSWORD.toCharArray());
+            }
+            Files.move(
+                    tmpFile,
+                    Paths.get(p12Path),
+                    StandardCopyOption.ATOMIC_MOVE,
+                    StandardCopyOption.REPLACE_EXISTING);
+            LOG.info("Generated PKCS12 truststore: {}", p12Path);
+        } catch (Exception e) {
+            try {
+                Files.deleteIfExists(tmpFile);
+            } catch (IOException ignored) {
+                // best effort
+            }
+            throw new RuntimeException("Failed to convert PEM to PKCS12: " + 
pemPath, e);
+        }
+    }
+
     /** Clears the in-memory cache. Exposed for testing. */
     static void clearCache() {
         MEM_CACHE.clear();
+        PKCS12_CACHE.clear();
         DOWNLOAD_LOCKS.clear();
     }
 }
diff --git 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReaderTest.java
 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReaderTest.java
new file mode 100644
index 00000000000..1192df291d3
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReaderTest.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.reader.mysql;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.junit.jupiter.api.Test;
+
+public class MySqlSourceReaderTest {
+
+    @Test
+    void testNormalizeSslModeMapsAllLegalValues() {
+        assertEquals("disabled", 
MySqlSourceReader.normalizeSslModeForMysql("disable"));
+        assertEquals("required", 
MySqlSourceReader.normalizeSslModeForMysql("require"));
+        assertEquals("verify_ca", 
MySqlSourceReader.normalizeSslModeForMysql("verify-ca"));
+    }
+
+    @Test
+    void testNormalizeSslModeRejectsNull() {
+        assertThrows(
+                IllegalArgumentException.class,
+                () -> MySqlSourceReader.normalizeSslModeForMysql(null));
+    }
+
+    @Test
+    void testNormalizeSslModeRejectsMysqlUnderscoreSpelling() {
+        // FE validator rejects this, but guard reader-side too.
+        assertThrows(
+                IllegalArgumentException.class,
+                () -> MySqlSourceReader.normalizeSslModeForMysql("verify_ca"));
+    }
+
+    @Test
+    void testNormalizeSslModeRejectsVerifyFull() {
+        assertThrows(
+                IllegalArgumentException.class,
+                () -> 
MySqlSourceReader.normalizeSslModeForMysql("verify-full"));
+    }
+
+    @Test
+    void testNormalizeSslModeRejectsUppercase() {
+        assertThrows(
+                IllegalArgumentException.class,
+                () -> MySqlSourceReader.normalizeSslModeForMysql("DISABLE"));
+    }
+}
diff --git 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SmallFileMgrTest.java
 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SmallFileMgrTest.java
index ae99d4db986..24bb4f5e615 100644
--- 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SmallFileMgrTest.java
+++ 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SmallFileMgrTest.java
@@ -25,9 +25,13 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.KeyStore;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -37,6 +41,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -375,4 +380,118 @@ class SmallFileMgrTest {
             server.stop(0);
         }
     }
+
+    // 
-------------------------------------------------------------------------
+    // PKCS12 truststore conversion
+    // 
-------------------------------------------------------------------------
+
+    private static final String CA_PEM =
+            "-----BEGIN CERTIFICATE-----\n"
+                    + 
"MIIDDzCCAfegAwIBAgIULswy9ovSHXeKSxoEen2Y3xEZqBgwDQYJKoZIhvcNAQEL\n"
+                    + 
"BQAwFzEVMBMGA1UEAwwMTG9jYWwtRGV2LUNBMB4XDTI2MDMwMzA4MjMxM1oXDTM2\n"
+                    + 
"MDIyOTA4MjMxM1owFzEVMBMGA1UEAwwMTG9jYWwtRGV2LUNBMIIBIjANBgkqhkiG\n"
+                    + 
"9w0BAQEFAAOCAQ8AMIIBCgKCAQEAsVFJhj3Y7zamNZiq9SefnnKAKaOXXUbXo/Fq\n"
+                    + 
"V6VNzMSkZuwDfRo/RKjvVaUru/JSd7QoV5zGyUYb+oHx/R233R1M0sd23+eR1mRQ\n"
+                    + 
"w771DmXthbdpIPBEwlmh0LMsiH9cJ7R2iRigCzfd2/SbJC3cvX6CtzyNqSkZboVO\n"
+                    + 
"fswkotF4ZaJgOiBile4A/zWWqeA07QVd8tusdxaoOJv0E/pjcLi5peGXtQA6SSj4\n"
+                    + 
"tp20K/tlrRS1Zc0dKgxU7YohxNBwW4QF0uOVR/QBmfzEpMdxKlwcEnHubPAemgt1\n"
+                    + 
"bp9g9Buwo7oWMvDJuS40xMPOlDhshrzNM8CoWIihgndMPG/LsQIDAQABo1MwUTAd\n"
+                    + 
"BgNVHQ4EFgQUHBKhmdKPD+b1xDjzzkQVaVETSfUwHwYDVR0jBBgwFoAUHBKhmdKP\n"
+                    + 
"D+b1xDjzzkQVaVETSfUwDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOC\n"
+                    + 
"AQEAnueVOIAk/XLQx3msDY58Reo+D1f/AUy/WTPzxeXCxXLScrjFCLXjrIDzgslN\n"
+                    + 
"WnP7E5xNJxdrWgskS36IJxVg0+cUfy5kQYYfmWo1vOYdW/AMNBdQwmK5ve3r3Z/3\n"
+                    + 
"dE2cV4uvL6n0iZZMxnsL5KXwLeSQeTtJepvWi27Z0t8P23lJHJKfl/Ek49ILIDgB\n"
+                    + 
"zZIMKPgm6w7/U3jUWMUyQ+iI/XiEPrnn4url1FNViC8ucoIm8EU4ZE01j1mbZO8M\n"
+                    + 
"JSa6InQEIx/1P675qYtuKWF75Tq/qU7+uX7/07AiTyYSrHMT+024TfbRCi1PF/Ka\n"
+                    + "cx+pSJLima+3GHhK2Rj437yx1Q==\n"
+                    + "-----END CERTIFICATE-----\n";
+
+    private String preloadPem(String fileId, byte[] pemBytes) throws 
IOException {
+        String md5 = DigestUtils.md5Hex(pemBytes);
+        File cachedFile = tempDir.resolve(fileId + "." + md5).toFile();
+        Files.write(cachedFile.toPath(), pemBytes);
+        return "FILE:" + fileId + ":" + md5;
+    }
+
+    private KeyStore loadPkcs12(String p12Path) throws Exception {
+        KeyStore keyStore = KeyStore.getInstance("PKCS12");
+        try (InputStream in = Files.newInputStream(Paths.get(p12Path))) {
+            keyStore.load(in, SmallFileMgr.TRUSTSTORE_PASSWORD.toCharArray());
+        }
+        return keyStore;
+    }
+
+    @Test
+    void testPkcs12SingleCertConversion() throws Exception {
+        String filePath = preloadPem("40001", CA_PEM.getBytes());
+        String p12Path = SmallFileMgr.getPkcs12TruststorePath(
+                "host:8030", filePath, "token", tempDir.toString());
+
+        assertTrue(p12Path.endsWith(".p12"));
+        assertTrue(new File(p12Path).exists());
+
+        KeyStore keyStore = loadPkcs12(p12Path);
+        assertEquals(1, keyStore.size());
+        assertTrue(keyStore.containsAlias("ca0"));
+    }
+
+    /**
+     * PEM with a chain (intermediate + root) must produce one keystore entry 
per certificate.
+     * Using the same cert twice here is sufficient to prove alias uniqueness 
- without distinct
+     * aliases the second entry would silently overwrite the first.
+     */
+    @Test
+    void testPkcs12MultipleCertsPreserveAllEntries() throws Exception {
+        String chainPem = CA_PEM + CA_PEM;
+        String filePath = preloadPem("40002", chainPem.getBytes());
+        String p12Path = SmallFileMgr.getPkcs12TruststorePath(
+                "host:8030", filePath, "token", tempDir.toString());
+
+        KeyStore keyStore = loadPkcs12(p12Path);
+        assertEquals(2, keyStore.size(), "chain with 2 certs must produce 2 
entries");
+        assertTrue(keyStore.containsAlias("ca0"));
+        assertTrue(keyStore.containsAlias("ca1"));
+    }
+
+    @Test
+    void testPkcs12SecondCallUsesMemoryCacheWhenFilePresent() throws Exception 
{
+        String filePath = preloadPem("40003", CA_PEM.getBytes());
+        String first = SmallFileMgr.getPkcs12TruststorePath(
+                "host:8030", filePath, "token", tempDir.toString());
+
+        long firstMtime = new File(first).lastModified();
+        String second = SmallFileMgr.getPkcs12TruststorePath(
+                "host:8030", filePath, "token", tempDir.toString());
+        assertEquals(first, second);
+        assertEquals(firstMtime, new File(second).lastModified(),
+                "second call should hit memory cache and not regenerate .p12");
+    }
+
+    @Test
+    void testPkcs12RegeneratesWhenCachedFileMissing() throws Exception {
+        String filePath = preloadPem("40005", CA_PEM.getBytes());
+        String first = SmallFileMgr.getPkcs12TruststorePath(
+                "host:8030", filePath, "token", tempDir.toString());
+
+        // Simulate external deletion after the cache entry was stored.
+        assertTrue(new File(first).delete());
+
+        String second = SmallFileMgr.getPkcs12TruststorePath(
+                "host:8030", filePath, "token", tempDir.toString());
+        assertEquals(first, second);
+        assertTrue(new File(second).exists(),
+                "cached path whose file disappeared should be regenerated on 
next call");
+    }
+
+    @Test
+    void testPkcs12InvalidPemThrows() throws Exception {
+        byte[] invalid = ("-----BEGIN CERTIFICATE-----\n"
+                + "this-is-not-valid-base64!!!\n"
+                + "-----END CERTIFICATE-----\n").getBytes();
+        String filePath = preloadPem("40004", invalid);
+
+        assertThrows(RuntimeException.class,
+                () -> SmallFileMgr.getPkcs12TruststorePath(
+                        "host:8030", filePath, "token", tempDir.toString()));
+    }
 }
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_col_filter.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_col_filter.out
new file mode 100644
index 00000000000..c7ca9579309
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_col_filter.out
@@ -0,0 +1,9 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_snapshot --
+A1     1
+B1     2
+
+-- !select_incremental --
+B1     20
+C1     3
+
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_ssl.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_ssl.out
new file mode 100644
index 00000000000..93141f92358
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_ssl.out
@@ -0,0 +1,9 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_snapshot_table1 --
+A1     1
+B1     2
+
+-- !select_binlog_table1 --
+B1     10
+Doris  18
+
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_table_mapping.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_table_mapping.out
new file mode 100644
index 00000000000..8d922a718f1
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_table_mapping.out
@@ -0,0 +1,19 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_snapshot --
+1      Alice
+2      Bob
+
+-- !select_incremental --
+2      Bob_v2
+3      Carol
+
+-- !select_merge_snapshot --
+100    Src1_A
+200    Src2_A
+
+-- !select_merge_incremental --
+100    Src1_A
+101    Src1_B
+200    Src2_A
+201    Src2_B
+
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_col_filter.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_col_filter.groovy
new file mode 100644
index 00000000000..f89776cd231
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_col_filter.groovy
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_mysql_job_col_filter", 
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+    def jobName = "test_streaming_mysql_col_filter"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "user_info_mysql_col_filter"
+    def mysqlDb = "test_cdc_db_col_filter"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}_err1'"""
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}_err2'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String mysql_port    = context.config.otherConfigs.get("mysql_57_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint   = getS3Endpoint()
+        String bucket        = getS3BucketName()
+        String driver_url    = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar";
+
+        // Create MySQL table with an extra "secret" column to be excluded
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+            sql """CREATE TABLE ${mysqlDb}.${table1} (
+                      `name`   varchar(200) NOT NULL,
+                      `age`    int,
+                      `secret` varchar(200),
+                      PRIMARY KEY (`name`)
+                    ) ENGINE=InnoDB"""
+            sql """INSERT INTO ${mysqlDb}.${table1} VALUES ('A1', 1, 
'secret_A1')"""
+            sql """INSERT INTO ${mysqlDb}.${table1} VALUES ('B1', 2, 
'secret_B1')"""
+        }
+
+        // ── Validation: exclude a non-existent column should fail 
──────────────
+        try {
+            sql """CREATE JOB ${jobName}_err1
+                    ON STREAMING
+                    FROM MYSQL (
+                        "jdbc_url"       = 
"jdbc:mysql://${externalEnvIp}:${mysql_port}",
+                        "driver_url"     = "${driver_url}",
+                        "driver_class"   = "com.mysql.cj.jdbc.Driver",
+                        "user"           = "root",
+                        "password"       = "123456",
+                        "database"       = "${mysqlDb}",
+                        "include_tables" = "${table1}",
+                        "offset"         = "initial",
+                        "table.${table1}.exclude_columns" = "nonexistent_col"
+                    )
+                    TO DATABASE ${currentDb} (
+                        "table.create.properties.replication_num" = "1"
+                    )"""
+            assert false : "Should have thrown exception for non-existent 
excluded column"
+        } catch (Exception e) {
+            log.info("Expected error for non-existent column: " + e.message)
+            assert e.message.contains("does not exist") : "Unexpected error 
message: " + e.message
+        }
+
+        // ── Validation: exclude a PK column should fail 
────────────────────────
+        try {
+            sql """CREATE JOB ${jobName}_err2
+                    ON STREAMING
+                    FROM MYSQL (
+                        "jdbc_url"       = 
"jdbc:mysql://${externalEnvIp}:${mysql_port}",
+                        "driver_url"     = "${driver_url}",
+                        "driver_class"   = "com.mysql.cj.jdbc.Driver",
+                        "user"           = "root",
+                        "password"       = "123456",
+                        "database"       = "${mysqlDb}",
+                        "include_tables" = "${table1}",
+                        "offset"         = "initial",
+                        "table.${table1}.exclude_columns" = "name"
+                    )
+                    TO DATABASE ${currentDb} (
+                        "table.create.properties.replication_num" = "1"
+                    )"""
+            assert false : "Should have thrown exception for excluding PK 
column"
+        } catch (Exception e) {
+            log.info("Expected error for PK column: " + e.message)
+            assert e.message.contains("primary key") : "Unexpected error 
message: " + e.message
+        }
+
+        // ── Main job: exclude "secret" column 
──────────────────────────────────
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM MYSQL (
+                    "jdbc_url"       = 
"jdbc:mysql://${externalEnvIp}:${mysql_port}",
+                    "driver_url"     = "${driver_url}",
+                    "driver_class"   = "com.mysql.cj.jdbc.Driver",
+                    "user"           = "root",
+                    "password"       = "123456",
+                    "database"       = "${mysqlDb}",
+                    "include_tables" = "${table1}",
+                    "offset"         = "initial",
+                    "table.${table1}.exclude_columns" = "secret"
+                )
+                TO DATABASE ${currentDb} (
+                    "table.create.properties.replication_num" = "1"
+                )"""
+
+        // Verify Doris table was created WITHOUT the excluded column
+        def colNames = (sql """desc ${currentDb}.${table1}""").collect { it[0] 
}
+        assert !colNames.contains("secret") : "Excluded column 'secret' must 
not appear in Doris table"
+        assert colNames.contains("name")
+        assert colNames.contains("age")
+
+        // Wait for snapshot to complete
+        try {
+            Awaitility.await().atMost(300, SECONDS).pollInterval(1, 
SECONDS).until({
+                def cnt = sql """select SucceedTaskCount from 
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING'"""
+                cnt.size() == 1 && cnt.get(0).get(0).toLong() >= 2
+            })
+        } catch (Exception ex) {
+            def showJob  = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showTask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showJob)
+            log.info("show task: " + showTask)
+            throw ex
+        }
+
+        // Snapshot: only name and age, secret absent
+        qt_select_snapshot """ SELECT * FROM ${table1} ORDER BY name ASC """
+
+        // ── Incremental DML: secret values must not appear in Doris 
───────────
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """INSERT INTO ${mysqlDb}.${table1} VALUES ('C1', 3, 
'secret_C1')"""
+            sql """UPDATE ${mysqlDb}.${table1} SET age = 20, secret = 
'updated_secret' WHERE name = 'B1'"""
+            sql """DELETE FROM ${mysqlDb}.${table1} WHERE name = 'A1'"""
+        }
+        // Wait until C1 appears and A1 is gone
+        try {
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def names = (sql """ SELECT name FROM ${table1} ORDER BY name 
ASC """).collect { it[0] }
+                names.contains('C1') && !names.contains('A1')
+            })
+        } catch (Exception ex) {
+            def showJob  = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showTask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showJob)
+            log.info("show task: " + showTask)
+            throw ex
+        }
+
+        qt_select_incremental """ SELECT * FROM ${table1} ORDER BY name ASC """
+
+        // Doris table still has no secret column after DML events on excluded 
column
+        def colNamesAfterDml = (sql """desc ${currentDb}.${table1}""").collect 
{ it[0] }
+        assert !colNamesAfterDml.contains("secret") : "secret column must not 
appear in Doris after DML on excluded column"
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert") 
where Name = '${jobName}'"""
+        assert jobCountRsp.get(0).get(0) == 0
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_ssl.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_ssl.groovy
new file mode 100644
index 00000000000..51359cb5983
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_ssl.groovy
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_mysql_job_ssl", 
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+    def jobName = "test_streaming_mysql_job_name_ssl"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "user_info_mysql_normal1_ssl"
+    def mysqlDb = "test_cdc_db_ssl"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar";
+
+        // create test
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+            sql """CREATE TABLE ${mysqlDb}.${table1} (
+                  `name` varchar(200) NOT NULL,
+                  `age` int DEFAULT NULL,
+                  PRIMARY KEY (`name`)
+                ) ENGINE=InnoDB"""
+            sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('A1', 
1);"""
+            sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('B1', 
2);"""
+        }
+
+        try {
+            sql """DROP FILE "mysql_ca.pem" FROM ${currentDb} PROPERTIES 
("catalog" = "streaming_job")"""
+        } catch (Exception ignored) {
+            // ignore
+        }
+
+        sql """CREATE FILE "mysql_ca.pem"
+            IN ${currentDb}
+            PROPERTIES
+            (
+                "url" = 
"https://qa-build.oss-cn-beijing.aliyuncs.com/jianxu/root.crt";,
+                "catalog" = "streaming_job"
+            )
+        """
+
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM MYSQL (
+                    "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "com.mysql.cj.jdbc.Driver",
+                    "user" = "root",
+                    "password" = "123456",
+                    "database" = "${mysqlDb}",
+                    "include_tables" = "${table1}",
+                    "offset" = "initial",
+                    "ssl_mode" = "verify-ca",
+                    "ssl_rootcert" = "FILE:mysql_ca.pem"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+        def showAllTables = sql """ show tables from ${currentDb}"""
+        log.info("showAllTables: " + showAllTables)
+        // check table created
+        def showTables = sql """ show tables from ${currentDb} like 
'${table1}'; """
+        assert showTables.size() == 1
+
+        // check job running
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(1, SECONDS).until(
+                    {
+                        def jobSuccendCount = sql """ select SucceedTaskCount 
from jobs("type"="insert") where Name = '${jobName}' and 
ExecuteType='STREAMING' """
+                        log.info("jobSuccendCount: " + jobSuccendCount)
+                        // check job status and succeed task count larger than 
2
+                        jobSuccendCount.size() == 1 && 
jobSuccendCount.get(0).get(0).toString().toLong() >= 2L
+                    }
+            )
+        } catch (Exception ex){
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex;
+        }
+
+        // check snapshot data
+        qt_select_snapshot_table1 """ SELECT * FROM ${table1} order by name 
asc """
+
+        // mock incremental into
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """INSERT INTO ${mysqlDb}.${table1} (name,age) VALUES 
('Doris',18);"""
+            sql """UPDATE ${mysqlDb}.${table1} SET age = 10 WHERE name = 
'B1';"""
+            sql """DELETE FROM ${mysqlDb}.${table1} WHERE name = 'A1';"""
+        }
+
+        // wait for cdc incremental data
+        try {
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def names = (sql """ SELECT name FROM ${table1} ORDER BY name 
ASC """).collect { it[0] }
+                names.contains('Doris') && !names.contains('A1')
+            })
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+
+        // check incremental data
+        qt_select_binlog_table1 """ SELECT * FROM ${table1} order by name asc 
"""
+
+        def jobInfo = sql """
+        select status from jobs("type"="insert") where Name='${jobName}'
+        """
+        log.info("jobInfo: " + jobInfo)
+        assert jobInfo.get(0).get(0) == "RUNNING"
+
+        sql """
+            DROP JOB IF EXISTS where jobname =  '${jobName}'
+        """
+
+        try {
+            sql """DROP FILE "mysql_ca.pem" FROM ${currentDb} PROPERTIES 
("catalog" = "streaming_job")"""
+        } catch (Exception ignored) {
+            // ignore
+        }
+
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert")  
where Name ='${jobName}'"""
+        assert jobCountRsp.get(0).get(0) == 0
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_table_mapping.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_table_mapping.groovy
new file mode 100644
index 00000000000..305290391e6
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_table_mapping.groovy
@@ -0,0 +1,190 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_mysql_job_table_mapping", 
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+    def jobName         = "test_streaming_mysql_table_mapping"
+    def jobNameMerge    = "test_streaming_mysql_table_mapping_merge"
+    def currentDb       = (sql "select database()")[0][0]
+    def mysqlSrcTable   = "mysql_src_table"       // upstream MySQL table name
+    def dorisDstTable   = "doris_dst_table_mysql" // downstream Doris table 
name (mapped)
+    def mysqlSrcTable2  = "mysql_src_table2"      // second upstream table 
(multi-table merge)
+    def dorisMergeTable = "doris_merge_table_mysql"
+    def mysqlDb         = "test_cdc_db_table_mapping"
+
+    // Cleanup
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """DROP JOB IF EXISTS where jobname = '${jobNameMerge}'"""
+    sql """drop table if exists ${currentDb}.${dorisDstTable} force"""
+    sql """drop table if exists ${currentDb}.${dorisMergeTable} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String mysql_port    = context.config.otherConfigs.get("mysql_57_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint   = getS3Endpoint()
+        String bucket        = getS3BucketName()
+        String driver_url    = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar";
+
+        // ── Case 1: basic table name mapping 
─────────────────────────────────
+        // MySQL table: mysql_src_table → Doris table: doris_dst_table_mysql
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${mysqlSrcTable}"""
+            sql """CREATE TABLE ${mysqlDb}.${mysqlSrcTable} (
+                      `id`   int NOT NULL,
+                      `name` varchar(200),
+                      PRIMARY KEY (`id`)
+                    ) ENGINE=InnoDB"""
+            sql """INSERT INTO ${mysqlDb}.${mysqlSrcTable} VALUES (1, 
'Alice')"""
+            sql """INSERT INTO ${mysqlDb}.${mysqlSrcTable} VALUES (2, 'Bob')"""
+        }
+
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM MYSQL (
+                    "jdbc_url"       = 
"jdbc:mysql://${externalEnvIp}:${mysql_port}",
+                    "driver_url"     = "${driver_url}",
+                    "driver_class"   = "com.mysql.cj.jdbc.Driver",
+                    "user"           = "root",
+                    "password"       = "123456",
+                    "database"       = "${mysqlDb}",
+                    "include_tables" = "${mysqlSrcTable}",
+                    "offset"         = "initial",
+                    "table.${mysqlSrcTable}.target_table" = "${dorisDstTable}"
+                )
+                TO DATABASE ${currentDb} (
+                    "table.create.properties.replication_num" = "1"
+                )"""
+
+        // Verify the Doris table was created with the mapped name, not the 
source name
+        def tables = (sql """show tables from ${currentDb}""").collect { it[0] 
}
+        assert tables.contains(dorisDstTable) : "Doris target table 
'${dorisDstTable}' should exist"
+        assert !tables.contains(mysqlSrcTable) : "Source table name 
'${mysqlSrcTable}' must NOT exist in Doris"
+
+        // Wait for snapshot
+        try {
+            Awaitility.await().atMost(300, SECONDS).pollInterval(1, 
SECONDS).until({
+                def cnt = sql """select SucceedTaskCount from 
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING'"""
+                cnt.size() == 1 && cnt.get(0).get(0).toLong() >= 2
+            })
+        } catch (Exception ex) {
+            log.info("show job: " + (sql """select * from 
jobs("type"="insert") where Name='${jobName}'"""))
+            log.info("show task: " + (sql """select * from 
tasks("type"="insert") where JobName='${jobName}'"""))
+            throw ex
+        }
+
+        qt_select_snapshot """ SELECT * FROM ${dorisDstTable} ORDER BY id ASC 
"""
+
+        // Incremental: INSERT / UPDATE / DELETE must all land in 
doris_dst_table_mysql
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """INSERT INTO ${mysqlDb}.${mysqlSrcTable} VALUES (3, 
'Carol')"""
+            sql """UPDATE ${mysqlDb}.${mysqlSrcTable} SET name = 'Bob_v2' 
WHERE id = 2"""
+            sql """DELETE FROM ${mysqlDb}.${mysqlSrcTable} WHERE id = 1"""
+        }
+        try {
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def ids = (sql """ SELECT id FROM ${dorisDstTable} ORDER BY id 
ASC """).collect { it[0].toInteger() }
+                ids.contains(3) && !ids.contains(1)
+            })
+        } catch (Exception ex) {
+            log.info("show job: " + (sql """select * from 
jobs("type"="insert") where Name='${jobName}'"""))
+            log.info("show task: " + (sql """select * from 
tasks("type"="insert") where JobName='${jobName}'"""))
+            throw ex
+        }
+
+        qt_select_incremental """ SELECT * FROM ${dorisDstTable} ORDER BY id 
ASC """
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+        // ── Case 2: multi-table merge (two MySQL tables → one Doris table) ──
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${mysqlSrcTable}"""
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${mysqlSrcTable2}"""
+            sql """CREATE TABLE ${mysqlDb}.${mysqlSrcTable} (
+                      `id`   int NOT NULL,
+                      `name` varchar(200),
+                      PRIMARY KEY (`id`)
+                    ) ENGINE=InnoDB"""
+            sql """CREATE TABLE ${mysqlDb}.${mysqlSrcTable2} (
+                      `id`   int NOT NULL,
+                      `name` varchar(200),
+                      PRIMARY KEY (`id`)
+                    ) ENGINE=InnoDB"""
+            sql """INSERT INTO ${mysqlDb}.${mysqlSrcTable}  VALUES (100, 
'Src1_A')"""
+            sql """INSERT INTO ${mysqlDb}.${mysqlSrcTable2} VALUES (200, 
'Src2_A')"""
+        }
+
+        sql """CREATE JOB ${jobNameMerge}
+                ON STREAMING
+                FROM MYSQL (
+                    "jdbc_url"       = 
"jdbc:mysql://${externalEnvIp}:${mysql_port}",
+                    "driver_url"     = "${driver_url}",
+                    "driver_class"   = "com.mysql.cj.jdbc.Driver",
+                    "user"           = "root",
+                    "password"       = "123456",
+                    "database"       = "${mysqlDb}",
+                    "include_tables" = "${mysqlSrcTable},${mysqlSrcTable2}",
+                    "offset"         = "initial",
+                    "table.${mysqlSrcTable}.target_table"  = 
"${dorisMergeTable}",
+                    "table.${mysqlSrcTable2}.target_table" = 
"${dorisMergeTable}"
+                )
+                TO DATABASE ${currentDb} (
+                    "table.create.properties.replication_num" = "1"
+                )"""
+
+        // Wait for snapshot rows from both source tables
+        try {
+            Awaitility.await().atMost(300, SECONDS).pollInterval(2, 
SECONDS).until({
+                def ids = (sql """ SELECT id FROM ${dorisMergeTable} 
""").collect { it[0].toInteger() }
+                ids.contains(100) && ids.contains(200)
+            })
+        } catch (Exception ex) {
+            log.info("show job: " + (sql """select * from 
jobs("type"="insert") where Name='${jobNameMerge}'"""))
+            log.info("show task: " + (sql """select * from 
tasks("type"="insert") where JobName='${jobNameMerge}'"""))
+            throw ex
+        }
+
+        qt_select_merge_snapshot """ SELECT * FROM ${dorisMergeTable} ORDER BY 
id ASC """
+
+        // Incremental from both source tables
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """INSERT INTO ${mysqlDb}.${mysqlSrcTable}  VALUES (101, 
'Src1_B')"""
+            sql """INSERT INTO ${mysqlDb}.${mysqlSrcTable2} VALUES (201, 
'Src2_B')"""
+        }
+        try {
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def ids = (sql """ SELECT id FROM ${dorisMergeTable} 
""").collect { it[0].toInteger() }
+                ids.contains(101) && ids.contains(201)
+            })
+        } catch (Exception ex) {
+            log.info("show job: " + (sql """select * from 
jobs("type"="insert") where Name='${jobNameMerge}'"""))
+            log.info("show task: " + (sql """select * from 
tasks("type"="insert") where JobName='${jobNameMerge}'"""))
+            throw ex
+        }
+
+        qt_select_merge_incremental """ SELECT * FROM ${dorisMergeTable} ORDER 
BY id ASC """
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobNameMerge}'"""
+        def mergeJobCnt = sql """select count(1) from jobs("type"="insert") 
where Name = '${jobNameMerge}'"""
+        assert mergeJobCnt.get(0).get(0) == 0
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to