This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0cd02a57e68 [Improve](streaming job) support custom table name mapping
for CDC streaming job (#61317)
0cd02a57e68 is described below
commit 0cd02a57e68a3a5965acab400e8edbe18a5c6aed
Author: wudi <[email protected]>
AuthorDate: Mon Mar 23 10:28:57 2026 +0800
[Improve](streaming job) support custom table name mapping for CDC
streaming job (#61317)
### What problem does this PR solve?
#### Summary
Add support for mapping upstream (PostgreSQL) table names to custom
downstream (Doris) table names
in CDC streaming jobs. Without this feature, the Doris target table must
have the same name as the
upstream source table.
#### New configuration
Key format: `"table.<srcTable>.target_table" = "<dstTable>"` in the
`FROM` clause properties.
```sql
CREATE JOB my_job
ON STREAMING
FROM POSTGRES (
...
"include_tables" = "pg_orders",
"table.pg_orders.target_table" = "doris_orders"
)
TO DATABASE mydb (...)
```
When not configured, behavior is unchanged (target table name = source
table name).
#### Key design decisions
- generateCreateTableCmds returns LinkedHashMap<srcName,
CreateTableCommand> so callers can
distinguish source names (for CDC monitoring) from target names (for DDL).
This fixes a bug where the CDC split assigner would otherwise look up the
Doris target table name in PostgreSQL, where no such table exists.
- Multi-table merge is supported: two source tables can map to the same
Doris table
#### Test plan
- test_streaming_postgres_job_table_mapping (case 1): basic mapping
(INSERT/UPDATE/DELETE land in the mapped table; the Doris table is
created with the target name, not the source name)
- test_streaming_postgres_job_table_mapping (case 2): multi-table merge
(two PG tables → one Doris table, covering both snapshot and
incremental data)
---
.../apache/doris/job/cdc/DataSourceConfigKeys.java | 1 +
.../streaming/DataSourceConfigValidator.java | 5 +
.../insert/streaming/StreamingInsertJob.java | 15 +-
.../apache/doris/job/util/StreamingJobUtils.java | 25 ++-
.../cdcclient/service/PipelineCoordinator.java | 8 +-
.../deserialize/DebeziumJsonDeserializer.java | 11 ++
.../PostgresDebeziumJsonDeserializer.java | 6 +-
.../apache/doris/cdcclient/utils/ConfigUtil.java | 25 +++
.../test_streaming_postgres_job_table_mapping.out | 19 ++
...est_streaming_postgres_job_table_mapping.groovy | 194 +++++++++++++++++++++
10 files changed, 298 insertions(+), 11 deletions(-)
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
index 3858d9ebaff..b2bda583beb 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
@@ -41,6 +41,7 @@ public class DataSourceConfigKeys {
// per-table config: key format is "table.<tableName>.<suffix>"
public static final String TABLE = "table";
public static final String TABLE_EXCLUDE_COLUMNS_SUFFIX =
"exclude_columns";
+ public static final String TABLE_TARGET_TABLE_SUFFIX = "target_table";
// target properties
public static final String TABLE_PROPS_PREFIX = "table.create.properties.";
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
index 3bea2c21242..63efaf296cb 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
@@ -45,6 +45,7 @@ public class DataSourceConfigValidator {
// Known suffixes for per-table config keys (format:
"table.<tableName>.<suffix>")
private static final Set<String> ALLOW_TABLE_LEVEL_SUFFIXES =
Sets.newHashSet(
+ DataSourceConfigKeys.TABLE_TARGET_TABLE_SUFFIX,
DataSourceConfigKeys.TABLE_EXCLUDE_COLUMNS_SUFFIX
);
@@ -67,6 +68,10 @@ public class DataSourceConfigValidator {
if (!ALLOW_TABLE_LEVEL_SUFFIXES.contains(suffix)) {
throw new IllegalArgumentException("Unknown per-table
config key: '" + key + "'");
}
+ if (value == null || value.trim().isEmpty()) {
+ throw new IllegalArgumentException(
+ "Value for per-table config key '" + key + "' must
not be empty");
+ }
continue;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index ea5298c026a..be5c70d864a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -92,6 +92,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -255,15 +256,21 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
private List<String> createTableIfNotExists() throws Exception {
List<String> syncTbls = new ArrayList<>();
- List<CreateTableCommand> createTblCmds =
StreamingJobUtils.generateCreateTableCmds(targetDb,
- dataSourceType, sourceProperties, targetProperties);
+ // Key: source table name (PG/MySQL); Value: CreateTableCommand for
the Doris target table.
+ // The two names differ when "table.<src>.target_table" is configured.
+ LinkedHashMap<String, CreateTableCommand> createTblCmds =
+ StreamingJobUtils.generateCreateTableCmds(targetDb,
+ dataSourceType, sourceProperties, targetProperties);
Database db =
Env.getCurrentEnv().getInternalCatalog().getDbNullable(targetDb);
Preconditions.checkNotNull(db, "target database %s does not exist",
targetDb);
- for (CreateTableCommand createTblCmd : createTblCmds) {
+ for (Map.Entry<String, CreateTableCommand> entry :
createTblCmds.entrySet()) {
+ String srcTable = entry.getKey();
+ CreateTableCommand createTblCmd = entry.getValue();
if
(!db.isTableExist(createTblCmd.getCreateTableInfo().getTableName())) {
createTblCmd.run(ConnectContext.get(), null);
}
- syncTbls.add(createTblCmd.getCreateTableInfo().getTableName());
+ // Use the source (upstream) table name so CDC monitors the
correct PG/MySQL table
+ syncTbls.add(srcTable);
}
return syncTbls;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
index d2222dad383..9eec0061219 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
@@ -277,10 +277,20 @@ public class StreamingJobUtils {
return newProps;
}
- public static List<CreateTableCommand> generateCreateTableCmds(String
targetDb, DataSourceType sourceType,
+ /**
+ * Generate CREATE TABLE commands for the Doris target tables.
+ *
+ * <p>Returns a {@link LinkedHashMap} whose key is the <b>source</b>
(upstream) table name and
+ * whose value is the corresponding {@link CreateTableCommand} that
creates the Doris target
+ * table (which may have a different name when {@code
table.<src>.target_table} is configured).
+ * Callers must use the map key as the PG/MySQL source table identifier
for CDC monitoring and
+ * the {@link CreateTableCommand} value for the actual DDL execution.
+ */
+ public static LinkedHashMap<String, CreateTableCommand>
generateCreateTableCmds(String targetDb,
+ DataSourceType sourceType,
Map<String, String> properties, Map<String, String>
targetProperties)
throws JobException {
- List<CreateTableCommand> createtblCmds = new ArrayList<>();
+ LinkedHashMap<String, CreateTableCommand> createtblCmds = new
LinkedHashMap<>();
String includeTables =
properties.get(DataSourceConfigKeys.INCLUDE_TABLES);
String excludeTables =
properties.get(DataSourceConfigKeys.EXCLUDE_TABLES);
List<String> includeTablesList = new ArrayList<>();
@@ -322,6 +332,12 @@ public class StreamingJobUtils {
noPrimaryKeyTables.add(table);
}
+ // Resolve target (Doris) table name; defaults to source table
name if not configured
+ String targetTableName = properties.getOrDefault(
+ DataSourceConfigKeys.TABLE + "." + table + "."
+ + DataSourceConfigKeys.TABLE_TARGET_TABLE_SUFFIX,
+ table).trim();
+
// Validate and apply exclude_columns for this table
Set<String> excludeColumns = parseExcludeColumns(properties,
table);
if (!excludeColumns.isEmpty()) {
@@ -352,7 +368,7 @@ public class StreamingJobUtils {
false, // isTemp
InternalCatalog.INTERNAL_CATALOG_NAME, // ctlName
targetDb, // dbName
- table, // tableName
+ targetTableName, // tableName
columnDefinitions, // columns
ImmutableList.of(), // indexes
"olap", // engineName
@@ -367,7 +383,8 @@ public class StreamingJobUtils {
ImmutableList.of() // clusterKeyColumnNames
);
CreateTableCommand createtblCmd = new
CreateTableCommand(Optional.empty(), createtblInfo);
- createtblCmds.add(createtblCmd);
+ // Key: source (PG/MySQL) table name; Value: command that creates
the Doris target table
+ createtblCmds.put(table, createtblCmd);
}
if (createtblCmds.isEmpty()) {
throw new JobException("Can not found match table in database " +
database);
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 97aa4b7f5f2..414a1d23797 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -24,6 +24,7 @@ import org.apache.doris.cdcclient.sink.DorisBatchStreamLoad;
import org.apache.doris.cdcclient.source.deserialize.DeserializeResult;
import org.apache.doris.cdcclient.source.reader.SourceReader;
import org.apache.doris.cdcclient.source.reader.SplitReadResult;
+import org.apache.doris.cdcclient.utils.ConfigUtil;
import org.apache.doris.cdcclient.utils.SchemaChangeManager;
import org.apache.doris.job.cdc.request.FetchRecordRequest;
import org.apache.doris.job.cdc.request.WriteRecordRequest;
@@ -249,6 +250,10 @@ public class PipelineCoordinator {
Map<String, String> deserializeContext = new
HashMap<>(writeRecordRequest.getConfig());
deserializeContext.put(Constants.DORIS_TARGET_DB, targetDb);
+ // Pre-parse source->target table name mappings once for this request
+ Map<String, String> targetTableMappings =
+
ConfigUtil.parseAllTargetTableMappings(writeRecordRequest.getConfig());
+
SourceReader sourceReader =
Env.getCurrentEnv().getReader(writeRecordRequest);
DorisBatchStreamLoad batchStreamLoad = null;
long scannedRows = 0L;
@@ -338,9 +343,10 @@ public class PipelineCoordinator {
}
if (!CollectionUtils.isEmpty(result.getRecords())) {
String table = extractTable(element);
+ String dorisTable =
targetTableMappings.getOrDefault(table, table);
for (String record : result.getRecords()) {
scannedRows++;
- batchStreamLoad.writeRecord(targetDb, table,
record.getBytes());
+ batchStreamLoad.writeRecord(targetDb, dorisTable,
record.getBytes());
}
// Mark last message as data (not heartbeat)
lastMessageIsHeartbeat = false;
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
index 25b2b544893..7876597660e 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
@@ -82,6 +82,8 @@ public class DebeziumJsonDeserializer
private static ObjectMapper objectMapper = new ObjectMapper();
@Setter private ZoneId serverTimeZone = ZoneId.systemDefault();
@Getter @Setter protected Map<TableId, TableChanges.TableChange>
tableSchemas;
+ // Parsed source->target table name mappings, populated once in init()
from config
+ protected Map<String, String> targetTableMappingsCache = new HashMap<>();
// Parsed exclude-column sets per table, populated once in init() from
config
protected Map<String, Set<String>> excludeColumnsCache = new HashMap<>();
@@ -91,9 +93,18 @@ public class DebeziumJsonDeserializer
public void init(Map<String, String> props) {
this.serverTimeZone =
ConfigUtil.getServerTimeZoneFromJdbcUrl(props.get(DataSourceConfigKeys.JDBC_URL));
+ targetTableMappingsCache =
ConfigUtil.parseAllTargetTableMappings(props);
excludeColumnsCache = ConfigUtil.parseAllExcludeColumns(props);
}
+ /**
+ * Resolve the Doris target table name for a given upstream (PG) source
table name. Returns the
+ * mapped name if configured, otherwise returns the source name unchanged.
+ */
+ protected String resolveTargetTable(String srcTable) {
+ return targetTableMappingsCache.getOrDefault(srcTable, srcTable);
+ }
+
@Override
public DeserializeResult deserialize(Map<String, String> context,
SourceRecord record)
throws IOException {
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
index aa1a6e9c7bd..85fdb1ddea7 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
@@ -218,7 +218,9 @@ public class PostgresDebeziumJsonDeserializer extends
DebeziumJsonDeserializer {
colName);
continue;
}
- ddls.add(SchemaChangeHelper.buildDropColumnSql(db,
tableId.table(), colName));
+ ddls.add(
+ SchemaChangeHelper.buildDropColumnSql(
+ db, resolveTargetTable(tableId.table()), colName));
}
for (Column col : pgAdded) {
@@ -243,7 +245,7 @@ public class PostgresDebeziumJsonDeserializer extends
DebeziumJsonDeserializer {
ddls.add(
SchemaChangeHelper.buildAddColumnSql(
db,
- tableId.table(),
+ resolveTargetTable(tableId.table()),
col.name(),
colType + nullable,
defaultObj != null ? String.valueOf(defaultObj) :
null,
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
index 56d9aeac53e..46d581f58e5 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
@@ -169,6 +169,31 @@ public class ConfigUtil {
return result;
}
+ /**
+ * Parse all target-table name mappings from config.
+ *
+ * <p>Scans all keys matching {@code "table.<srcTableName>.target_table"}
and returns a map from
+ * source table name to target (Doris) table name. Tables without a
mapping are NOT included;
+ * callers should use {@code getOrDefault(srcTable, srcTable)}.
+ */
+ public static Map<String, String> parseAllTargetTableMappings(Map<String,
String> config) {
+ String prefix = DataSourceConfigKeys.TABLE + ".";
+ String suffix = "." + DataSourceConfigKeys.TABLE_TARGET_TABLE_SUFFIX;
+ Map<String, String> result = new HashMap<>();
+ for (Map.Entry<String, String> entry : config.entrySet()) {
+ String key = entry.getKey();
+ if (key.startsWith(prefix) && key.endsWith(suffix)) {
+ String srcTable = key.substring(prefix.length(), key.length()
- suffix.length());
+ String rawValue = entry.getValue();
+ String dstTable = rawValue != null ? rawValue.trim() : "";
+ if (!srcTable.isEmpty() && !dstTable.isEmpty()) {
+ result.put(srcTable, dstTable);
+ }
+ }
+ }
+ return result;
+ }
+
public static Map<String, String> toStringMap(String json) {
if (!isJson(json)) {
return null;
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_table_mapping.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_table_mapping.out
new file mode 100644
index 00000000000..8d922a718f1
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_table_mapping.out
@@ -0,0 +1,19 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_snapshot --
+1 Alice
+2 Bob
+
+-- !select_incremental --
+2 Bob_v2
+3 Carol
+
+-- !select_merge_snapshot --
+100 Src1_A
+200 Src2_A
+
+-- !select_merge_incremental --
+100 Src1_A
+101 Src1_B
+200 Src2_A
+201 Src2_B
+
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_table_mapping.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_table_mapping.groovy
new file mode 100644
index 00000000000..b31805e682a
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_table_mapping.groovy
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_postgres_job_table_mapping",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_pg_table_mapping"
+ def jobNameMerge = "test_streaming_pg_table_mapping_merge"
+ def currentDb = (sql "select database()")[0][0]
+ def pgSrcTable = "pg_src_table" // upstream PG table name
+ def dorisDstTable = "doris_dst_table" // downstream Doris table name
(mapped)
+ def pgSrcTable2 = "pg_src_table2" // second upstream table
(multi-table merge)
+ def dorisMergeTable = "doris_merge_table" // both PG tables merge into
this Doris table
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+
+ // Cleanup
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """DROP JOB IF EXISTS where jobname = '${jobNameMerge}'"""
+ sql """drop table if exists ${currentDb}.${dorisDstTable} force"""
+ sql """drop table if exists ${currentDb}.${dorisMergeTable} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // ── Case 1: basic table name mapping
─────────────────────────────────
+ // PG table: pg_src_table → Doris table: doris_dst_table
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgSrcTable}"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgSrcTable} (
+ "id" int,
+ "name" varchar(200),
+ PRIMARY KEY ("id")
+ )"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable} VALUES (1,
'Alice')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable} VALUES (2,
'Bob')"""
+ }
+
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${pgSrcTable}",
+ "offset" = "initial",
+ "table.${pgSrcTable}.target_table" = "${dorisDstTable}"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )"""
+
+ // Verify the Doris table was created with the mapped name, not the
source name
+ def tables = (sql """show tables from ${currentDb}""").collect { it[0]
}
+ assert tables.contains(dorisDstTable) : "Doris target table
'${dorisDstTable}' should exist"
+ assert !tables.contains(pgSrcTable) : "Source table name
'${pgSrcTable}' must NOT exist in Doris"
+
+ // Wait for snapshot
+ try {
+ Awaitility.await().atMost(300, SECONDS).pollInterval(1,
SECONDS).until({
+ def cnt = sql """select SucceedTaskCount from
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING'"""
+ cnt.size() == 1 && cnt.get(0).get(0).toLong() >= 2
+ })
+ } catch (Exception ex) {
+ log.info("show job: " + (sql """select * from
jobs("type"="insert") where Name='${jobName}'"""))
+ log.info("show task: " + (sql """select * from
tasks("type"="insert") where JobName='${jobName}'"""))
+ throw ex
+ }
+
+ qt_select_snapshot """ SELECT * FROM ${dorisDstTable} ORDER BY id ASC
"""
+
+ // Incremental: INSERT / UPDATE / DELETE must all land in
doris_dst_table
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable} VALUES (3,
'Carol')"""
+ sql """UPDATE ${pgDB}.${pgSchema}.${pgSrcTable} SET name =
'Bob_v2' WHERE id = 2"""
+ sql """DELETE FROM ${pgDB}.${pgSchema}.${pgSrcTable} WHERE id =
1"""
+ }
+ try {
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def ids = (sql """ SELECT id FROM ${dorisDstTable} ORDER BY id
ASC """).collect { it[0].toInteger() }
+ ids.contains(3) && !ids.contains(1)
+ })
+ } catch (Exception ex) {
+ log.info("show job: " + (sql """select * from
jobs("type"="insert") where Name='${jobName}'"""))
+ log.info("show task: " + (sql """select * from
tasks("type"="insert") where JobName='${jobName}'"""))
+ throw ex
+ }
+
+ qt_select_incremental """ SELECT * FROM ${dorisDstTable} ORDER BY id
ASC """
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ // ── Case 2: multi-table merge (two PG tables → one Doris table)
──────
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgSrcTable}"""
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgSrcTable2}"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgSrcTable} (
+ "id" int,
+ "name" varchar(200),
+ PRIMARY KEY ("id")
+ )"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgSrcTable2} (
+ "id" int,
+ "name" varchar(200),
+ PRIMARY KEY ("id")
+ )"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable} VALUES (100,
'Src1_A')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable2} VALUES (200,
'Src2_A')"""
+ }
+
+ sql """CREATE JOB ${jobNameMerge}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${pgSrcTable},${pgSrcTable2}",
+ "offset" = "initial",
+ "table.${pgSrcTable}.target_table" = "${dorisMergeTable}",
+ "table.${pgSrcTable2}.target_table" = "${dorisMergeTable}"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )"""
+
+ // Wait for snapshot rows from both source tables
+ try {
+ Awaitility.await().atMost(300, SECONDS).pollInterval(2,
SECONDS).until({
+ def ids = (sql """ SELECT id FROM ${dorisMergeTable}
""").collect { it[0].toInteger() }
+ ids.contains(100) && ids.contains(200)
+ })
+ } catch (Exception ex) {
+ log.info("show job: " + (sql """select * from
jobs("type"="insert") where Name='${jobNameMerge}'"""))
+ log.info("show task: " + (sql """select * from
tasks("type"="insert") where JobName='${jobNameMerge}'"""))
+ throw ex
+ }
+
+ qt_select_merge_snapshot """ SELECT * FROM ${dorisMergeTable} ORDER BY
id ASC """
+
+ // Incremental from both source tables
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable} VALUES (101,
'Src1_B')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable2} VALUES (201,
'Src2_B')"""
+ }
+ try {
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def ids = (sql """ SELECT id FROM ${dorisMergeTable}
""").collect { it[0].toInteger() }
+ ids.contains(101) && ids.contains(201)
+ })
+ } catch (Exception ex) {
+ log.info("show job: " + (sql """select * from
jobs("type"="insert") where Name='${jobNameMerge}'"""))
+ log.info("show task: " + (sql """select * from
tasks("type"="insert") where JobName='${jobNameMerge}'"""))
+ throw ex
+ }
+
+ qt_select_merge_incremental """ SELECT * FROM ${dorisMergeTable} ORDER
BY id ASC """
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobNameMerge}'"""
+ def mergeJobCnt = sql """select count(1) from jobs("type"="insert")
where Name = '${jobNameMerge}'"""
+ assert mergeJobCnt.get(0).get(0) == 0
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]