This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 066d71fbbde [Improve](Streaming Job) Supports PG databases with SSL
enabled (#60988)
066d71fbbde is described below
commit 066d71fbbde6585315eb2562e146231895087f01
Author: wudi <[email protected]>
AuthorDate: Thu Mar 5 10:12:38 2026 +0800
[Improve](Streaming Job) Supports PG databases with SSL enabled (#60988)
### What problem does this PR solve?
Enable SSL verification when syncing PG.
```sql
CREATE JOB test_streaming_postgres_job_name_ssl
ON STREAMING
FROM POSTGRES (
"jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
"driver_url" = "postgresql-42.5.0.jar",
"driver_class" = "org.postgresql.Driver",
"user" = "postgres",
"password" = "123456",
"database" = "postgres",
"schema" = "cdc_test",
"include_tables" = "user_info_pg_normal1_ssl",
"offset" = "initial",
"ssl_mode" = "verify-ca",
"ssl_rootcert" = "FILE:ca.pem"
)
TO DATABASE regression_test_job_p0_streaming_job_cdc (
"table.create.properties.replication_num" = "1"
)
```
---
.licenserc.yaml | 1 +
be/src/runtime/cdc_client_mgr.cpp | 4 +-
be/test/runtime/cdc_client_mgr_test.cpp | 10 +
.../docker-compose/postgresql/certs/root.crt | 19 ++
.../docker-compose/postgresql/certs/server.crt | 18 +
.../docker-compose/postgresql/certs/server.key | 28 ++
.../postgresql/postgresql-14.yaml.tpl | 19 ++
.../apache/doris/job/cdc/DataSourceConfigKeys.java | 2 +
.../job/cdc/request/CompareOffsetRequest.java | 3 +-
.../job/cdc/request/FetchTableSplitsRequest.java | 4 +-
.../doris/job/cdc/request/JobBaseConfig.java | 1 +
.../doris/job/cdc/request/WriteRecordRequest.java | 1 -
.../streaming/DataSourceConfigValidator.java | 4 +-
.../insert/streaming/StreamingInsertJob.java | 13 +-
.../insert/streaming/StreamingMultiTblTask.java | 9 +-
.../job/offset/jdbc/JdbcSourceOffsetProvider.java | 20 +-
.../apache/doris/job/util/StreamingJobUtils.java | 30 ++
fs_brokers/cdc_client/pom.xml | 7 +
.../org/apache/doris/cdcclient/common/Env.java | 6 +
...ndPortHolder.java => SystemEnvInitializer.java} | 6 +-
.../reader/postgres/PostgresSourceReader.java | 20 +-
.../apache/doris/cdcclient/utils/SmallFileMgr.java | 224 ++++++++++++
.../src/main/resources/application.properties | 4 +-
.../doris/cdcclient/utils/SmallFileMgrTest.java | 378 +++++++++++++++++++++
.../cdc/test_streaming_postgres_job_ssl.out | 9 +
.../cdc/test_streaming_mysql_job_priv.groovy | 2 +-
.../cdc/test_streaming_postgres_job_ssl.groovy | 157 +++++++++
27 files changed, 975 insertions(+), 24 deletions(-)
diff --git a/.licenserc.yaml b/.licenserc.yaml
index e902d1a6eb5..5fd7e766a2c 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -90,6 +90,7 @@ header:
- "docker/thirdparties/docker-compose/kerberos/paimon_data/**"
- "docker/thirdparties/docker-compose/kerberos/sql/**"
- "docker/thirdparties/docker-compose/iceberg/spark-defaults.conf.tpl"
+ - "docker/thirdparties/docker-compose/postgresql/certs/**"
- "conf/mysql_ssl_default_certificate/*"
- "conf/mysql_ssl_default_certificate/client_certificate/ca.pem"
- "conf/mysql_ssl_default_certificate/client_certificate/client-cert.pem"
diff --git a/be/src/runtime/cdc_client_mgr.cpp
b/be/src/runtime/cdc_client_mgr.cpp
index 96864ee8c6d..6600ab846ae 100644
--- a/be/src/runtime/cdc_client_mgr.cpp
+++ b/be/src/runtime/cdc_client_mgr.cpp
@@ -42,6 +42,7 @@
#include "common/logging.h"
#include "common/status.h"
#include "http/http_client.h"
+#include "runtime/exec_env.h"
namespace doris {
@@ -159,6 +160,7 @@ Status
CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
"--server.port=" + std::to_string(doris::config::cdc_client_port);
const std::string backend_http_port =
"--backend.http.port=" + std::to_string(config::webserver_port);
+ const std::string cluster_token = "--cluster.token=" +
ExecEnv::GetInstance()->token();
const std::string java_opts = "-Dlog.path=" + std::string(log_dir);
// check cdc jar exists
@@ -215,7 +217,7 @@ Status
CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
// java -jar -Dlog.path=xx cdc-client.jar --server.port=9096
--backend.http.port=8040
execlp(java_bin.c_str(), "java", java_opts.c_str(), "-jar",
cdc_jar_path.c_str(),
- cdc_jar_port.c_str(), backend_http_port.c_str(), (char*)NULL);
+ cdc_jar_port.c_str(), backend_http_port.c_str(),
cluster_token.c_str(), (char*)NULL);
// If execlp returns, it means it failed
perror("Cdc client child process error");
exit(1);
diff --git a/be/test/runtime/cdc_client_mgr_test.cpp
b/be/test/runtime/cdc_client_mgr_test.cpp
index 4e9f880d18e..3ab05c394d7 100644
--- a/be/test/runtime/cdc_client_mgr_test.cpp
+++ b/be/test/runtime/cdc_client_mgr_test.cpp
@@ -32,6 +32,8 @@
#include "common/config.h"
#include "common/status.h"
+#include "runtime/cluster_info.h"
+#include "runtime/exec_env.h"
namespace doris {
@@ -66,6 +68,11 @@ public:
setenv("LOG_DIR", _log_dir.c_str(), 1);
_log_dir_set = true;
}
+
+ auto* env = ExecEnv::GetInstance();
+ _cluster_info = std::make_unique<ClusterInfo>();
+ _cluster_info->token = "test_token";
+ env->set_cluster_info(_cluster_info.get());
}
void TearDown() override {
@@ -92,6 +99,8 @@ public:
if (_jar_created && !_jar_path.empty()) {
[[maybe_unused]] int cleanup_ret = system(("rm -f " +
_jar_path).c_str());
}
+
+ ExecEnv::GetInstance()->set_cluster_info(nullptr);
}
protected:
@@ -104,6 +113,7 @@ protected:
const char* _original_java_home = nullptr;
bool _jar_created = false;
bool _log_dir_set = false;
+ std::unique_ptr<ClusterInfo> _cluster_info;
};
// Test stop method when there's no child process
diff --git a/docker/thirdparties/docker-compose/postgresql/certs/root.crt
b/docker/thirdparties/docker-compose/postgresql/certs/root.crt
new file mode 100644
index 00000000000..94095b762c6
--- /dev/null
+++ b/docker/thirdparties/docker-compose/postgresql/certs/root.crt
@@ -0,0 +1,19 @@
+-----BEGIN CERTIFICATE-----
+MIIDDzCCAfegAwIBAgIULswy9ovSHXeKSxoEen2Y3xEZqBgwDQYJKoZIhvcNAQEL
+BQAwFzEVMBMGA1UEAwwMTG9jYWwtRGV2LUNBMB4XDTI2MDMwMzA4MjMxM1oXDTM2
+MDIyOTA4MjMxM1owFzEVMBMGA1UEAwwMTG9jYWwtRGV2LUNBMIIBIjANBgkqhkiG
+9w0BAQEFAAOCAQ8AMIIBCgKCAQEAsVFJhj3Y7zamNZiq9SefnnKAKaOXXUbXo/Fq
+V6VNzMSkZuwDfRo/RKjvVaUru/JSd7QoV5zGyUYb+oHx/R233R1M0sd23+eR1mRQ
+w771DmXthbdpIPBEwlmh0LMsiH9cJ7R2iRigCzfd2/SbJC3cvX6CtzyNqSkZboVO
+fswkotF4ZaJgOiBile4A/zWWqeA07QVd8tusdxaoOJv0E/pjcLi5peGXtQA6SSj4
+tp20K/tlrRS1Zc0dKgxU7YohxNBwW4QF0uOVR/QBmfzEpMdxKlwcEnHubPAemgt1
+bp9g9Buwo7oWMvDJuS40xMPOlDhshrzNM8CoWIihgndMPG/LsQIDAQABo1MwUTAd
+BgNVHQ4EFgQUHBKhmdKPD+b1xDjzzkQVaVETSfUwHwYDVR0jBBgwFoAUHBKhmdKP
+D+b1xDjzzkQVaVETSfUwDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOC
+AQEAnueVOIAk/XLQx3msDY58Reo+D1f/AUy/WTPzxeXCxXLScrjFCLXjrIDzgslN
+WnP7E5xNJxdrWgskS36IJxVg0+cUfy5kQYYfmWo1vOYdW/AMNBdQwmK5ve3r3Z/3
+dE2cV4uvL6n0iZZMxnsL5KXwLeSQeTtJepvWi27Z0t8P23lJHJKfl/Ek49ILIDgB
+zZIMKPgm6w7/U3jUWMUyQ+iI/XiEPrnn4url1FNViC8ucoIm8EU4ZE01j1mbZO8M
+JSa6InQEIx/1P675qYtuKWF75Tq/qU7+uX7/07AiTyYSrHMT+024TfbRCi1PF/Ka
+cx+pSJLima+3GHhK2Rj437yx1Q==
+-----END CERTIFICATE-----
diff --git a/docker/thirdparties/docker-compose/postgresql/certs/server.crt
b/docker/thirdparties/docker-compose/postgresql/certs/server.crt
new file mode 100644
index 00000000000..e8aaecb71ec
--- /dev/null
+++ b/docker/thirdparties/docker-compose/postgresql/certs/server.crt
@@ -0,0 +1,18 @@
+-----BEGIN CERTIFICATE-----
+MIIC+zCCAeOgAwIBAgIUG/9rYO8McYBH83YOe4nzMcb4YCAwDQYJKoZIhvcNAQEL
+BQAwFzEVMBMGA1UEAwwMTG9jYWwtRGV2LUNBMB4XDTI2MDMwMzA4MjMyNloXDTM2
+MDIyOTA4MjMyNlowFDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0B
+AQEFAAOCAQ8AMIIBCgKCAQEAtHYYwevcMqMPbCAaQlrX7qJtRXf/j+WfGFbM4/PZ
+Y6cjSsrqUgwHduMyE4yce9vWWygLJM/S9aBI3jsvhAdLVaFIXhOU4jMuyk+1RJvu
+k+iUJ3wabo2Zv6605wUU7wS0FCfJJMxG/zz5FYtX8kMw7sKJWLhB4C+oQlO+mSj4
+CKjg7mNZjgKz024/BW7FKhAaYYGI9GNmjIgvjSDXGOXzd2nM9XLoVNIkR8mgD69l
+yHHzhGUAdXDxaTr+026Z2uBrnip7ZjDIB65J/qrxSc8eK1ZhZzYdHBpLnP67zuWR
+iyKDNETpRa1SoWCk9/9+AGwygRcXC7h1GpMb46wce4/TtwIDAQABo0IwQDAdBgNV
+HQ4EFgQUEeFQVqK+A/H6R2iSiNW57cSilGcwHwYDVR0jBBgwFoAUHBKhmdKPD+b1
+xDjzzkQVaVETSfUwDQYJKoZIhvcNAQELBQADggEBAKpxfqPTPXL2+n/OW6F8cvwK
+aod3BOquIjIKm17+Uob0rcOnxssYNQa0g9pW2zgIlAS+QUZ1K46ygJWrLNKdpIzt
+mG2Hn6kUX9J7Xo+F5IldlX2bImi3b2/oI8IliLzawsofondCzL2BIfWLhE3LaISF
+iN8pfzjoHCZXfLm3oUzxaeltFqEP+cApig/hAO17FkMHY6sl9QII94MV2d9gVwVl
+pAi1ALOzOQKbsTCdRspoadPqmZ7AgbtS3RiVMmCZHwrtCvdIcaBuiPy5KiBFPCEX
+Cdia+GWqETKBNpornHeMQ7d/J2ilbFRs+mRAUtyeWK0ilcdOxbmMOzCsjIV8kgI=
+-----END CERTIFICATE-----
diff --git a/docker/thirdparties/docker-compose/postgresql/certs/server.key
b/docker/thirdparties/docker-compose/postgresql/certs/server.key
new file mode 100644
index 00000000000..d9eb2a8f4ad
--- /dev/null
+++ b/docker/thirdparties/docker-compose/postgresql/certs/server.key
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC0dhjB69wyow9s
+IBpCWtfuom1Fd/+P5Z8YVszj89ljpyNKyupSDAd24zITjJx729ZbKAskz9L1oEje
+Oy+EB0tVoUheE5TiMy7KT7VEm+6T6JQnfBpujZm/rrTnBRTvBLQUJ8kkzEb/PPkV
+i1fyQzDuwolYuEHgL6hCU76ZKPgIqODuY1mOArPTbj8FbsUqEBphgYj0Y2aMiC+N
+INcY5fN3acz1cuhU0iRHyaAPr2XIcfOEZQB1cPFpOv7Tbpna4GueKntmMMgHrkn+
+qvFJzx4rVmFnNh0cGkuc/rvO5ZGLIoM0ROlFrVKhYKT3/34AbDKBFxcLuHUakxvj
+rBx7j9O3AgMBAAECggEABZ+8uxdWnQYl+4xlV5E0gmTx3dh8Qd351UfFsW0demDr
+lU1SI3I4I/Lelv8lyrLXZzjcwPfmezfec6RnF37p7ijSPgrIG2PLplCqJsy6BzK1
+ycH/yaYm6sIFSBqdF+ZO5QOaGOWZpA9lgsYHNVt/jdvJCq/50ZhJZO2fvfi9dr4I
+vLjcCX57t+V9n68zHCdw8pTw3eSvO34wv8FXXQyofYi6+swoV/NhGFS1xMlc2USO
+KQ0Do/Y8Dxr/5HawoiMTzO/o4M0Bdmb237fW4D0yVqaevjVWKe/wq2q3VZyBatB2
+XDMkL1ZaWiRsRZHoliiIh3K3gQ2jmtsMXjzv+IKdvQKBgQDgPsk7y5Ms5rjArL8g
+qCP2o8a/IvxzCwxcvK59nfmWFuFeJsxE3uvp89UriqC6yGD5yxAmjDKvHOFtV+CE
+KjCnMgt/jU6BpkaHzTRR8Gtt/RkILZTZiKoNdEgOTeBjHKCoOUoM7Dc78nW7Dp0F
+QoLdAe0g0pSRy5iFcWBiX7UP5QKBgQDOBBRfnaU6fICVH0SmqBoKVSCDm+saYMAW
+99mypm2xViP4VQOa1QjNRiEN9kllxD4I+S48kALSCpif+A/IE89bNgFNEOvTYbkW
++mvjoFLQtN79Tc8/G0CEi+WhRWWpY9WnMuzj1r/pAbC8uOEKvJ+tYfKmHZN5kvoC
+k0e2yMCDawKBgFi6Hw9sxkgO5m0+LMW0Ib62IK6CHlc6uOJ8uaH0fsvXM8b4HPzn
+I3tHQkJfMKeXH1/W7AYElQ1apQuJqMlClEujbo9CjxyXePLEy/3b3fYAHgZxWqMU
+Aw0dxGD8iVtN+Xd2a4lfcZ9jmRexeYmaPoNJ/tRs3eIuJ6QtLxDdg5vNAoGBAIqU
+C/BVZrN01Dl7Ev7XzMxufrSIyRixRAUvK20Urmy/eOqupQIdkxIhvlJZ/P1LiD8Y
+/pUWeg83uXrBrjvzt2OvbCie3UMPVSWzxacUTSC+ydCx6lqUxk1inVBiEgRjd3BE
+vTx1VBo0XOJVqmtCflZusH41HuKEj0/0KiU13OmJAoGAYkxy/U6uHHn6xB3KriID
+bZgfYRlLv1bD4AYiOcjFke3/4MZJ2U4t/x6uzEjQZd/0waSeE3YY/MfEXufdHM99
+ZUlAHwLhjLcY58HgkyMkw4sRaHYxTQdOuxcnmzX1+sHKxKXlYoboLgh8Qf9A4DcR
+HZde9n1uVLVtlBRTjjL5O84=
+-----END PRIVATE KEY-----
diff --git
a/docker/thirdparties/docker-compose/postgresql/postgresql-14.yaml.tpl
b/docker/thirdparties/docker-compose/postgresql/postgresql-14.yaml.tpl
index 6d8873817a5..1012a65b1f2 100644
--- a/docker/thirdparties/docker-compose/postgresql/postgresql-14.yaml.tpl
+++ b/docker/thirdparties/docker-compose/postgresql/postgresql-14.yaml.tpl
@@ -25,6 +25,16 @@ services:
POSTGRES_PASSWORD: 123456
ports:
- ${DOCKER_PG_14_EXTERNAL_PORT}:5432
+ entrypoint:
+ - bash
+ - -c
+ - |
+ chmod 600 /var/lib/postgresql/certs/server.key
+ chmod 644 /var/lib/postgresql/certs/server.crt
+ chmod 644 /var/lib/postgresql/certs/root.crt
+ chown postgres:postgres /var/lib/postgresql/certs/*
+ exec docker-entrypoint.sh "$@"
+ - --
command:
- "postgres"
- "-c"
@@ -33,6 +43,14 @@ services:
- "max_wal_senders=30"
- "-c"
- "max_replication_slots=30"
+ - "-c"
+ - "ssl=on"
+ - "-c"
+ - "ssl_cert_file=/var/lib/postgresql/certs/server.crt"
+ - "-c"
+ - "ssl_key_file=/var/lib/postgresql/certs/server.key"
+ - "-c"
+ - "ssl_ca_file=/var/lib/postgresql/certs/root.crt"
healthcheck:
test: [ "CMD-SHELL", "pg_isready -U postgres && psql -U postgres -c
'SELECT 1 FROM doris_test.deadline;'" ]
interval: 5s
@@ -41,6 +59,7 @@ services:
volumes:
- ./data/data/:/var/lib/postgresql/data/
- ./init:/docker-entrypoint-initdb.d
+ - ./certs:/var/lib/postgresql/certs
networks:
- doris--postgres
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
index 17c5d7d575e..77ce8b2bf51 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
@@ -35,6 +35,8 @@ public class DataSourceConfigKeys {
public static final String SNAPSHOT_SPLIT_SIZE = "snapshot_split_size";
public static final String SNAPSHOT_PARALLELISM = "snapshot_parallelism";
public static final String SNAPSHOT_PARALLELISM_DEFAULT = "1";
+ public static final String SSL_MODE = "ssl_mode";
+ public static final String SSL_ROOTCERT = "ssl_rootcert";
// target properties
public static final String TABLE_PROPS_PREFIX = "table.create.properties.";
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CompareOffsetRequest.java
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CompareOffsetRequest.java
index 8449afd9628..1a57cbdefe3 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CompareOffsetRequest.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CompareOffsetRequest.java
@@ -35,9 +35,10 @@ public class CompareOffsetRequest extends JobBaseConfig {
public CompareOffsetRequest(Long jobId,
String sourceType,
Map<String, String> sourceProperties,
+ String frontendAddress,
Map<String, String> offsetFirst,
Map<String, String> offsetSecond) {
- super(jobId, sourceType, sourceProperties);
+ super(jobId, sourceType, sourceProperties, frontendAddress);
this.offsetFirst = offsetFirst;
this.offsetSecond = offsetSecond;
}
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java
index f855e373958..5c3cf62ab4c 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java
@@ -34,8 +34,8 @@ public class FetchTableSplitsRequest extends JobBaseConfig {
private String snapshotTable;
public FetchTableSplitsRequest(Long jobId, String name,
- Map<String, String> sourceProperties, String snapshotTable) {
- super(jobId, name, sourceProperties);
+ Map<String, String> sourceProperties, String frontendAddress,
String snapshotTable) {
+ super(jobId, name, sourceProperties, frontendAddress);
this.snapshotTable = snapshotTable;
}
}
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java
index bfdbf6a3455..c7b60026d88 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java
@@ -30,4 +30,5 @@ public class JobBaseConfig {
private Long jobId;
private String dataSource;
private Map<String, String> config;
+ private String frontendAddress;
}
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
index 195125b1c66..fd56518643d 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
@@ -28,7 +28,6 @@ public class WriteRecordRequest extends JobBaseRecordRequest {
private long maxInterval;
private String targetDb;
private String token;
- private String frontendAddress;
private String taskId;
private Map<String, String> streamLoadProps;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
index cb7ec530fcf..f3d9e016950 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
@@ -38,7 +38,9 @@ public class DataSourceConfigValidator {
DataSourceConfigKeys.INCLUDE_TABLES,
DataSourceConfigKeys.EXCLUDE_TABLES,
DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE,
- DataSourceConfigKeys.SNAPSHOT_PARALLELISM
+ DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
+ DataSourceConfigKeys.SSL_MODE,
+ DataSourceConfigKeys.SSL_ROOTCERT
);
public static void validateSource(Map<String, String> input) throws
IllegalArgumentException {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index c820b8d532a..189656fa48d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -102,6 +102,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
@Log4j2
public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask,
Map<Object, Object>> implements
TxnStateChangeCallback, GsonPostProcessable {
+ public static final String JOB_FILE_CATALOG = "streaming_job";
private long dbId;
// Streaming job statistics, all persisted in txn attachment
private StreamingJobStatistic jobStatistic = new StreamingJobStatistic();
@@ -224,7 +225,8 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
String includeTables = String.join(",", createTbls);
sourceProperties.put(DataSourceConfigKeys.INCLUDE_TABLES,
includeTables);
}
- this.offsetProvider = new JdbcSourceOffsetProvider(getJobId(),
dataSourceType, sourceProperties);
+ this.offsetProvider = new JdbcSourceOffsetProvider(getJobId(),
dataSourceType,
+ StreamingJobUtils.convertCertFile(getDbId(),
sourceProperties));
JdbcSourceOffsetProvider rdsOffsetProvider =
(JdbcSourceOffsetProvider) this.offsetProvider;
rdsOffsetProvider.splitChunks(createTbls);
} catch (Exception ex) {
@@ -459,7 +461,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
return newTasks;
}
- protected AbstractStreamingTask createStreamingTask() {
+ protected AbstractStreamingTask createStreamingTask() throws JobException {
if (tvfType != null) {
this.runningStreamTask = createStreamingInsertTask();
} else {
@@ -477,9 +479,10 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
* for From MySQL TO Database
* @return
*/
- private AbstractStreamingTask createStreamingMultiTblTask() {
+ private AbstractStreamingTask createStreamingMultiTblTask() throws
JobException {
+ Map<String, String> convertSourceProps =
StreamingJobUtils.convertCertFile(getDbId(), sourceProperties);
return new StreamingMultiTblTask(getJobId(),
Env.getCurrentEnv().getNextId(), dataSourceType,
- offsetProvider, sourceProperties, targetDb, targetProperties,
jobProperties, getCreateUser());
+ offsetProvider, convertSourceProps, targetDb,
targetProperties, jobProperties, getCreateUser());
}
protected AbstractStreamingTask createStreamingInsertTask() {
@@ -599,7 +602,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
updateJobStatus(JobStatus.PAUSED);
}
- public void onStreamTaskSuccess(AbstractStreamingTask task) {
+ public void onStreamTaskSuccess(AbstractStreamingTask task) throws
JobException {
try {
resetFailureInfo(null);
succeedTaskCount.incrementAndGet();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index b6a4e8c939b..cf9a9b905be 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -61,6 +61,10 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+/**
+ * In PostgreSQL/MySQL, multi-table writes are performed by tasks that only
make calls.
+ * The write logic resides in the `cdc_client` and is implemented via
`stream_load`.
+ */
@Log4j2
@Getter
public class StreamingMultiTblTask extends AbstractStreamingTask {
@@ -81,7 +85,6 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
public StreamingMultiTblTask(Long jobId,
long taskId,
DataSourceType dataSourceType,
-
SourceOffsetProvider offsetProvider,
Map<String, String> sourceProperties,
String targetDb,
@@ -180,8 +183,8 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
WriteRecordRequest request = new WriteRecordRequest();
request.setJobId(getJobId());
request.setConfig(sourceProperties);
- request.setDataSource(dataSourceType.name());
+ request.setDataSource(dataSourceType.name());
request.setTaskId(getTaskId() + "");
request.setToken(getToken());
request.setTargetDb(targetDb);
@@ -228,7 +231,7 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
/**
* Callback function for offset commit success.
*/
- public void successCallback(CommitOffsetRequest offsetRequest) {
+ public void successCallback(CommitOffsetRequest offsetRequest) throws
JobException {
if (getIsCanceled().get()) {
return;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index b2f0d02e538..d04245317b5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -17,6 +17,7 @@
package org.apache.doris.job.offset.jdbc;
+import org.apache.doris.catalog.Env;
import org.apache.doris.httpv2.entity.ResponseBody;
import org.apache.doris.httpv2.rest.RestApiStatusCode;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
@@ -192,7 +193,8 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
@Override
public void fetchRemoteMeta(Map<String, String> properties) throws
Exception {
Backend backend = StreamingJobUtils.selectBackend();
- JobBaseConfig requestParams = new JobBaseConfig(getJobId(),
sourceType.name(), sourceProperties);
+ JobBaseConfig requestParams =
+ new JobBaseConfig(getJobId(), sourceType.name(),
sourceProperties, getFrontendAddress());
InternalService.PRequestCdcClientRequest request =
InternalService.PRequestCdcClientRequest.newBuilder()
.setApi("/api/fetchEndOffset")
.setParams(new Gson().toJson(requestParams)).build();
@@ -273,7 +275,8 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
throws JobException {
Backend backend = StreamingJobUtils.selectBackend();
CompareOffsetRequest requestParams =
- new CompareOffsetRequest(getJobId(), sourceType.name(),
sourceProperties, offsetFirst, offsetSecond);
+ new CompareOffsetRequest(getJobId(), sourceType.name(),
sourceProperties,
+ getFrontendAddress(), offsetFirst, offsetSecond);
InternalService.PRequestCdcClientRequest request =
InternalService.PRequestCdcClientRequest.newBuilder()
.setApi("/api/compareOffset")
.setParams(new Gson().toJson(requestParams)).build();
@@ -486,7 +489,8 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
private List<SnapshotSplit> requestTableSplits(String table) throws
JobException {
Backend backend = StreamingJobUtils.selectBackend();
FetchTableSplitsRequest requestParams =
- new FetchTableSplitsRequest(getJobId(), sourceType.name(),
sourceProperties, table);
+ new FetchTableSplitsRequest(getJobId(), sourceType.name(),
+ sourceProperties, getFrontendAddress(), table);
InternalService.PRequestCdcClientRequest request =
InternalService.PRequestCdcClientRequest.newBuilder()
.setApi("/api/fetchSplits")
.setParams(new Gson().toJson(requestParams)).build();
@@ -537,7 +541,8 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
*/
private void initSourceReader() throws JobException {
Backend backend = StreamingJobUtils.selectBackend();
- JobBaseConfig requestParams = new JobBaseConfig(getJobId(),
sourceType.name(), sourceProperties);
+ JobBaseConfig requestParams =
+ new JobBaseConfig(getJobId(), sourceType.name(),
sourceProperties, getFrontendAddress());
InternalService.PRequestCdcClientRequest request =
InternalService.PRequestCdcClientRequest.newBuilder()
.setApi("/api/initReader")
.setParams(new Gson().toJson(requestParams)).build();
@@ -584,7 +589,8 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
// clean meta table
StreamingJobUtils.deleteJobMeta(jobId);
Backend backend = StreamingJobUtils.selectBackend();
- JobBaseConfig requestParams = new JobBaseConfig(getJobId(),
sourceType.name(), sourceProperties);
+ JobBaseConfig requestParams =
+ new JobBaseConfig(getJobId(), sourceType.name(),
sourceProperties, getFrontendAddress());
InternalService.PRequestCdcClientRequest request =
InternalService.PRequestCdcClientRequest.newBuilder()
.setApi("/api/close")
.setParams(new Gson().toJson(requestParams)).build();
@@ -602,4 +608,8 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
log.warn("Close job error: ", ex);
}
}
+
+ private String getFrontendAddress() {
+ return Env.getCurrentEnv().getMasterHost() + ":" +
Env.getCurrentEnv().getMasterHttpPort();
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
index 1a1fb68fe82..7299f0a9b50 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
@@ -25,7 +25,10 @@ import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Table;
+import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.util.SmallFileMgr;
+import org.apache.doris.common.util.SmallFileMgr.SmallFile;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.jdbc.client.JdbcClient;
import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
@@ -33,6 +36,7 @@ import org.apache.doris.job.cdc.DataSourceConfigKeys;
import org.apache.doris.job.cdc.split.SnapshotSplit;
import org.apache.doris.job.common.DataSourceType;
import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo;
@@ -245,6 +249,32 @@ public class StreamingJobUtils {
return index;
}
+ /**
+ * When enabling SSL, you need to convert FILE:ca.pem to FILE:ca.pem:md5.
+ */
+ public static Map<String, String> convertCertFile(long dbId, Map<String,
String> sourceProperties)
+ throws JobException {
+ SmallFileMgr smallFileMgr = Env.getCurrentEnv().getSmallFileMgr();
+ Map<String, String> newProps = new HashMap<>(sourceProperties);
+ if (sourceProperties.containsKey(DataSourceConfigKeys.SSL_ROOTCERT)) {
+ String certFile =
sourceProperties.get(DataSourceConfigKeys.SSL_ROOTCERT);
+ if (certFile.startsWith("FILE:")) {
+ String file = certFile.substring(certFile.indexOf(":") + 1);
+ try {
+ SmallFile smallFile =
+ smallFileMgr.getSmallFile(dbId,
StreamingInsertJob.JOB_FILE_CATALOG, file, true);
+ newProps.put(DataSourceConfigKeys.SSL_ROOTCERT, "FILE:" +
smallFile.id + ":" + smallFile.md5);
+ } catch (DdlException ex) {
+ throw new JobException("ssl root cert file not found: " +
certFile, ex);
+ }
+ } else {
+ throw new JobException("ssl root cert is not in expected
format, "
+ + "should start with FILE:, got " + certFile);
+ }
+ }
+ return newProps;
+ }
+
public static List<CreateTableCommand> generateCreateTableCmds(String
targetDb, DataSourceType sourceType,
Map<String, String> properties, Map<String, String>
targetProperties)
throws JobException {
diff --git a/fs_brokers/cdc_client/pom.xml b/fs_brokers/cdc_client/pom.xml
index cac45bdc7a1..afa186844b7 100644
--- a/fs_brokers/cdc_client/pom.xml
+++ b/fs_brokers/cdc_client/pom.xml
@@ -178,6 +178,13 @@ under the License.
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ <version>5.10.2</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
index ff4056a8b5b..332a4766002 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
@@ -30,6 +30,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import com.google.common.base.Preconditions;
+import lombok.Getter;
import lombok.Setter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +41,8 @@ public class Env {
private final Map<Long, JobContext> jobContexts;
private final Map<Long, Lock> jobLocks;
@Setter private int backendHttpPort;
+ @Setter @Getter private String clusterToken;
+ @Setter @Getter private volatile String feMasterAddress;
private Env() {
this.jobContexts = new ConcurrentHashMap<>();
@@ -62,6 +65,9 @@ public class Env {
}
public SourceReader getReader(JobBaseConfig jobConfig) {
+ if (jobConfig.getFrontendAddress() != null &&
!jobConfig.getFrontendAddress().isEmpty()) {
+ this.feMasterAddress = jobConfig.getFrontendAddress();
+ }
DataSource ds = resolveDataSource(jobConfig.getDataSource());
Env manager = Env.getCurrentEnv();
return manager.getOrCreateReader(jobConfig.getJobId(), ds,
jobConfig.getConfig());
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/BackendPortHolder.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/SystemEnvInitializer.java
similarity index 88%
rename from
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/BackendPortHolder.java
rename to
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/SystemEnvInitializer.java
index c1977134530..11102b1db08 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/BackendPortHolder.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/SystemEnvInitializer.java
@@ -24,13 +24,17 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
-public class BackendPortHolder {
+public class SystemEnvInitializer {
@Value("${backend.http.port}")
private int port;
+ @Value("${cluster.token}")
+ private String clusterToken;
+
@PostConstruct
public void init() {
Env.getCurrentEnv().setBackendHttpPort(port);
+ Env.getCurrentEnv().setClusterToken(clusterToken);
}
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 5a9fa095941..a0bb57ad120 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -22,6 +22,7 @@ import
org.apache.doris.cdcclient.exception.CdcClientException;
import org.apache.doris.cdcclient.source.factory.DataSource;
import org.apache.doris.cdcclient.source.reader.JdbcIncrementalSourceReader;
import org.apache.doris.cdcclient.utils.ConfigUtil;
+import org.apache.doris.cdcclient.utils.SmallFileMgr;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
import org.apache.doris.job.cdc.request.CompareOffsetRequest;
import org.apache.doris.job.cdc.request.JobBaseConfig;
@@ -79,6 +80,7 @@ import org.slf4j.LoggerFactory;
public class PostgresSourceReader extends JdbcIncrementalSourceReader {
private static final Logger LOG =
LoggerFactory.getLogger(PostgresSourceReader.class);
private static final ObjectMapper objectMapper = new ObjectMapper();
+ private static final Object SLOT_CREATION_LOCK = new Object();
public PostgresSourceReader() {
super();
@@ -88,8 +90,10 @@ public class PostgresSourceReader extends
JdbcIncrementalSourceReader {
public void initialize(long jobId, DataSource dataSource, Map<String,
String> config) {
PostgresSourceConfig sourceConfig = generatePostgresConfig(config,
jobId, 0);
PostgresDialect dialect = new PostgresDialect(sourceConfig);
- LOG.info("Creating slot for job {}, user {}", jobId,
sourceConfig.getUsername());
- createSlotForGlobalStreamSplit(dialect);
+ synchronized (SLOT_CREATION_LOCK) {
+ LOG.info("Creating slot for job {}, user {}", jobId,
sourceConfig.getUsername());
+ createSlotForGlobalStreamSplit(dialect);
+ }
super.initialize(jobId, dataSource, config);
}
@@ -215,6 +219,18 @@ public class PostgresSourceReader extends
JdbcIncrementalSourceReader {
dbzProps.put("interval.handling.mode", "string");
configFactory.debeziumProperties(dbzProps);
+        // Configure SSL for the Debezium PostgreSQL connection, if requested
+ if (cdcConfig.containsKey(DataSourceConfigKeys.SSL_MODE)) {
+ dbzProps.put("database.sslmode",
cdcConfig.get(DataSourceConfigKeys.SSL_MODE));
+ }
+
+ if (cdcConfig.containsKey(DataSourceConfigKeys.SSL_ROOTCERT)) {
+ String fileName = cdcConfig.get(DataSourceConfigKeys.SSL_ROOTCERT);
+ String filePath = SmallFileMgr.getFilePath(fileName);
+ LOG.info("Using SSL root cert file path: {}", filePath);
+ dbzProps.put("database.sslrootcert", filePath);
+ }
+
configFactory.serverTimeZone(
ConfigUtil.getPostgresServerTimeZoneFromProps(props).toString());
configFactory.slotName(getSlotName(jobId));
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java
new file mode 100644
index 00000000000..1e854097883
--- /dev/null
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import org.apache.doris.cdcclient.common.Env;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages small files (e.g. SSL certificates) referenced by
FILE:{file_id}:{md5}.
+ *
+ * <p>Files are fetched from FE via HTTP on first access, then cached on disk
as {file_id}.{md5} and
+ * in memory to avoid repeated I/O on subsequent calls.
+ */
+public class SmallFileMgr {
+ private static final Logger LOG =
LoggerFactory.getLogger(SmallFileMgr.class);
+
+ private static final String FILE_PREFIX = "FILE:";
+
+ /** In-memory cache: "file_id:md5" -> absolute local file path */
+ private static final Map<String, String> MEM_CACHE = new
ConcurrentHashMap<>();
+
+ /**
+ * Per-key locks to serialize concurrent downloads of the same file,
preventing tmp file
+ * corruption when multiple threads race on the same file_id:md5 key.
+ */
+ private static final Map<String, Object> DOWNLOAD_LOCKS = new
ConcurrentHashMap<>();
+
+ private SmallFileMgr() {}
+
+ /**
+ * Resolve a FILE: reference to an absolute local file path, downloading
from FE if needed. FE
+ * address and cluster token are read from {@link Env}.
+ *
+ * @param filePath FILE reference, format: FILE:{file_id}:{md5}
+ * @return absolute local file path
+ */
+ public static String getFilePath(String filePath) {
+ return getFilePath(
+ Env.getCurrentEnv().getFeMasterAddress(),
+ filePath,
+ Env.getCurrentEnv().getClusterToken(),
+ getLocalDir());
+ }
+
+    /**
+     * Get the directory containing the currently running JAR file.
+     *
+     * @return absolute path of the JAR's parent directory, or the classes
+     *     directory itself when running unpacked classes from an IDE
+     */
+ static String getLocalDir() {
+ try {
+ URL url =
SmallFileMgr.class.getProtectionDomain().getCodeSource().getLocation();
+ LOG.info("Get code source URL: {}", url);
+ // Spring Boot fat jar:
jar:file:/path/to/app.jar!/BOOT-INF/classes!/
+ if ("jar".equals(url.getProtocol())) {
+ String path = url.getPath(); //
file:/path/to/app.jar!/BOOT-INF/classes!/
+ int separator = path.indexOf("!");
+ if (separator > 0) {
+ path = path.substring(0, separator); //
file:/path/to/app.jar
+ }
+ url = new URL(path);
+ }
+ File file = new File(url.toURI());
+            // When running from a JAR, `file` is the JAR itself, so take its
+            // parent directory.
+            // When running from an IDE, `file` is the classes directory, so
+            // return it directly.
+ return file.isFile() ? file.getParentFile().getAbsolutePath() :
file.getAbsolutePath();
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /** Package-private overload that accepts a custom local directory, used
for testing. */
+ static String getFilePath(
+ String feMasterAddress, String filePath, String clusterToken,
String localDir) {
+ if (!filePath.startsWith(FILE_PREFIX)) {
+ throw new IllegalArgumentException("filePath must start with
FILE:, got: " + filePath);
+ }
+ if (feMasterAddress == null || feMasterAddress.isEmpty()) {
+ throw new IllegalArgumentException(
+ "feMasterAddress is required when filePath is a FILE:
reference");
+ }
+ String[] parts = filePath.substring(FILE_PREFIX.length()).split(":");
+ if (parts.length != 2) {
+ throw new IllegalArgumentException(
+ "Invalid filePath format, expected FILE:file_id:md5, got:
" + filePath);
+ }
+ String fileId = parts[0];
+ String md5 = parts[1];
+ String cacheKey = fileId + ":" + md5;
+
+ // 1. Fast path: in-memory cache hit — zero I/O, no lock needed
+ String memCached = MEM_CACHE.get(cacheKey);
+ if (memCached != null) {
+ LOG.debug("SmallFile memory cache hit: {}", memCached);
+ return memCached;
+ }
+
+ // 2. Serialize concurrent downloads of the same file to prevent tmp
file corruption
+ Object lock = DOWNLOAD_LOCKS.computeIfAbsent(cacheKey, k -> new
Object());
+ synchronized (lock) {
+ // Double-check memory cache inside the lock
+ String doubleChecked = MEM_CACHE.get(cacheKey);
+ if (doubleChecked != null) {
+ LOG.debug("SmallFile memory cache hit (after lock): {}",
doubleChecked);
+ return doubleChecked;
+ }
+
+ String finalFilePath = localDir + File.separator + fileId + "." +
md5;
+ File finalFile = new File(finalFilePath);
+
+ // 3. Disk cache hit — avoid downloading again after process
restart
+ if (finalFile.exists()) {
+ try (FileInputStream fis = new FileInputStream(finalFile)) {
+ String diskMd5 = DigestUtils.md5Hex(fis);
+ if (diskMd5.equalsIgnoreCase(md5)) {
+ LOG.info("SmallFile disk cache hit: {}",
finalFilePath);
+ MEM_CACHE.put(cacheKey, finalFilePath);
+ return finalFilePath;
+ }
+ LOG.warn(
+ "SmallFile disk cache MD5 mismatch,
re-downloading: {}", finalFilePath);
+ } catch (IOException e) {
+ LOG.warn(
+ "Failed to read disk cached file, re-downloading:
{}",
+ finalFilePath,
+ e);
+ }
+ finalFile.delete();
+ }
+
+ // 4. Download from FE: GET
/api/get_small_file?file_id=xxx&token=yyy
+ String url =
+ "http://"
+ + feMasterAddress
+ + "/api/get_small_file?file_id="
+ + fileId
+ + "&token="
+ + clusterToken;
+ LOG.info("Downloading small file from FE: {}", url);
+
+ File tmpFile = new File(localDir + File.separator + fileId +
".tmp");
+ try (CloseableHttpClient client = HttpUtil.getHttpClient();
+ CloseableHttpResponse response = client.execute(new
HttpGet(url))) {
+ int statusCode = response.getStatusLine().getStatusCode();
+ if (statusCode != 200) {
+ throw new RuntimeException(
+ "Failed to download small file, status=" +
statusCode + ", url=" + url);
+ }
+ try (InputStream in = response.getEntity().getContent();
+ FileOutputStream fos = new FileOutputStream(tmpFile)) {
+ byte[] buf = new byte[8192];
+ int n;
+ while ((n = in.read(buf)) != -1) {
+ fos.write(buf, 0, n);
+ }
+ }
+ } catch (IOException e) {
+ tmpFile.delete();
+ throw new RuntimeException("Failed to download small file from
FE: " + url, e);
+ }
+
+ // 5. Verify MD5 of downloaded content
+ try (FileInputStream fis = new FileInputStream(tmpFile)) {
+ String downloadedMd5 = DigestUtils.md5Hex(fis);
+ if (!downloadedMd5.equalsIgnoreCase(md5)) {
+ tmpFile.delete();
+ throw new RuntimeException(
+ "Small file MD5 mismatch, expected=" + md5 + ",
got=" + downloadedMd5);
+ }
+ } catch (IOException e) {
+ tmpFile.delete();
+ throw new RuntimeException("Failed to verify downloaded file
MD5", e);
+ }
+
+ // 6. Atomically promote tmp file to final path
+ if (!tmpFile.renameTo(finalFile)) {
+ tmpFile.delete();
+ throw new RuntimeException("Failed to rename tmp file to: " +
finalFilePath);
+ }
+
+ LOG.info("Small file ready at: {}", finalFilePath);
+ MEM_CACHE.put(cacheKey, finalFilePath);
+ return finalFilePath;
+ }
+ }
+
+ /** Clears the in-memory cache. Exposed for testing. */
+ static void clearCache() {
+ MEM_CACHE.clear();
+ DOWNLOAD_LOCKS.clear();
+ }
+}
diff --git a/fs_brokers/cdc_client/src/main/resources/application.properties
b/fs_brokers/cdc_client/src/main/resources/application.properties
index 9a7ee6c3f76..a7ea3996a6f 100644
--- a/fs_brokers/cdc_client/src/main/resources/application.properties
+++ b/fs_brokers/cdc_client/src/main/resources/application.properties
@@ -17,4 +17,6 @@
################################################################################
spring.web.resources.add-mappings=false
server.port=9096
-backend.http.port=8040
\ No newline at end of file
+backend.http.port=8040
+# token passed to FE HTTP APIs (e.g. /api/get_small_file); see doris-meta/image/VERSION
+cluster.token=cluster-token
\ No newline at end of file
diff --git
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SmallFileMgrTest.java
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SmallFileMgrTest.java
new file mode 100644
index 00000000000..ae99d4db986
--- /dev/null
+++
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SmallFileMgrTest.java
@@ -0,0 +1,378 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import com.sun.net.httpserver.HttpServer;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class SmallFileMgrTest {
+
+ @TempDir
+ Path tempDir;
+
+ @BeforeEach
+ void setUp() {
+ SmallFileMgr.clearCache();
+ }
+
+ @AfterEach
+ void tearDown() {
+ SmallFileMgr.clearCache();
+ }
+
+ //
-------------------------------------------------------------------------
+ // Input validation
+ //
-------------------------------------------------------------------------
+
+ @Test
+ void testInvalidPrefix() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> SmallFileMgr.getFilePath(
+ "host:8030", "NOFILE:123:abc", "token",
tempDir.toString()));
+ }
+
+ @Test
+ void testInvalidFormatMissingMd5() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> SmallFileMgr.getFilePath(
+ "host:8030", "FILE:123", "token", tempDir.toString()));
+ }
+
+ @Test
+ void testInvalidFormatTooManyParts() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> SmallFileMgr.getFilePath(
+ "host:8030", "FILE:123:abc:extra", "token",
tempDir.toString()));
+ }
+
+ //
-------------------------------------------------------------------------
+ // Disk cache hit
+ //
-------------------------------------------------------------------------
+
+ @Test
+ void testDiskCacheHit() throws Exception {
+ byte[] content = "-----BEGIN CERTIFICATE-----\ntest\n-----END
CERTIFICATE-----".getBytes();
+ String md5 = DigestUtils.md5Hex(content);
+ String fileId = "10001";
+
+ // Pre-populate disk cache
+ File cachedFile = tempDir.resolve(fileId + "." + md5).toFile();
+ Files.write(cachedFile.toPath(), content);
+
+ String result = SmallFileMgr.getFilePath(
+ "host:8030", "FILE:" + fileId + ":" + md5, "token",
tempDir.toString());
+
+ assertEquals(cachedFile.getAbsolutePath(), result);
+ }
+
+ @Test
+ void testDiskCacheMd5MismatchTriggersRedownload() throws Exception {
+ byte[] correctContent = "correct cert content".getBytes();
+ String correctMd5 = DigestUtils.md5Hex(correctContent);
+ String fileId = "10002";
+
+ // Write a file with the correct name but corrupted content
+ File corruptFile = tempDir.resolve(fileId + "." + correctMd5).toFile();
+ Files.write(corruptFile.toPath(), "corrupted content".getBytes());
+
+ // Start a mock FE HTTP server that serves the correct content
+ HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
+ int port = server.getAddress().getPort();
+ server.createContext(
+ "/api/get_small_file",
+ exchange -> {
+ exchange.sendResponseHeaders(200, correctContent.length);
+ exchange.getResponseBody().write(correctContent);
+ exchange.getResponseBody().close();
+ });
+ server.start();
+
+ try {
+ String result = SmallFileMgr.getFilePath(
+ "127.0.0.1:" + port,
+ "FILE:" + fileId + ":" + correctMd5,
+ "test-token",
+ tempDir.toString());
+
+ assertEquals(corruptFile.getAbsolutePath(), result);
+ // Verify the file now contains the correct content
+ byte[] written = Files.readAllBytes(corruptFile.toPath());
+ assertEquals(correctMd5, DigestUtils.md5Hex(written));
+ } finally {
+ server.stop(0);
+ }
+ }
+
+ //
-------------------------------------------------------------------------
+ // In-memory cache hit
+ //
-------------------------------------------------------------------------
+
+ @Test
+ void testMemoryCacheHitSkipsDiskIo() throws Exception {
+ byte[] content = "ssl cert data".getBytes();
+ String md5 = DigestUtils.md5Hex(content);
+ String fileId = "10003";
+
+ // Populate memory cache via a disk-cache hit on the first call
+ File cachedFile = tempDir.resolve(fileId + "." + md5).toFile();
+ Files.write(cachedFile.toPath(), content);
+ SmallFileMgr.getFilePath(
+ "host:8030", "FILE:" + fileId + ":" + md5, "token",
tempDir.toString());
+
+ // Remove disk file so any disk/HTTP access would fail
+ assertTrue(cachedFile.delete());
+
+ // Second call must still succeed via memory cache
+ String result = SmallFileMgr.getFilePath(
+ "host:8030", "FILE:" + fileId + ":" + md5, "token",
tempDir.toString());
+
+ assertEquals(cachedFile.getAbsolutePath(), result);
+ }
+
+ //
-------------------------------------------------------------------------
+ // HTTP download from FE
+ //
-------------------------------------------------------------------------
+
+ @Test
+ void testDownloadSuccess() throws Exception {
+ byte[] content = "-----BEGIN CERTIFICATE-----\ndata\n-----END
CERTIFICATE-----".getBytes();
+ String md5 = DigestUtils.md5Hex(content);
+ String fileId = "20001";
+
+ HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
+ int port = server.getAddress().getPort();
+ server.createContext(
+ "/api/get_small_file",
+ exchange -> {
+ // Verify query params are forwarded correctly
+ String query = exchange.getRequestURI().getQuery();
+ assertTrue(query.contains("file_id=" + fileId));
+ assertTrue(query.contains("token=test-token"));
+
+ exchange.sendResponseHeaders(200, content.length);
+ exchange.getResponseBody().write(content);
+ exchange.getResponseBody().close();
+ });
+ server.start();
+
+ try {
+ String result = SmallFileMgr.getFilePath(
+ "127.0.0.1:" + port,
+ "FILE:" + fileId + ":" + md5,
+ "test-token",
+ tempDir.toString());
+
+ File expectedFile = tempDir.resolve(fileId + "." + md5).toFile();
+ assertEquals(expectedFile.getAbsolutePath(), result);
+ assertTrue(expectedFile.exists());
+
+ // Verify file content
+ byte[] written = Files.readAllBytes(expectedFile.toPath());
+ assertEquals(md5, DigestUtils.md5Hex(written));
+ } finally {
+ server.stop(0);
+ }
+ }
+
+ @Test
+ void testDownloadSecondCallUsesMemoryCache() throws Exception {
+ byte[] content = "cert content".getBytes();
+ String md5 = DigestUtils.md5Hex(content);
+ String fileId = "20002";
+
+ int[] callCount = {0};
+ HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
+ int port = server.getAddress().getPort();
+ server.createContext(
+ "/api/get_small_file",
+ exchange -> {
+ callCount[0]++;
+ exchange.sendResponseHeaders(200, content.length);
+ exchange.getResponseBody().write(content);
+ exchange.getResponseBody().close();
+ });
+ server.start();
+
+ try {
+ String address = "127.0.0.1:" + port;
+ String filePath = "FILE:" + fileId + ":" + md5;
+
+ SmallFileMgr.getFilePath(address, filePath, "token",
tempDir.toString());
+ SmallFileMgr.getFilePath(address, filePath, "token",
tempDir.toString());
+
+ assertEquals(1, callCount[0], "FE should only be contacted once");
+ } finally {
+ server.stop(0);
+ }
+ }
+
+ @Test
+ void testDownloadMd5Mismatch() throws Exception {
+ byte[] expectedContent = "expected content".getBytes();
+ String expectedMd5 = DigestUtils.md5Hex(expectedContent);
+ String fileId = "20003";
+
+ byte[] wrongContent = "totally different content".getBytes();
+
+ HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
+ int port = server.getAddress().getPort();
+ server.createContext(
+ "/api/get_small_file",
+ exchange -> {
+ exchange.sendResponseHeaders(200, wrongContent.length);
+ exchange.getResponseBody().write(wrongContent);
+ exchange.getResponseBody().close();
+ });
+ server.start();
+
+ try {
+ RuntimeException ex = assertThrows(
+ RuntimeException.class,
+ () -> SmallFileMgr.getFilePath(
+ "127.0.0.1:" + port,
+ "FILE:" + fileId + ":" + expectedMd5,
+ "test-token",
+ tempDir.toString()));
+ assertTrue(ex.getMessage().contains("MD5 mismatch"));
+
+ // Tmp file must be cleaned up after failure
+ File tmpFile = tempDir.resolve(fileId + ".tmp").toFile();
+ assertTrue(!tmpFile.exists(), "tmp file should be deleted after
MD5 mismatch");
+ } finally {
+ server.stop(0);
+ }
+ }
+
+ @Test
+ void testDownloadHttpError() throws Exception {
+ String fileId = "20004";
+ String md5 = "a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4";
+
+ HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
+ int port = server.getAddress().getPort();
+ server.createContext(
+ "/api/get_small_file",
+ exchange -> {
+ exchange.sendResponseHeaders(500, -1);
+ exchange.getResponseBody().close();
+ });
+ server.start();
+
+ try {
+ RuntimeException ex = assertThrows(
+ RuntimeException.class,
+ () -> SmallFileMgr.getFilePath(
+ "127.0.0.1:" + port,
+ "FILE:" + fileId + ":" + md5,
+ "test-token",
+ tempDir.toString()));
+ assertTrue(ex.getMessage().contains("status=500"));
+ } finally {
+ server.stop(0);
+ }
+ }
+
+ //
-------------------------------------------------------------------------
+ // Concurrency
+ //
-------------------------------------------------------------------------
+
+ /**
+ * Verifies that N threads concurrently requesting the same file result in
exactly one FE
+ * download and all threads receive the correct file path.
+ */
+ @Test
+ void testConcurrentDownloadSameFileSingleFetch() throws Exception {
+ byte[] content = "concurrent ssl cert content".getBytes();
+ String md5 = DigestUtils.md5Hex(content);
+ String fileId = "30001";
+ int threadCount = 8;
+
+ AtomicInteger callCount = new AtomicInteger(0);
+ // Simulate a slow FE download so threads have a chance to race
+ HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
+ int port = server.getAddress().getPort();
+ server.createContext(
+ "/api/get_small_file",
+ exchange -> {
+ callCount.incrementAndGet();
+ try {
+ Thread.sleep(20);
+ } catch (InterruptedException ignored) {
+ }
+ exchange.sendResponseHeaders(200, content.length);
+ exchange.getResponseBody().write(content);
+ exchange.getResponseBody().close();
+ });
+ server.start();
+
+ ExecutorService pool = Executors.newFixedThreadPool(threadCount);
+ CountDownLatch startGate = new CountDownLatch(1);
+ String address = "127.0.0.1:" + port;
+ String filePath = "FILE:" + fileId + ":" + md5;
+ String expectedPath = tempDir.resolve(fileId + "." +
md5).toFile().getAbsolutePath();
+
+ try {
+ List<Future<String>> futures = new ArrayList<>();
+ for (int i = 0; i < threadCount; i++) {
+ futures.add(pool.submit(() -> {
+ startGate.await();
+ return SmallFileMgr.getFilePath(
+ address, filePath, "token", tempDir.toString());
+ }));
+ }
+
+ startGate.countDown(); // release all threads simultaneously
+
+ for (Future<String> future : futures) {
+ assertEquals(expectedPath, future.get());
+ }
+
+ assertEquals(1, callCount.get(), "FE should be contacted exactly
once");
+
+ // Verify the final file is intact
+ byte[] written = Files.readAllBytes(tempDir.resolve(fileId + "." +
md5));
+ assertEquals(md5, DigestUtils.md5Hex(written));
+ } finally {
+ pool.shutdown();
+ server.stop(0);
+ }
+ }
+}
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_ssl.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_ssl.out
new file mode 100644
index 00000000000..93141f92358
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_ssl.out
@@ -0,0 +1,9 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_snapshot_table1 --
+A1 1
+B1 2
+
+-- !select_binlog_table1 --
+B1 10
+Doris 18
+
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
index d16bc57e73e..4909e3d0469 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
@@ -180,7 +180,7 @@ suite("test_streaming_mysql_job_priv",
"p0,external,mysql,external_docker,extern
def jobStatus = sql """ select status, ErrorMsg from
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
log.info("jobStatus: " + jobStatus)
// check job status
- jobStatus.size() == 1 && 'PAUSED' ==
jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch
meta")
+ jobStatus.size() == 1 && 'PAUSED' ==
jobStatus.get(0).get(0)
}
)
} catch (Exception ex){
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_ssl.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_ssl.groovy
new file mode 100644
index 00000000000..20a854cae64
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_ssl.groovy
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_postgres_job_ssl",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_postgres_job_name_ssl"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "user_info_pg_normal1_ssl"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // create test
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ // sql """CREATE SCHEMA IF NOT EXISTS ${pgSchema}"""
+ def sslInfo = sql """SHOW ssl"""
+ log.info("sslInfo: " + sslInfo)
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
+ "name" varchar(200),
+ "age" int2,
+ PRIMARY KEY ("name")
+ )"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age)
VALUES ('A1', 1);"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age)
VALUES ('B1', 2);"""
+ }
+
+ try {
+ sql """DROP FILE "ca.pem" FROM ${currentDb} PROPERTIES ("catalog"
= "streaming_job")"""
+ } catch (Exception ignored) {
+ // ignore
+ }
+
+ sql """CREATE FILE "ca.pem"
+ IN ${currentDb}
+ PROPERTIES
+ (
+ "url" =
"https://qa-build.oss-cn-beijing.aliyuncs.com/jianxu/root.crt",
+ "catalog" = "streaming_job"
+ )
+ """
+
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${table1}",
+ "offset" = "initial",
+ "ssl_mode" = "verify-ca",
+ "ssl_rootcert" = "FILE:ca.pem"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+ def showAllTables = sql """ show tables from ${currentDb}"""
+ log.info("showAllTables: " + showAllTables)
+ // check table created
+ def showTables = sql """ show tables from ${currentDb} like
'${table1}'; """
+ assert showTables.size() == 1
+
+ // check job running
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(1, SECONDS).until(
+ {
+ def jobSuccendCount = sql """ select SucceedTaskCount
from jobs("type"="insert") where Name = '${jobName}' and
ExecuteType='STREAMING' """
+ log.info("jobSuccendCount: " + jobSuccendCount)
+                            // check the job exists and its succeeded task count is at least 2
+ jobSuccendCount.size() == 1 && '2' <=
jobSuccendCount.get(0).get(0)
+ }
+ )
+ } catch (Exception ex){
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex;
+ }
+
+ // check snapshot data
+ qt_select_snapshot_table1 """ SELECT * FROM ${table1} order by name
asc """
+
+ // mock incremental into
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name,age) VALUES
('Doris',18);"""
+ def xminResult = sql """SELECT xmin, xmax , * FROM
${pgSchema}.${table1} WHERE name = 'Doris'; """
+ log.info("xminResult: " + xminResult)
+ sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET age = 10 WHERE
name = 'B1';"""
+ sql """DELETE FROM ${pgDB}.${pgSchema}.${table1} WHERE name =
'A1';"""
+ }
+
+ sleep(60000); // wait for cdc incremental data
+
+ // check incremental data
+ qt_select_binlog_table1 """ SELECT * FROM ${table1} order by name asc
"""
+
+ def jobInfo = sql """
+ select loadStatistic, status from jobs("type"="insert") where
Name='${jobName}'
+ """
+ log.info("jobInfo: " + jobInfo)
+ def loadStat = parseJson(jobInfo.get(0).get(0))
+ assert loadStat.scannedRows == 5
+ assert loadStat.loadBytes == 246
+ assert jobInfo.get(0).get(1) == "RUNNING"
+
+ sql """
+ DROP JOB IF EXISTS where jobname = '${jobName}'
+ """
+
+ try {
+ sql """DROP FILE "ca.pem" FROM ${currentDb} PROPERTIES ("catalog"
= "streaming_job")"""
+ } catch (Exception ignored) {
+ // ignore
+ }
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]