This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 066d71fbbde [Improve](Streaming Job) Supports PG databases with SSL 
enabled (#60988)
066d71fbbde is described below

commit 066d71fbbde6585315eb2562e146231895087f01
Author: wudi <[email protected]>
AuthorDate: Thu Mar 5 10:12:38 2026 +0800

    [Improve](Streaming Job) Supports PG databases with SSL enabled (#60988)
    
    ### What problem does this PR solve?
    
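    Support syncing from PostgreSQL databases that have SSL enabled. A streaming job can now set the `ssl_mode` and `ssl_rootcert` source properties; `ssl_rootcert` must be given as `FILE:<file_name>`, as in the example below.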
    
    ```sql
    CREATE JOB test_streaming_postgres_job_name_ssl
    ON STREAMING
    FROM POSTGRES (
        "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
        "driver_url" = "postgresql-42.5.0.jar",
        "driver_class" = "org.postgresql.Driver",
        "user" = "postgres",
        "password" = "123456",
        "database" = "postgres",
        "schema" = "cdc_test",
        "include_tables" = "user_info_pg_normal1_ssl",
        "offset" = "initial",
        "ssl_mode" = "verify-ca",
        "ssl_rootcert" = "FILE:ca.pem"
    )
    TO DATABASE regression_test_job_p0_streaming_job_cdc (
      "table.create.properties.replication_num" = "1"
    )
    ```
---
 .licenserc.yaml                                    |   1 +
 be/src/runtime/cdc_client_mgr.cpp                  |   4 +-
 be/test/runtime/cdc_client_mgr_test.cpp            |  10 +
 .../docker-compose/postgresql/certs/root.crt       |  19 ++
 .../docker-compose/postgresql/certs/server.crt     |  18 +
 .../docker-compose/postgresql/certs/server.key     |  28 ++
 .../postgresql/postgresql-14.yaml.tpl              |  19 ++
 .../apache/doris/job/cdc/DataSourceConfigKeys.java |   2 +
 .../job/cdc/request/CompareOffsetRequest.java      |   3 +-
 .../job/cdc/request/FetchTableSplitsRequest.java   |   4 +-
 .../doris/job/cdc/request/JobBaseConfig.java       |   1 +
 .../doris/job/cdc/request/WriteRecordRequest.java  |   1 -
 .../streaming/DataSourceConfigValidator.java       |   4 +-
 .../insert/streaming/StreamingInsertJob.java       |  13 +-
 .../insert/streaming/StreamingMultiTblTask.java    |   9 +-
 .../job/offset/jdbc/JdbcSourceOffsetProvider.java  |  20 +-
 .../apache/doris/job/util/StreamingJobUtils.java   |  30 ++
 fs_brokers/cdc_client/pom.xml                      |   7 +
 .../org/apache/doris/cdcclient/common/Env.java     |   6 +
 ...ndPortHolder.java => SystemEnvInitializer.java} |   6 +-
 .../reader/postgres/PostgresSourceReader.java      |  20 +-
 .../apache/doris/cdcclient/utils/SmallFileMgr.java | 224 ++++++++++++
 .../src/main/resources/application.properties      |   4 +-
 .../doris/cdcclient/utils/SmallFileMgrTest.java    | 378 +++++++++++++++++++++
 .../cdc/test_streaming_postgres_job_ssl.out        |   9 +
 .../cdc/test_streaming_mysql_job_priv.groovy       |   2 +-
 .../cdc/test_streaming_postgres_job_ssl.groovy     | 157 +++++++++
 27 files changed, 975 insertions(+), 24 deletions(-)

diff --git a/.licenserc.yaml b/.licenserc.yaml
index e902d1a6eb5..5fd7e766a2c 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -90,6 +90,7 @@ header:
     - "docker/thirdparties/docker-compose/kerberos/paimon_data/**"
     - "docker/thirdparties/docker-compose/kerberos/sql/**"
     - "docker/thirdparties/docker-compose/iceberg/spark-defaults.conf.tpl"
+    - "docker/thirdparties/docker-compose/postgresql/certs/**"
     - "conf/mysql_ssl_default_certificate/*"
     - "conf/mysql_ssl_default_certificate/client_certificate/ca.pem"
     - "conf/mysql_ssl_default_certificate/client_certificate/client-cert.pem"
diff --git a/be/src/runtime/cdc_client_mgr.cpp 
b/be/src/runtime/cdc_client_mgr.cpp
index 96864ee8c6d..6600ab846ae 100644
--- a/be/src/runtime/cdc_client_mgr.cpp
+++ b/be/src/runtime/cdc_client_mgr.cpp
@@ -42,6 +42,7 @@
 #include "common/logging.h"
 #include "common/status.h"
 #include "http/http_client.h"
+#include "runtime/exec_env.h"
 
 namespace doris {
 
@@ -159,6 +160,7 @@ Status 
CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
             "--server.port=" + std::to_string(doris::config::cdc_client_port);
     const std::string backend_http_port =
             "--backend.http.port=" + std::to_string(config::webserver_port);
+    const std::string cluster_token = "--cluster.token=" + 
ExecEnv::GetInstance()->token();
     const std::string java_opts = "-Dlog.path=" + std::string(log_dir);
 
     // check cdc jar exists
@@ -215,7 +217,7 @@ Status 
CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
 
         // java -jar -Dlog.path=xx cdc-client.jar --server.port=9096 
--backend.http.port=8040
         execlp(java_bin.c_str(), "java", java_opts.c_str(), "-jar", 
cdc_jar_path.c_str(),
-               cdc_jar_port.c_str(), backend_http_port.c_str(), (char*)NULL);
+               cdc_jar_port.c_str(), backend_http_port.c_str(), 
cluster_token.c_str(), (char*)NULL);
         // If execlp returns, it means it failed
         perror("Cdc client child process error");
         exit(1);
diff --git a/be/test/runtime/cdc_client_mgr_test.cpp 
b/be/test/runtime/cdc_client_mgr_test.cpp
index 4e9f880d18e..3ab05c394d7 100644
--- a/be/test/runtime/cdc_client_mgr_test.cpp
+++ b/be/test/runtime/cdc_client_mgr_test.cpp
@@ -32,6 +32,8 @@
 
 #include "common/config.h"
 #include "common/status.h"
+#include "runtime/cluster_info.h"
+#include "runtime/exec_env.h"
 
 namespace doris {
 
@@ -66,6 +68,11 @@ public:
             setenv("LOG_DIR", _log_dir.c_str(), 1);
             _log_dir_set = true;
         }
+
+        auto* env = ExecEnv::GetInstance();
+        _cluster_info = std::make_unique<ClusterInfo>();
+        _cluster_info->token = "test_token";
+        env->set_cluster_info(_cluster_info.get());
     }
 
     void TearDown() override {
@@ -92,6 +99,8 @@ public:
         if (_jar_created && !_jar_path.empty()) {
             [[maybe_unused]] int cleanup_ret = system(("rm -f " + 
_jar_path).c_str());
         }
+
+        ExecEnv::GetInstance()->set_cluster_info(nullptr);
     }
 
 protected:
@@ -104,6 +113,7 @@ protected:
     const char* _original_java_home = nullptr;
     bool _jar_created = false;
     bool _log_dir_set = false;
+    std::unique_ptr<ClusterInfo> _cluster_info;
 };
 
 // Test stop method when there's no child process
diff --git a/docker/thirdparties/docker-compose/postgresql/certs/root.crt 
b/docker/thirdparties/docker-compose/postgresql/certs/root.crt
new file mode 100644
index 00000000000..94095b762c6
--- /dev/null
+++ b/docker/thirdparties/docker-compose/postgresql/certs/root.crt
@@ -0,0 +1,19 @@
+-----BEGIN CERTIFICATE-----
+MIIDDzCCAfegAwIBAgIULswy9ovSHXeKSxoEen2Y3xEZqBgwDQYJKoZIhvcNAQEL
+BQAwFzEVMBMGA1UEAwwMTG9jYWwtRGV2LUNBMB4XDTI2MDMwMzA4MjMxM1oXDTM2
+MDIyOTA4MjMxM1owFzEVMBMGA1UEAwwMTG9jYWwtRGV2LUNBMIIBIjANBgkqhkiG
+9w0BAQEFAAOCAQ8AMIIBCgKCAQEAsVFJhj3Y7zamNZiq9SefnnKAKaOXXUbXo/Fq
+V6VNzMSkZuwDfRo/RKjvVaUru/JSd7QoV5zGyUYb+oHx/R233R1M0sd23+eR1mRQ
+w771DmXthbdpIPBEwlmh0LMsiH9cJ7R2iRigCzfd2/SbJC3cvX6CtzyNqSkZboVO
+fswkotF4ZaJgOiBile4A/zWWqeA07QVd8tusdxaoOJv0E/pjcLi5peGXtQA6SSj4
+tp20K/tlrRS1Zc0dKgxU7YohxNBwW4QF0uOVR/QBmfzEpMdxKlwcEnHubPAemgt1
+bp9g9Buwo7oWMvDJuS40xMPOlDhshrzNM8CoWIihgndMPG/LsQIDAQABo1MwUTAd
+BgNVHQ4EFgQUHBKhmdKPD+b1xDjzzkQVaVETSfUwHwYDVR0jBBgwFoAUHBKhmdKP
+D+b1xDjzzkQVaVETSfUwDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOC
+AQEAnueVOIAk/XLQx3msDY58Reo+D1f/AUy/WTPzxeXCxXLScrjFCLXjrIDzgslN
+WnP7E5xNJxdrWgskS36IJxVg0+cUfy5kQYYfmWo1vOYdW/AMNBdQwmK5ve3r3Z/3
+dE2cV4uvL6n0iZZMxnsL5KXwLeSQeTtJepvWi27Z0t8P23lJHJKfl/Ek49ILIDgB
+zZIMKPgm6w7/U3jUWMUyQ+iI/XiEPrnn4url1FNViC8ucoIm8EU4ZE01j1mbZO8M
+JSa6InQEIx/1P675qYtuKWF75Tq/qU7+uX7/07AiTyYSrHMT+024TfbRCi1PF/Ka
+cx+pSJLima+3GHhK2Rj437yx1Q==
+-----END CERTIFICATE-----
diff --git a/docker/thirdparties/docker-compose/postgresql/certs/server.crt 
b/docker/thirdparties/docker-compose/postgresql/certs/server.crt
new file mode 100644
index 00000000000..e8aaecb71ec
--- /dev/null
+++ b/docker/thirdparties/docker-compose/postgresql/certs/server.crt
@@ -0,0 +1,18 @@
+-----BEGIN CERTIFICATE-----
+MIIC+zCCAeOgAwIBAgIUG/9rYO8McYBH83YOe4nzMcb4YCAwDQYJKoZIhvcNAQEL
+BQAwFzEVMBMGA1UEAwwMTG9jYWwtRGV2LUNBMB4XDTI2MDMwMzA4MjMyNloXDTM2
+MDIyOTA4MjMyNlowFDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0B
+AQEFAAOCAQ8AMIIBCgKCAQEAtHYYwevcMqMPbCAaQlrX7qJtRXf/j+WfGFbM4/PZ
+Y6cjSsrqUgwHduMyE4yce9vWWygLJM/S9aBI3jsvhAdLVaFIXhOU4jMuyk+1RJvu
+k+iUJ3wabo2Zv6605wUU7wS0FCfJJMxG/zz5FYtX8kMw7sKJWLhB4C+oQlO+mSj4
+CKjg7mNZjgKz024/BW7FKhAaYYGI9GNmjIgvjSDXGOXzd2nM9XLoVNIkR8mgD69l
+yHHzhGUAdXDxaTr+026Z2uBrnip7ZjDIB65J/qrxSc8eK1ZhZzYdHBpLnP67zuWR
+iyKDNETpRa1SoWCk9/9+AGwygRcXC7h1GpMb46wce4/TtwIDAQABo0IwQDAdBgNV
+HQ4EFgQUEeFQVqK+A/H6R2iSiNW57cSilGcwHwYDVR0jBBgwFoAUHBKhmdKPD+b1
+xDjzzkQVaVETSfUwDQYJKoZIhvcNAQELBQADggEBAKpxfqPTPXL2+n/OW6F8cvwK
+aod3BOquIjIKm17+Uob0rcOnxssYNQa0g9pW2zgIlAS+QUZ1K46ygJWrLNKdpIzt
+mG2Hn6kUX9J7Xo+F5IldlX2bImi3b2/oI8IliLzawsofondCzL2BIfWLhE3LaISF
+iN8pfzjoHCZXfLm3oUzxaeltFqEP+cApig/hAO17FkMHY6sl9QII94MV2d9gVwVl
+pAi1ALOzOQKbsTCdRspoadPqmZ7AgbtS3RiVMmCZHwrtCvdIcaBuiPy5KiBFPCEX
+Cdia+GWqETKBNpornHeMQ7d/J2ilbFRs+mRAUtyeWK0ilcdOxbmMOzCsjIV8kgI=
+-----END CERTIFICATE-----
diff --git a/docker/thirdparties/docker-compose/postgresql/certs/server.key 
b/docker/thirdparties/docker-compose/postgresql/certs/server.key
new file mode 100644
index 00000000000..d9eb2a8f4ad
--- /dev/null
+++ b/docker/thirdparties/docker-compose/postgresql/certs/server.key
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC0dhjB69wyow9s
+IBpCWtfuom1Fd/+P5Z8YVszj89ljpyNKyupSDAd24zITjJx729ZbKAskz9L1oEje
+Oy+EB0tVoUheE5TiMy7KT7VEm+6T6JQnfBpujZm/rrTnBRTvBLQUJ8kkzEb/PPkV
+i1fyQzDuwolYuEHgL6hCU76ZKPgIqODuY1mOArPTbj8FbsUqEBphgYj0Y2aMiC+N
+INcY5fN3acz1cuhU0iRHyaAPr2XIcfOEZQB1cPFpOv7Tbpna4GueKntmMMgHrkn+
+qvFJzx4rVmFnNh0cGkuc/rvO5ZGLIoM0ROlFrVKhYKT3/34AbDKBFxcLuHUakxvj
+rBx7j9O3AgMBAAECggEABZ+8uxdWnQYl+4xlV5E0gmTx3dh8Qd351UfFsW0demDr
+lU1SI3I4I/Lelv8lyrLXZzjcwPfmezfec6RnF37p7ijSPgrIG2PLplCqJsy6BzK1
+ycH/yaYm6sIFSBqdF+ZO5QOaGOWZpA9lgsYHNVt/jdvJCq/50ZhJZO2fvfi9dr4I
+vLjcCX57t+V9n68zHCdw8pTw3eSvO34wv8FXXQyofYi6+swoV/NhGFS1xMlc2USO
+KQ0Do/Y8Dxr/5HawoiMTzO/o4M0Bdmb237fW4D0yVqaevjVWKe/wq2q3VZyBatB2
+XDMkL1ZaWiRsRZHoliiIh3K3gQ2jmtsMXjzv+IKdvQKBgQDgPsk7y5Ms5rjArL8g
+qCP2o8a/IvxzCwxcvK59nfmWFuFeJsxE3uvp89UriqC6yGD5yxAmjDKvHOFtV+CE
+KjCnMgt/jU6BpkaHzTRR8Gtt/RkILZTZiKoNdEgOTeBjHKCoOUoM7Dc78nW7Dp0F
+QoLdAe0g0pSRy5iFcWBiX7UP5QKBgQDOBBRfnaU6fICVH0SmqBoKVSCDm+saYMAW
+99mypm2xViP4VQOa1QjNRiEN9kllxD4I+S48kALSCpif+A/IE89bNgFNEOvTYbkW
++mvjoFLQtN79Tc8/G0CEi+WhRWWpY9WnMuzj1r/pAbC8uOEKvJ+tYfKmHZN5kvoC
+k0e2yMCDawKBgFi6Hw9sxkgO5m0+LMW0Ib62IK6CHlc6uOJ8uaH0fsvXM8b4HPzn
+I3tHQkJfMKeXH1/W7AYElQ1apQuJqMlClEujbo9CjxyXePLEy/3b3fYAHgZxWqMU
+Aw0dxGD8iVtN+Xd2a4lfcZ9jmRexeYmaPoNJ/tRs3eIuJ6QtLxDdg5vNAoGBAIqU
+C/BVZrN01Dl7Ev7XzMxufrSIyRixRAUvK20Urmy/eOqupQIdkxIhvlJZ/P1LiD8Y
+/pUWeg83uXrBrjvzt2OvbCie3UMPVSWzxacUTSC+ydCx6lqUxk1inVBiEgRjd3BE
+vTx1VBo0XOJVqmtCflZusH41HuKEj0/0KiU13OmJAoGAYkxy/U6uHHn6xB3KriID
+bZgfYRlLv1bD4AYiOcjFke3/4MZJ2U4t/x6uzEjQZd/0waSeE3YY/MfEXufdHM99
+ZUlAHwLhjLcY58HgkyMkw4sRaHYxTQdOuxcnmzX1+sHKxKXlYoboLgh8Qf9A4DcR
+HZde9n1uVLVtlBRTjjL5O84=
+-----END PRIVATE KEY-----
diff --git 
a/docker/thirdparties/docker-compose/postgresql/postgresql-14.yaml.tpl 
b/docker/thirdparties/docker-compose/postgresql/postgresql-14.yaml.tpl
index 6d8873817a5..1012a65b1f2 100644
--- a/docker/thirdparties/docker-compose/postgresql/postgresql-14.yaml.tpl
+++ b/docker/thirdparties/docker-compose/postgresql/postgresql-14.yaml.tpl
@@ -25,6 +25,16 @@ services:
       POSTGRES_PASSWORD: 123456
     ports:
       - ${DOCKER_PG_14_EXTERNAL_PORT}:5432
+    entrypoint:
+      - bash
+      - -c
+      - |
+        chmod 600 /var/lib/postgresql/certs/server.key
+        chmod 644 /var/lib/postgresql/certs/server.crt
+        chmod 644 /var/lib/postgresql/certs/root.crt
+        chown postgres:postgres /var/lib/postgresql/certs/*
+        exec docker-entrypoint.sh "$@"
+      - --      
     command:
       - "postgres"
       - "-c"
@@ -33,6 +43,14 @@ services:
       - "max_wal_senders=30"
       - "-c"
       - "max_replication_slots=30"
+      - "-c"
+      - "ssl=on"
+      - "-c"
+      - "ssl_cert_file=/var/lib/postgresql/certs/server.crt"
+      - "-c"
+      - "ssl_key_file=/var/lib/postgresql/certs/server.key"
+      - "-c"
+      - "ssl_ca_file=/var/lib/postgresql/certs/root.crt"
     healthcheck:
       test: [ "CMD-SHELL", "pg_isready -U postgres && psql -U postgres -c 
'SELECT 1 FROM doris_test.deadline;'" ]
       interval: 5s
@@ -41,6 +59,7 @@ services:
     volumes:
       - ./data/data/:/var/lib/postgresql/data/
       - ./init:/docker-entrypoint-initdb.d
+      - ./certs:/var/lib/postgresql/certs
     networks:
       - doris--postgres
 
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
index 17c5d7d575e..77ce8b2bf51 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
@@ -35,6 +35,8 @@ public class DataSourceConfigKeys {
     public static final String SNAPSHOT_SPLIT_SIZE = "snapshot_split_size";
     public static final String SNAPSHOT_PARALLELISM = "snapshot_parallelism";
     public static final String SNAPSHOT_PARALLELISM_DEFAULT = "1";
+    public static final String SSL_MODE = "ssl_mode";
+    public static final String SSL_ROOTCERT = "ssl_rootcert";
 
     // target properties
     public static final String TABLE_PROPS_PREFIX = "table.create.properties.";
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CompareOffsetRequest.java
 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CompareOffsetRequest.java
index 8449afd9628..1a57cbdefe3 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CompareOffsetRequest.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CompareOffsetRequest.java
@@ -35,9 +35,10 @@ public class CompareOffsetRequest extends JobBaseConfig {
     public CompareOffsetRequest(Long jobId,
             String sourceType,
             Map<String, String> sourceProperties,
+            String frontendAddress,
             Map<String, String> offsetFirst,
             Map<String, String> offsetSecond) {
-        super(jobId, sourceType, sourceProperties);
+        super(jobId, sourceType, sourceProperties, frontendAddress);
         this.offsetFirst = offsetFirst;
         this.offsetSecond = offsetSecond;
     }
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java
 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java
index f855e373958..5c3cf62ab4c 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java
@@ -34,8 +34,8 @@ public class FetchTableSplitsRequest extends JobBaseConfig {
     private String snapshotTable;
 
     public FetchTableSplitsRequest(Long jobId, String name,
-            Map<String, String> sourceProperties, String snapshotTable) {
-        super(jobId, name, sourceProperties);
+            Map<String, String> sourceProperties, String frontendAddress, 
String snapshotTable) {
+        super(jobId, name, sourceProperties, frontendAddress);
         this.snapshotTable = snapshotTable;
     }
 }
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java
 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java
index bfdbf6a3455..c7b60026d88 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java
@@ -30,4 +30,5 @@ public class JobBaseConfig {
     private Long jobId;
     private String dataSource;
     private Map<String, String> config;
+    private String frontendAddress;
 }
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
index 195125b1c66..fd56518643d 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
@@ -28,7 +28,6 @@ public class WriteRecordRequest extends JobBaseRecordRequest {
     private long maxInterval;
     private String targetDb;
     private String token;
-    private String frontendAddress;
     private String taskId;
     private Map<String, String> streamLoadProps;
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
index cb7ec530fcf..f3d9e016950 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
@@ -38,7 +38,9 @@ public class DataSourceConfigValidator {
             DataSourceConfigKeys.INCLUDE_TABLES,
             DataSourceConfigKeys.EXCLUDE_TABLES,
             DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE,
-            DataSourceConfigKeys.SNAPSHOT_PARALLELISM
+            DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
+            DataSourceConfigKeys.SSL_MODE,
+            DataSourceConfigKeys.SSL_ROOTCERT
     );
 
     public static void validateSource(Map<String, String> input) throws 
IllegalArgumentException {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index c820b8d532a..189656fa48d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -102,6 +102,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 @Log4j2
 public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, 
Map<Object, Object>> implements
         TxnStateChangeCallback, GsonPostProcessable {
+    public static final String JOB_FILE_CATALOG = "streaming_job";
     private long dbId;
     // Streaming job statistics, all persisted in txn attachment
     private StreamingJobStatistic jobStatistic = new StreamingJobStatistic();
@@ -224,7 +225,8 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                 String includeTables = String.join(",", createTbls);
                 sourceProperties.put(DataSourceConfigKeys.INCLUDE_TABLES, 
includeTables);
             }
-            this.offsetProvider = new JdbcSourceOffsetProvider(getJobId(), 
dataSourceType, sourceProperties);
+            this.offsetProvider = new JdbcSourceOffsetProvider(getJobId(), 
dataSourceType,
+                    StreamingJobUtils.convertCertFile(getDbId(), 
sourceProperties));
             JdbcSourceOffsetProvider rdsOffsetProvider = 
(JdbcSourceOffsetProvider) this.offsetProvider;
             rdsOffsetProvider.splitChunks(createTbls);
         } catch (Exception ex) {
@@ -459,7 +461,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         return newTasks;
     }
 
-    protected AbstractStreamingTask createStreamingTask() {
+    protected AbstractStreamingTask createStreamingTask() throws JobException {
         if (tvfType != null) {
             this.runningStreamTask = createStreamingInsertTask();
         } else {
@@ -477,9 +479,10 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
      * for From MySQL TO Database
      * @return
      */
-    private AbstractStreamingTask createStreamingMultiTblTask() {
+    private AbstractStreamingTask createStreamingMultiTblTask() throws 
JobException {
+        Map<String, String> convertSourceProps = 
StreamingJobUtils.convertCertFile(getDbId(), sourceProperties);
         return new StreamingMultiTblTask(getJobId(), 
Env.getCurrentEnv().getNextId(), dataSourceType,
-                offsetProvider, sourceProperties, targetDb, targetProperties, 
jobProperties, getCreateUser());
+                offsetProvider, convertSourceProps, targetDb, 
targetProperties, jobProperties, getCreateUser());
     }
 
     protected AbstractStreamingTask createStreamingInsertTask() {
@@ -599,7 +602,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         updateJobStatus(JobStatus.PAUSED);
     }
 
-    public void onStreamTaskSuccess(AbstractStreamingTask task) {
+    public void onStreamTaskSuccess(AbstractStreamingTask task) throws 
JobException {
         try {
             resetFailureInfo(null);
             succeedTaskCount.incrementAndGet();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index b6a4e8c939b..cf9a9b905be 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -61,6 +61,10 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
+/**
+ * In PostgreSQL/MySQL, multi-table writes are performed by tasks that only 
make calls.
+ * The write logic resides in the `cdc_client` and is implemented via 
`stream_load`.
+ */
 @Log4j2
 @Getter
 public class StreamingMultiTblTask extends AbstractStreamingTask {
@@ -81,7 +85,6 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
     public StreamingMultiTblTask(Long jobId,
             long taskId,
             DataSourceType dataSourceType,
-
             SourceOffsetProvider offsetProvider,
             Map<String, String> sourceProperties,
             String targetDb,
@@ -180,8 +183,8 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
         WriteRecordRequest request = new WriteRecordRequest();
         request.setJobId(getJobId());
         request.setConfig(sourceProperties);
-        request.setDataSource(dataSourceType.name());
 
+        request.setDataSource(dataSourceType.name());
         request.setTaskId(getTaskId() + "");
         request.setToken(getToken());
         request.setTargetDb(targetDb);
@@ -228,7 +231,7 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
     /**
      * Callback function for offset commit success.
      */
-    public void successCallback(CommitOffsetRequest offsetRequest) {
+    public void successCallback(CommitOffsetRequest offsetRequest) throws 
JobException {
         if (getIsCanceled().get()) {
             return;
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index b2f0d02e538..d04245317b5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.job.offset.jdbc;
 
+import org.apache.doris.catalog.Env;
 import org.apache.doris.httpv2.entity.ResponseBody;
 import org.apache.doris.httpv2.rest.RestApiStatusCode;
 import org.apache.doris.job.cdc.DataSourceConfigKeys;
@@ -192,7 +193,8 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
     @Override
     public void fetchRemoteMeta(Map<String, String> properties) throws 
Exception {
         Backend backend = StreamingJobUtils.selectBackend();
-        JobBaseConfig requestParams = new JobBaseConfig(getJobId(), 
sourceType.name(), sourceProperties);
+        JobBaseConfig requestParams =
+                new JobBaseConfig(getJobId(), sourceType.name(), 
sourceProperties, getFrontendAddress());
         InternalService.PRequestCdcClientRequest request = 
InternalService.PRequestCdcClientRequest.newBuilder()
                 .setApi("/api/fetchEndOffset")
                 .setParams(new Gson().toJson(requestParams)).build();
@@ -273,7 +275,8 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
             throws JobException {
         Backend backend = StreamingJobUtils.selectBackend();
         CompareOffsetRequest requestParams =
-                new CompareOffsetRequest(getJobId(), sourceType.name(), 
sourceProperties, offsetFirst, offsetSecond);
+                new CompareOffsetRequest(getJobId(), sourceType.name(), 
sourceProperties,
+                        getFrontendAddress(), offsetFirst, offsetSecond);
         InternalService.PRequestCdcClientRequest request = 
InternalService.PRequestCdcClientRequest.newBuilder()
                 .setApi("/api/compareOffset")
                 .setParams(new Gson().toJson(requestParams)).build();
@@ -486,7 +489,8 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
     private List<SnapshotSplit> requestTableSplits(String table) throws 
JobException {
         Backend backend = StreamingJobUtils.selectBackend();
         FetchTableSplitsRequest requestParams =
-                new FetchTableSplitsRequest(getJobId(), sourceType.name(), 
sourceProperties, table);
+                new FetchTableSplitsRequest(getJobId(), sourceType.name(),
+                        sourceProperties, getFrontendAddress(), table);
         InternalService.PRequestCdcClientRequest request = 
InternalService.PRequestCdcClientRequest.newBuilder()
                 .setApi("/api/fetchSplits")
                 .setParams(new Gson().toJson(requestParams)).build();
@@ -537,7 +541,8 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
      */
     private void initSourceReader() throws JobException {
         Backend backend = StreamingJobUtils.selectBackend();
-        JobBaseConfig requestParams = new JobBaseConfig(getJobId(), 
sourceType.name(), sourceProperties);
+        JobBaseConfig requestParams =
+                new JobBaseConfig(getJobId(), sourceType.name(), 
sourceProperties, getFrontendAddress());
         InternalService.PRequestCdcClientRequest request = 
InternalService.PRequestCdcClientRequest.newBuilder()
                 .setApi("/api/initReader")
                 .setParams(new Gson().toJson(requestParams)).build();
@@ -584,7 +589,8 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
         // clean meta table
         StreamingJobUtils.deleteJobMeta(jobId);
         Backend backend = StreamingJobUtils.selectBackend();
-        JobBaseConfig requestParams = new JobBaseConfig(getJobId(), 
sourceType.name(), sourceProperties);
+        JobBaseConfig requestParams =
+                new JobBaseConfig(getJobId(), sourceType.name(), 
sourceProperties, getFrontendAddress());
         InternalService.PRequestCdcClientRequest request = 
InternalService.PRequestCdcClientRequest.newBuilder()
                 .setApi("/api/close")
                 .setParams(new Gson().toJson(requestParams)).build();
@@ -602,4 +608,8 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
             log.warn("Close job error: ", ex);
         }
     }
+
+    private String getFrontendAddress() {
+        return Env.getCurrentEnv().getMasterHost() + ":" + 
Env.getCurrentEnv().getMasterHttpPort();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
index 1a1fb68fe82..7299f0a9b50 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
@@ -25,7 +25,10 @@ import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Table;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.util.SmallFileMgr;
+import org.apache.doris.common.util.SmallFileMgr.SmallFile;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.datasource.jdbc.client.JdbcClient;
 import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
@@ -33,6 +36,7 @@ import org.apache.doris.job.cdc.DataSourceConfigKeys;
 import org.apache.doris.job.cdc.split.SnapshotSplit;
 import org.apache.doris.job.common.DataSourceType;
 import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
 import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
 import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
 import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo;
@@ -245,6 +249,32 @@ public class StreamingJobUtils {
         return index;
     }
 
+    /**
+     * When enabling SSL, you need to convert FILE:ca.pem to FILE:<file_id>:<md5>.
+     */
+    public static Map<String, String> convertCertFile(long dbId, Map<String, 
String> sourceProperties)
+            throws JobException {
+        SmallFileMgr smallFileMgr = Env.getCurrentEnv().getSmallFileMgr();
+        Map<String, String> newProps = new HashMap<>(sourceProperties);
+        if (sourceProperties.containsKey(DataSourceConfigKeys.SSL_ROOTCERT)) {
+            String certFile = 
sourceProperties.get(DataSourceConfigKeys.SSL_ROOTCERT);
+            if (certFile.startsWith("FILE:")) {
+                String file = certFile.substring(certFile.indexOf(":") + 1);
+                try {
+                    SmallFile smallFile =
+                            smallFileMgr.getSmallFile(dbId, 
StreamingInsertJob.JOB_FILE_CATALOG, file, true);
+                    newProps.put(DataSourceConfigKeys.SSL_ROOTCERT, "FILE:" + 
smallFile.id + ":" + smallFile.md5);
+                } catch (DdlException ex) {
+                    throw new JobException("ssl root cert file not found: " + 
certFile, ex);
+                }
+            } else {
+                throw new JobException("ssl root cert is not in expected 
format, "
+                        + "should start with FILE:, got " + certFile);
+            }
+        }
+        return newProps;
+    }
+
     public static List<CreateTableCommand> generateCreateTableCmds(String 
targetDb, DataSourceType sourceType,
             Map<String, String> properties, Map<String, String> 
targetProperties)
             throws JobException {
diff --git a/fs_brokers/cdc_client/pom.xml b/fs_brokers/cdc_client/pom.xml
index cac45bdc7a1..afa186844b7 100644
--- a/fs_brokers/cdc_client/pom.xml
+++ b/fs_brokers/cdc_client/pom.xml
@@ -178,6 +178,13 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>5.10.2</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
index ff4056a8b5b..332a4766002 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
@@ -30,6 +30,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import com.google.common.base.Preconditions;
+import lombok.Getter;
 import lombok.Setter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +41,8 @@ public class Env {
     private final Map<Long, JobContext> jobContexts;
     private final Map<Long, Lock> jobLocks;
     @Setter private int backendHttpPort;
+    @Setter @Getter private String clusterToken;
+    @Setter @Getter private volatile String feMasterAddress;
 
     private Env() {
         this.jobContexts = new ConcurrentHashMap<>();
@@ -62,6 +65,9 @@ public class Env {
     }
 
     public SourceReader getReader(JobBaseConfig jobConfig) {
+        if (jobConfig.getFrontendAddress() != null && 
!jobConfig.getFrontendAddress().isEmpty()) {
+            this.feMasterAddress = jobConfig.getFrontendAddress();
+        }
         DataSource ds = resolveDataSource(jobConfig.getDataSource());
         Env manager = Env.getCurrentEnv();
         return manager.getOrCreateReader(jobConfig.getJobId(), ds, 
jobConfig.getConfig());
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/BackendPortHolder.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/SystemEnvInitializer.java
similarity index 88%
rename from 
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/BackendPortHolder.java
rename to 
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/SystemEnvInitializer.java
index c1977134530..11102b1db08 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/BackendPortHolder.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/SystemEnvInitializer.java
@@ -24,13 +24,17 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
 @Component
-public class BackendPortHolder {
+public class SystemEnvInitializer {
 
     @Value("${backend.http.port}")
     private int port;
 
+    @Value("${cluster.token}")
+    private String clusterToken;
+
     @PostConstruct
     public void init() {
         Env.getCurrentEnv().setBackendHttpPort(port);
+        Env.getCurrentEnv().setClusterToken(clusterToken);
     }
 }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 5a9fa095941..a0bb57ad120 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -22,6 +22,7 @@ import 
org.apache.doris.cdcclient.exception.CdcClientException;
 import org.apache.doris.cdcclient.source.factory.DataSource;
 import org.apache.doris.cdcclient.source.reader.JdbcIncrementalSourceReader;
 import org.apache.doris.cdcclient.utils.ConfigUtil;
+import org.apache.doris.cdcclient.utils.SmallFileMgr;
 import org.apache.doris.job.cdc.DataSourceConfigKeys;
 import org.apache.doris.job.cdc.request.CompareOffsetRequest;
 import org.apache.doris.job.cdc.request.JobBaseConfig;
@@ -79,6 +80,7 @@ import org.slf4j.LoggerFactory;
 public class PostgresSourceReader extends JdbcIncrementalSourceReader {
     private static final Logger LOG = 
LoggerFactory.getLogger(PostgresSourceReader.class);
     private static final ObjectMapper objectMapper = new ObjectMapper();
+    private static final Object SLOT_CREATION_LOCK = new Object();
 
     public PostgresSourceReader() {
         super();
@@ -88,8 +90,10 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
     public void initialize(long jobId, DataSource dataSource, Map<String, 
String> config) {
         PostgresSourceConfig sourceConfig = generatePostgresConfig(config, 
jobId, 0);
         PostgresDialect dialect = new PostgresDialect(sourceConfig);
-        LOG.info("Creating slot for job {}, user {}", jobId, 
sourceConfig.getUsername());
-        createSlotForGlobalStreamSplit(dialect);
+        synchronized (SLOT_CREATION_LOCK) {
+            LOG.info("Creating slot for job {}, user {}", jobId, 
sourceConfig.getUsername());
+            createSlotForGlobalStreamSplit(dialect);
+        }
         super.initialize(jobId, dataSource, config);
     }
 
@@ -215,6 +219,18 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
         dbzProps.put("interval.handling.mode", "string");
         configFactory.debeziumProperties(dbzProps);
 
+        // setting ssl
+        if (cdcConfig.containsKey(DataSourceConfigKeys.SSL_MODE)) {
+            dbzProps.put("database.sslmode", 
cdcConfig.get(DataSourceConfigKeys.SSL_MODE));
+        }
+
+        if (cdcConfig.containsKey(DataSourceConfigKeys.SSL_ROOTCERT)) {
+            String fileName = cdcConfig.get(DataSourceConfigKeys.SSL_ROOTCERT);
+            String filePath = SmallFileMgr.getFilePath(fileName);
+            LOG.info("Using SSL root cert file path: {}", filePath);
+            dbzProps.put("database.sslrootcert", filePath);
+        }
+
         configFactory.serverTimeZone(
                 
ConfigUtil.getPostgresServerTimeZoneFromProps(props).toString());
         configFactory.slotName(getSlotName(jobId));
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java
new file mode 100644
index 00000000000..1e854097883
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import org.apache.doris.cdcclient.common.Env;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages small files (e.g. SSL certificates) referenced by FILE:{file_id}:{md5}.
+ *
+ * <p>Files are fetched from FE via HTTP on first access, then cached on disk as {file_id}.{md5} and
+ * in memory to avoid repeated I/O on subsequent calls.
+ */
+public class SmallFileMgr {
+    private static final Logger LOG = 
LoggerFactory.getLogger(SmallFileMgr.class);
+
+    /** Prefix that every small-file reference must start with. */
+    private static final String FILE_PREFIX = "FILE:";
+
+    /** In-memory cache: "file_id:md5" -> local file path of the verified copy. */
+    private static final Map<String, String> MEM_CACHE = new 
ConcurrentHashMap<>();
+
+    /**
+     * Per-key locks to serialize concurrent downloads of the same file, preventing tmp file
+     * corruption when multiple threads race on the same file_id:md5 key.
+     */
+    private static final Map<String, Object> DOWNLOAD_LOCKS = new 
ConcurrentHashMap<>();
+
+    /** Only static members are exposed; the private constructor blocks instantiation. */
+    private SmallFileMgr() {}
+
+    /**
+     * Turns a FILE: reference into a local file path, fetching the file from FE when no cached
+     * copy is available. The FE master address and the cluster token are taken from {@link Env};
+     * the cache directory is the one returned by {@link #getLocalDir()}.
+     *
+     * @param filePath FILE reference, format: FILE:{file_id}:{md5}
+     * @return absolute local file path
+     */
+    public static String getFilePath(String filePath) {
+        Env env = Env.getCurrentEnv();
+        String feMasterAddress = env.getFeMasterAddress();
+        String clusterToken = env.getClusterToken();
+        return getFilePath(feMasterAddress, filePath, clusterToken, getLocalDir());
+    }
+
+    /**
+     * Locates the directory next to the running code: the parent directory of the JAR when the
+     * code source is a regular file, otherwise the code-source directory itself (e.g. the classes
+     * directory when launched from an IDE).
+     *
+     * @return absolute path of that directory
+     * @throws RuntimeException wrapping any failure to resolve the code-source location
+     */
+    static String getLocalDir() {
+        try {
+            URL location = SmallFileMgr.class.getProtectionDomain().getCodeSource().getLocation();
+            LOG.info("Get code source URL: {}", location);
+            if ("jar".equals(location.getProtocol())) {
+                // Spring Boot fat jar: jar:file:/path/to/app.jar!/BOOT-INF/classes!/
+                // getPath() yields file:/path/to/app.jar!/BOOT-INF/classes!/ ; keep only the part
+                // before the first '!', i.e. file:/path/to/app.jar
+                String nestedPath = location.getPath();
+                int bang = nestedPath.indexOf("!");
+                String jarPath = bang > 0 ? nestedPath.substring(0, bang) : nestedPath;
+                location = new URL(jarPath);
+            }
+            File codeSource = new File(location.toURI());
+            if (codeSource.isFile()) {
+                // Running from a JAR: the code source is the JAR itself, so use its directory.
+                return codeSource.getParentFile().getAbsolutePath();
+            }
+            // Running from an IDE: the code source already is the classes directory.
+            return codeSource.getAbsolutePath();
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    /**
+     * Package-private overload that accepts the FE address, the cluster token and a custom local
+     * directory explicitly, used for testing.
+     *
+     * <p>Lookup order: in-memory cache, then an MD5-verified copy on disk named {@code
+     * {file_id}.{md5}} under {@code localDir}, then an HTTP download from FE that is MD5-verified
+     * before it is renamed into place. Downloads of the same {@code file_id:md5} are serialized on
+     * a per-key lock, and each key downloads into its own tmp file.
+     *
+     * <p>The cluster token is sent to FE but is never written to logs or exception messages.
+     *
+     * @param feMasterAddress FE address used as {@code http://{feMasterAddress}/...}; required
+     * @param filePath FILE reference, format: FILE:{file_id}:{md5}
+     * @param clusterToken value of the {@code token} query parameter sent to FE
+     * @param localDir directory holding the cached and temporary files
+     * @return path of the verified local copy under {@code localDir}
+     * @throws IllegalArgumentException if {@code filePath} is malformed or {@code feMasterAddress}
+     *     is null or empty
+     * @throws RuntimeException if the download, the MD5 verification or the final rename fails
+     */
+    static String getFilePath(
+            String feMasterAddress, String filePath, String clusterToken, String localDir) {
+        if (filePath == null || !filePath.startsWith(FILE_PREFIX)) {
+            throw new IllegalArgumentException("filePath must start with FILE:, got: " + filePath);
+        }
+        if (feMasterAddress == null || feMasterAddress.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "feMasterAddress is required when filePath is a FILE: reference");
+        }
+        String[] parts = filePath.substring(FILE_PREFIX.length()).split(":");
+        // split() drops trailing empty strings but keeps a leading one, so "FILE::md5" must be
+        // rejected explicitly.
+        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
+            throw new IllegalArgumentException(
+                    "Invalid filePath format, expected FILE:file_id:md5, got: " + filePath);
+        }
+        String fileId = parts[0];
+        String md5 = parts[1];
+        String cacheKey = fileId + ":" + md5;
+
+        // 1. Fast path: in-memory cache hit — zero I/O, no lock needed
+        String memCached = MEM_CACHE.get(cacheKey);
+        if (memCached != null) {
+            LOG.debug("SmallFile memory cache hit: {}", memCached);
+            return memCached;
+        }
+
+        // 2. Serialize concurrent downloads of the same file_id:md5
+        Object lock = DOWNLOAD_LOCKS.computeIfAbsent(cacheKey, k -> new Object());
+        synchronized (lock) {
+            // Re-check the memory cache: another thread may have finished while we waited
+            String doubleChecked = MEM_CACHE.get(cacheKey);
+            if (doubleChecked != null) {
+                LOG.debug("SmallFile memory cache hit (after lock): {}", doubleChecked);
+                return doubleChecked;
+            }
+
+            String finalFilePath = localDir + File.separator + fileId + "." + md5;
+            File finalFile = new File(finalFilePath);
+
+            // 3. Disk cache hit — avoid downloading again after process restart
+            if (finalFile.exists()) {
+                try {
+                    if (fileMd5(finalFile).equalsIgnoreCase(md5)) {
+                        LOG.info("SmallFile disk cache hit: {}", finalFilePath);
+                        MEM_CACHE.put(cacheKey, finalFilePath);
+                        return finalFilePath;
+                    }
+                    LOG.warn(
+                            "SmallFile disk cache MD5 mismatch, re-downloading: {}",
+                            finalFilePath);
+                } catch (IOException e) {
+                    LOG.warn(
+                            "Failed to read disk cached file, re-downloading: {}",
+                            finalFilePath,
+                            e);
+                }
+                if (!finalFile.delete()) {
+                    LOG.warn("Failed to delete stale disk cached file: {}", finalFilePath);
+                }
+            }
+
+            // 4. Download from FE: GET /api/get_small_file?file_id=xxx&token=yyy
+            // Only the token-free part of the URL may appear in logs and exception messages.
+            String safeUrl = "http://" + feMasterAddress + "/api/get_small_file?file_id=" + fileId;
+            LOG.info("Downloading small file from FE: {}", safeUrl);
+
+            // The tmp name carries the md5 as well: the lock is per file_id:md5, so a tmp file
+            // named by file_id alone could be shared by two versions downloading at once.
+            File tmpFile = new File(finalFilePath + ".tmp");
+            try (CloseableHttpClient client = HttpUtil.getHttpClient();
+                    CloseableHttpResponse response =
+                            client.execute(new HttpGet(safeUrl + "&token=" + clusterToken))) {
+                int statusCode = response.getStatusLine().getStatusCode();
+                if (statusCode != 200) {
+                    throw new RuntimeException(
+                            "Failed to download small file, status="
+                                    + statusCode
+                                    + ", url="
+                                    + safeUrl);
+                }
+                if (response.getEntity() == null) {
+                    throw new RuntimeException(
+                            "Failed to download small file, empty response, url=" + safeUrl);
+                }
+                try (InputStream in = response.getEntity().getContent();
+                        FileOutputStream fos = new FileOutputStream(tmpFile)) {
+                    byte[] buf = new byte[8192];
+                    int n;
+                    while ((n = in.read(buf)) != -1) {
+                        fos.write(buf, 0, n);
+                    }
+                }
+            } catch (IOException e) {
+                tmpFile.delete();
+                throw new RuntimeException("Failed to download small file from FE: " + safeUrl, e);
+            }
+
+            // 5. Verify MD5 of downloaded content. The digest stream is closed before the tmp
+            // file is deleted on mismatch.
+            String downloadedMd5;
+            try {
+                downloadedMd5 = fileMd5(tmpFile);
+            } catch (IOException e) {
+                tmpFile.delete();
+                throw new RuntimeException("Failed to verify downloaded file MD5", e);
+            }
+            if (!downloadedMd5.equalsIgnoreCase(md5)) {
+                tmpFile.delete();
+                throw new RuntimeException(
+                        "Small file MD5 mismatch, expected=" + md5 + ", got=" + downloadedMd5);
+            }
+
+            // 6. Promote the verified tmp file to its final path
+            if (!tmpFile.renameTo(finalFile)) {
+                tmpFile.delete();
+                throw new RuntimeException("Failed to rename tmp file to: " + finalFilePath);
+            }
+
+            LOG.info("Small file ready at: {}", finalFilePath);
+            MEM_CACHE.put(cacheKey, finalFilePath);
+            return finalFilePath;
+        }
+    }
+
+    /** Returns the hex MD5 digest of the given file's content; the stream is closed on return. */
+    private static String fileMd5(File file) throws IOException {
+        try (FileInputStream fis = new FileInputStream(file)) {
+            return DigestUtils.md5Hex(fis);
+        }
+    }
+
+    /**
+     * Clears the in-memory path cache and the per-key download lock map. Exposed for testing.
+     * Files already written to disk are not touched.
+     */
+    static void clearCache() {
+        MEM_CACHE.clear();
+        DOWNLOAD_LOCKS.clear();
+    }
+}
diff --git a/fs_brokers/cdc_client/src/main/resources/application.properties 
b/fs_brokers/cdc_client/src/main/resources/application.properties
index 9a7ee6c3f76..a7ea3996a6f 100644
--- a/fs_brokers/cdc_client/src/main/resources/application.properties
+++ b/fs_brokers/cdc_client/src/main/resources/application.properties
@@ -17,4 +17,6 @@
 
################################################################################
 spring.web.resources.add-mappings=false
 server.port=9096
-backend.http.port=8040
\ No newline at end of file
+backend.http.port=8040
+# see doris-meta/image/VERSION
+cluster.token=cluster-token
\ No newline at end of file
diff --git 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SmallFileMgrTest.java
 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SmallFileMgrTest.java
new file mode 100644
index 00000000000..ae99d4db986
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SmallFileMgrTest.java
@@ -0,0 +1,378 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import com.sun.net.httpserver.HttpServer;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class SmallFileMgrTest {
+
+    @TempDir
+    Path tempDir;
+
+    @BeforeEach
+    void setUp() {
+        // Reset SmallFileMgr's caches before each test so no earlier entry is visible.
+        SmallFileMgr.clearCache();
+    }
+
+    @AfterEach
+    void tearDown() {
+        // Leave no cached paths behind that point into this test's temporary directory.
+        SmallFileMgr.clearCache();
+    }
+
+    // -------------------------------------------------------------------------
+    // Input validation
+    // -------------------------------------------------------------------------
+
+    @Test
+    void testInvalidPrefix() {
+        // A reference that does not begin with "FILE:" is rejected.
+        String localDir = tempDir.toString();
+        assertThrows(
+                IllegalArgumentException.class,
+                () -> SmallFileMgr.getFilePath("host:8030", "NOFILE:123:abc", "token", localDir));
+    }
+
+    @Test
+    void testInvalidFormatMissingMd5() {
+        // Only a file id after the prefix: the md5 part is missing.
+        String localDir = tempDir.toString();
+        assertThrows(
+                IllegalArgumentException.class,
+                () -> SmallFileMgr.getFilePath("host:8030", "FILE:123", "token", localDir));
+    }
+
+    @Test
+    void testInvalidFormatTooManyParts() {
+        // Three colon-separated parts after the prefix instead of exactly two.
+        String localDir = tempDir.toString();
+        assertThrows(
+                IllegalArgumentException.class,
+                () ->
+                        SmallFileMgr.getFilePath(
+                                "host:8030", "FILE:123:abc:extra", "token", localDir));
+    }
+
+    // -------------------------------------------------------------------------
+    // Disk cache hit
+    // -------------------------------------------------------------------------
+
+    @Test
+    void testDiskCacheHit() throws Exception {
+        String fileId = "10001";
+        byte[] certBytes =
+                "-----BEGIN CERTIFICATE-----\ntest\n-----END CERTIFICATE-----".getBytes();
+        String md5 = DigestUtils.md5Hex(certBytes);
+
+        // Seed the disk cache with a file whose name and content agree on the MD5.
+        File seeded = tempDir.resolve(fileId + "." + md5).toFile();
+        Files.write(seeded.toPath(), certBytes);
+
+        String reference = "FILE:" + fileId + ":" + md5;
+        String resolved =
+                SmallFileMgr.getFilePath("host:8030", reference, "token", tempDir.toString());
+
+        assertEquals(seeded.getAbsolutePath(), resolved);
+    }
+
+    @Test
+    void testDiskCacheMd5MismatchTriggersRedownload() throws Exception {
+        String fileId = "10002";
+        byte[] goodBytes = "correct cert content".getBytes();
+        String goodMd5 = DigestUtils.md5Hex(goodBytes);
+
+        // The cache entry carries the right name but the wrong bytes.
+        File cacheEntry = tempDir.resolve(fileId + "." + goodMd5).toFile();
+        Files.write(cacheEntry.toPath(), "corrupted content".getBytes());
+
+        // Stand-in FE HTTP server that always answers with the right bytes.
+        HttpServer fe = HttpServer.create(new InetSocketAddress(0), 0);
+        String feAddress = "127.0.0.1:" + fe.getAddress().getPort();
+        fe.createContext(
+                "/api/get_small_file",
+                exchange -> {
+                    exchange.sendResponseHeaders(200, goodBytes.length);
+                    exchange.getResponseBody().write(goodBytes);
+                    exchange.getResponseBody().close();
+                });
+        fe.start();
+
+        try {
+            String reference = "FILE:" + fileId + ":" + goodMd5;
+            String resolved =
+                    SmallFileMgr.getFilePath(
+                            feAddress, reference, "test-token", tempDir.toString());
+
+            assertEquals(cacheEntry.getAbsolutePath(), resolved);
+            // The corrupt entry must now hold the downloaded bytes.
+            byte[] onDisk = Files.readAllBytes(cacheEntry.toPath());
+            assertEquals(goodMd5, DigestUtils.md5Hex(onDisk));
+        } finally {
+            fe.stop(0);
+        }
+    }
+
+    // -------------------------------------------------------------------------
+    // In-memory cache hit
+    // -------------------------------------------------------------------------
+
+    @Test
+    void testMemoryCacheHitSkipsDiskIo() throws Exception {
+        String fileId = "10003";
+        byte[] certBytes = "ssl cert data".getBytes();
+        String md5 = DigestUtils.md5Hex(certBytes);
+        String reference = "FILE:" + fileId + ":" + md5;
+        String localDir = tempDir.toString();
+
+        // First call resolves from the pre-seeded disk file, which also fills the memory cache.
+        File seeded = tempDir.resolve(fileId + "." + md5).toFile();
+        Files.write(seeded.toPath(), certBytes);
+        SmallFileMgr.getFilePath("host:8030", reference, "token", localDir);
+
+        // With the disk copy gone, any disk or HTTP access would fail.
+        assertTrue(seeded.delete());
+
+        // The second call must therefore be answered from memory.
+        String resolved = SmallFileMgr.getFilePath("host:8030", reference, "token", localDir);
+
+        assertEquals(seeded.getAbsolutePath(), resolved);
+    }
+
+    // -------------------------------------------------------------------------
+    // HTTP download from FE
+    // -------------------------------------------------------------------------
+
+    @Test
+    void testDownloadSuccess() throws Exception {
+        byte[] content =
+                "-----BEGIN CERTIFICATE-----\ndata\n-----END CERTIFICATE-----".getBytes();
+        String md5 = DigestUtils.md5Hex(content);
+        String fileId = "20001";
+
+        // The handler only records the query string; it is asserted on the test thread below.
+        // An assertion failing on the server thread would not be reported as this test's
+        // assertion failure.
+        java.util.concurrent.atomic.AtomicReference<String> receivedQuery =
+                new java.util.concurrent.atomic.AtomicReference<>();
+
+        HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
+        int port = server.getAddress().getPort();
+        server.createContext(
+                "/api/get_small_file",
+                exchange -> {
+                    receivedQuery.set(exchange.getRequestURI().getQuery());
+                    exchange.sendResponseHeaders(200, content.length);
+                    exchange.getResponseBody().write(content);
+                    exchange.getResponseBody().close();
+                });
+        server.start();
+
+        try {
+            String result =
+                    SmallFileMgr.getFilePath(
+                            "127.0.0.1:" + port,
+                            "FILE:" + fileId + ":" + md5,
+                            "test-token",
+                            tempDir.toString());
+
+            // Verify query params are forwarded correctly
+            String query = receivedQuery.get();
+            assertTrue(
+                    query != null && query.contains("file_id=" + fileId),
+                    "file_id not forwarded, query=" + query);
+            assertTrue(
+                    query != null && query.contains("token=test-token"),
+                    "token not forwarded, query=" + query);
+
+            File expectedFile = tempDir.resolve(fileId + "." + md5).toFile();
+            assertEquals(expectedFile.getAbsolutePath(), result);
+            assertTrue(expectedFile.exists());
+
+            // Verify file content
+            byte[] written = Files.readAllBytes(expectedFile.toPath());
+            assertEquals(md5, DigestUtils.md5Hex(written));
+        } finally {
+            server.stop(0);
+        }
+    }
+
+    @Test
+    void testDownloadSecondCallUsesMemoryCache() throws Exception {
+        byte[] content = "cert content".getBytes();
+        String md5 = DigestUtils.md5Hex(content);
+        String fileId = "20002";
+
+        // Incremented on the HTTP server thread and read on the test thread, so use an atomic
+        // counter rather than a plain int[] holder.
+        AtomicInteger callCount = new AtomicInteger(0);
+        HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
+        int port = server.getAddress().getPort();
+        server.createContext(
+                "/api/get_small_file",
+                exchange -> {
+                    callCount.incrementAndGet();
+                    exchange.sendResponseHeaders(200, content.length);
+                    exchange.getResponseBody().write(content);
+                    exchange.getResponseBody().close();
+                });
+        server.start();
+
+        try {
+            String address = "127.0.0.1:" + port;
+            String filePath = "FILE:" + fileId + ":" + md5;
+
+            // First call downloads; the second one must not reach FE again.
+            SmallFileMgr.getFilePath(address, filePath, "token", tempDir.toString());
+            SmallFileMgr.getFilePath(address, filePath, "token", tempDir.toString());
+
+            assertEquals(1, callCount.get(), "FE should only be contacted once");
+        } finally {
+            server.stop(0);
+        }
+    }
+
+    @Test
+    void testDownloadMd5Mismatch() throws Exception {
+        byte[] expectedContent = "expected content".getBytes();
+        String expectedMd5 = DigestUtils.md5Hex(expectedContent);
+        String fileId = "20003";
+
+        // FE serves bytes whose MD5 differs from the one named in the reference.
+        byte[] wrongContent = "totally different content".getBytes();
+
+        HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
+        int port = server.getAddress().getPort();
+        server.createContext(
+                "/api/get_small_file",
+                exchange -> {
+                    exchange.sendResponseHeaders(200, wrongContent.length);
+                    exchange.getResponseBody().write(wrongContent);
+                    exchange.getResponseBody().close();
+                });
+        server.start();
+
+        try {
+            RuntimeException ex =
+                    assertThrows(
+                            RuntimeException.class,
+                            () ->
+                                    SmallFileMgr.getFilePath(
+                                            "127.0.0.1:" + port,
+                                            "FILE:" + fileId + ":" + expectedMd5,
+                                            "test-token",
+                                            tempDir.toString()));
+            assertTrue(ex.getMessage().contains("MD5 mismatch"));
+
+            // No temporary download may survive the failure. Checking for any *.tmp file keeps
+            // the assertion meaningful whatever name the implementation gives its tmp file.
+            File[] leftovers = tempDir.toFile().listFiles((dir, name) -> name.endsWith(".tmp"));
+            assertTrue(
+                    leftovers != null && leftovers.length == 0,
+                    "tmp file should be deleted after MD5 mismatch");
+        } finally {
+            server.stop(0);
+        }
+    }
+
+    @Test
+    void testDownloadHttpError() throws Exception {
+        String fileId = "20004";
+        String md5 = "a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4";
+        String reference = "FILE:" + fileId + ":" + md5;
+
+        // FE answers every request with HTTP 500 and no body.
+        HttpServer fe = HttpServer.create(new InetSocketAddress(0), 0);
+        String feAddress = "127.0.0.1:" + fe.getAddress().getPort();
+        fe.createContext(
+                "/api/get_small_file",
+                exchange -> {
+                    exchange.sendResponseHeaders(500, -1);
+                    exchange.getResponseBody().close();
+                });
+        fe.start();
+
+        try {
+            RuntimeException failure =
+                    assertThrows(
+                            RuntimeException.class,
+                            () ->
+                                    SmallFileMgr.getFilePath(
+                                            feAddress, reference, "test-token", tempDir.toString()));
+            // The status code must be surfaced in the error message.
+            assertTrue(failure.getMessage().contains("status=500"));
+        } finally {
+            fe.stop(0);
+        }
+    }
+
+    // -------------------------------------------------------------------------
+    // Concurrency
+    // -------------------------------------------------------------------------
+
+    /**
+     * Fires N simultaneous requests for one file and checks that FE is hit exactly once while
+     * every caller gets the same, correct file path back.
+     */
+    @Test
+    void testConcurrentDownloadSameFileSingleFetch() throws Exception {
+        final int workers = 8;
+        String fileId = "30001";
+        byte[] payload = "concurrent ssl cert content".getBytes();
+        String md5 = DigestUtils.md5Hex(payload);
+
+        AtomicInteger feHits = new AtomicInteger(0);
+        HttpServer fe = HttpServer.create(new InetSocketAddress(0), 0);
+        int port = fe.getAddress().getPort();
+        fe.createContext(
+                "/api/get_small_file",
+                exchange -> {
+                    feHits.incrementAndGet();
+                    // Slow the response down a little so the callers have a chance to race.
+                    try {
+                        Thread.sleep(20);
+                    } catch (InterruptedException ignored) {
+                    }
+                    exchange.sendResponseHeaders(200, payload.length);
+                    exchange.getResponseBody().write(payload);
+                    exchange.getResponseBody().close();
+                });
+        fe.start();
+
+        ExecutorService pool = Executors.newFixedThreadPool(workers);
+        CountDownLatch go = new CountDownLatch(1);
+        String feAddress = "127.0.0.1:" + port;
+        String reference = "FILE:" + fileId + ":" + md5;
+        Path expectedFile = tempDir.resolve(fileId + "." + md5);
+        String expectedPath = expectedFile.toFile().getAbsolutePath();
+
+        try {
+            List<Future<String>> pending = new ArrayList<>();
+            for (int i = 0; i < workers; i++) {
+                pending.add(
+                        pool.submit(
+                                () -> {
+                                    // Park every worker on the gate so they start together.
+                                    go.await();
+                                    return SmallFileMgr.getFilePath(
+                                            feAddress, reference, "token", tempDir.toString());
+                                }));
+            }
+
+            go.countDown(); // release all threads simultaneously
+
+            for (Future<String> outcome : pending) {
+                assertEquals(expectedPath, outcome.get());
+            }
+
+            assertEquals(1, feHits.get(), "FE should be contacted exactly once");
+
+            // The file on disk must hold exactly the served bytes.
+            byte[] onDisk = Files.readAllBytes(expectedFile);
+            assertEquals(md5, DigestUtils.md5Hex(onDisk));
+        } finally {
+            pool.shutdown();
+            fe.stop(0);
+        }
+    }
+}
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_ssl.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_ssl.out
new file mode 100644
index 00000000000..93141f92358
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_ssl.out
@@ -0,0 +1,9 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_snapshot_table1 --
+A1     1
+B1     2
+
+-- !select_binlog_table1 --
+B1     10
+Doris  18
+
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
index d16bc57e73e..4909e3d0469 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
@@ -180,7 +180,7 @@ suite("test_streaming_mysql_job_priv", 
"p0,external,mysql,external_docker,extern
                        def jobStatus = sql """ select status, ErrorMsg from 
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
                        log.info("jobStatus: " + jobStatus)
                        // check job status
-                       jobStatus.size() == 1 && 'PAUSED' == 
jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch 
meta")
+                       jobStatus.size() == 1 && 'PAUSED' == 
jobStatus.get(0).get(0)
                    }
            )
        } catch (Exception ex){
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_ssl.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_ssl.groovy
new file mode 100644
index 00000000000..20a854cae64
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_ssl.groovy
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_postgres_job_ssl", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName = "test_streaming_postgres_job_name_ssl"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "user_info_pg_normal1_ssl"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port = context.config.otherConfigs.get("pg_14_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+        // create test
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            // sql """CREATE SCHEMA IF NOT EXISTS ${pgSchema}"""
+            def sslInfo = sql """SHOW ssl"""
+            log.info("sslInfo: " + sslInfo)
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
+                  "name" varchar(200),
+                  "age" int2,
+                  PRIMARY KEY ("name")
+                )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age) 
VALUES ('A1', 1);"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age) 
VALUES ('B1', 2);"""
+        }
+
+        try {
+            sql """DROP FILE "ca.pem" FROM ${currentDb} PROPERTIES ("catalog" 
= "streaming_job")"""
+        } catch (Exception ignored) {
+            // ignore
+        }
+
+        sql """CREATE FILE "ca.pem"
+            IN ${currentDb}
+            PROPERTIES
+            (
+                "url" = 
"https://qa-build.oss-cn-beijing.aliyuncs.com/jianxu/root.crt",
+                "catalog" = "streaming_job"
+            )
+        """
+
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url" = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "org.postgresql.Driver",
+                    "user" = "${pgUser}",
+                    "password" = "${pgPassword}",
+                    "database" = "${pgDB}",
+                    "schema" = "${pgSchema}",
+                    "include_tables" = "${table1}", 
+                    "offset" = "initial",
+                    "ssl_mode" = "verify-ca",
+                    "ssl_rootcert" = "FILE:ca.pem"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+        def showAllTables = sql """ show tables from ${currentDb}"""
+        log.info("showAllTables: " + showAllTables)
+        // check table created
+        def showTables = sql """ show tables from ${currentDb} like 
'${table1}'; """
+        assert showTables.size() == 1
+
+        // check job running
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(1, SECONDS).until(
+                    {
+                        def jobSuccendCount = sql """ select SucceedTaskCount 
from jobs("type"="insert") where Name = '${jobName}' and 
ExecuteType='STREAMING' """
+                        log.info("jobSuccendCount: " + jobSuccendCount)
+                        // check job status and succeed task count larger than 
2
+                        jobSuccendCount.size() == 1 && '2' <= 
jobSuccendCount.get(0).get(0)
+                    }
+            )
+        } catch (Exception ex){
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex;
+        }
+
+        // check snapshot data
+        qt_select_snapshot_table1 """ SELECT * FROM ${table1} order by name 
asc """
+
+        // mock incremental into
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name,age) VALUES 
('Doris',18);"""
+            def xminResult = sql """SELECT xmin, xmax , * FROM 
${pgSchema}.${table1} WHERE name = 'Doris'; """
+            log.info("xminResult: " + xminResult)
+            sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET age = 10 WHERE 
name = 'B1';"""
+            sql """DELETE FROM ${pgDB}.${pgSchema}.${table1} WHERE name = 
'A1';"""
+        }
+
+        sleep(60000); // wait for cdc incremental data
+
+        // check incremental data
+        qt_select_binlog_table1 """ SELECT * FROM ${table1} order by name asc 
"""
+
+        def jobInfo = sql """
+        select loadStatistic, status from jobs("type"="insert") where 
Name='${jobName}'
+        """
+        log.info("jobInfo: " + jobInfo)
+        def loadStat = parseJson(jobInfo.get(0).get(0))
+        assert loadStat.scannedRows == 5
+        assert loadStat.loadBytes == 246
+        assert jobInfo.get(0).get(1) == "RUNNING"
+
+        sql """
+            DROP JOB IF EXISTS where jobname =  '${jobName}'
+        """
+
+        try {
+            sql """DROP FILE "ca.pem" FROM ${currentDb} PROPERTIES ("catalog" 
= "streaming_job")"""
+        } catch (Exception ignored) {
+            // ignore
+        }
+
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert")  
where Name ='${jobName}'"""
+        assert jobCountRsp.get(0).get(0) == 0
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to