This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 670deb9ba76 [Fix](routine load) Fix routine load partial update (#59209)
670deb9ba76 is described below
commit 670deb9ba760922e69befdcf52760bafce9a2eab
Author: bobhan1 <[email protected]>
AuthorDate: Wed Dec 24 02:47:43 2025 +0800
[Fix](routine load) Fix routine load partial update (#59209)
### What problem does this PR solve?
1. Fix `"partial_columns"="true"` not taking effect in routine load
2. Add the `"partial_update_new_key_behavior"="APPEND"/"ERROR"` property to routine load
doc: https://github.com/apache/doris-website/pull/3211
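
For illustration, a job using both properties might be created along these lines (the table, topic, and broker names below are placeholders; the syntax mirrors the regression tests added in this commit):

    CREATE ROUTINE LOAD example_job ON example_table
    COLUMNS TERMINATED BY ",",
    COLUMNS (k, c1)
    PROPERTIES
    (
        "partial_columns" = "true",
        "partial_update_new_key_behavior" = "ERROR"
    )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker_host:9092",
        "kafka_topic" = "example_topic",
        "property.kafka_default_offsets" = "OFFSET_BEGINNING"
    );

With "partial_update_new_key_behavior" = "ERROR", rows whose keys do not already exist in the target table cause the load task to fail; with "APPEND" (the default) such rows are appended with default values for the missing columns.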
---
.../load/routineload/KafkaRoutineLoadJob.java | 11 +-
.../doris/load/routineload/RoutineLoadJob.java | 15 ++
.../nereids/load/NereidsRoutineLoadTaskInfo.java | 30 ++-
.../plans/commands/info/CreateRoutineLoadInfo.java | 30 +++
.../test_routine_load_partial_update.out | 12 +
...outine_load_partial_update_new_key_behavior.out | 27 ++
.../test_routine_load_partial_update.groovy | 108 ++++++++
...ine_load_partial_update_new_key_behavior.groovy | 282 +++++++++++++++++++++
8 files changed, 510 insertions(+), 5 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 7fccb0ae0e1..d8844834699 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -51,6 +51,7 @@ import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.thrift.TFileCompressType;
+import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;
@@ -790,6 +791,14 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
            if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) {
                this.isPartialUpdate = BooleanUtils.toBoolean(jobProperties.get(CreateRoutineLoadInfo.PARTIAL_COLUMNS));
            }
+            if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY)) {
+                String policy = jobProperties.get(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY);
+                if ("ERROR".equalsIgnoreCase(policy)) {
+                    this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.ERROR;
+                } else {
+                    this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
+                }
+            }
        }
        LOG.info("modify the properties of kafka routine load job: {}, jobProperties: {}, datasource properties: {}",
                this.id, jobProperties, dataSourceProperties);
@@ -962,6 +971,6 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
        return new NereidsRoutineLoadTaskInfo(execMemLimit, new HashMap<>(jobProperties), maxBatchIntervalS,
                partitionNamesInfo, mergeType, deleteCondition, sequenceCol, maxFilterRatio, importColumnDescs,
                precedingFilter, whereExpr, columnSeparator, lineDelimiter, enclose, escape, sendBatchParallelism,
-                loadToSingleTablet, isPartialUpdate, memtableOnSinkNode);
+                loadToSingleTablet, isPartialUpdate, partialUpdateNewKeyPolicy, memtableOnSinkNode);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index d768da582a2..41b476ddc6f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -68,6 +68,7 @@ import org.apache.doris.resource.computegroup.ComputeGroup;
import org.apache.doris.task.LoadTaskInfo;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
import org.apache.doris.thrift.TPipelineFragmentParams;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
@@ -223,6 +224,7 @@ public abstract class RoutineLoadJob
protected long maxBatchSizeBytes = DEFAULT_MAX_BATCH_SIZE;
protected boolean isPartialUpdate = false;
+    protected TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
protected String sequenceCol;
@@ -388,6 +390,9 @@ public abstract class RoutineLoadJob
        jobProperties.put(info.PARTIAL_COLUMNS, info.isPartialUpdate() ? "true" : "false");
if (info.isPartialUpdate()) {
this.isPartialUpdate = true;
+            this.partialUpdateNewKeyPolicy = info.getPartialUpdateNewKeyPolicy();
+            jobProperties.put(info.PARTIAL_UPDATE_NEW_KEY_POLICY,
+                    this.partialUpdateNewKeyPolicy == TPartialUpdateNewRowPolicy.ERROR ? "ERROR" : "APPEND");
}
        jobProperties.put(info.MAX_FILTER_RATIO_PROPERTY, String.valueOf(maxFilterRatio));
@@ -1869,6 +1874,10 @@ public abstract class RoutineLoadJob
// job properties defined in CreateRoutineLoadStmt
        jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, String.valueOf(isPartialUpdate));
+        if (isPartialUpdate) {
+            jobProperties.put(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY,
+                    partialUpdateNewKeyPolicy == TPartialUpdateNewRowPolicy.ERROR ? "ERROR" : "APPEND");
+        }
        jobProperties.put(CreateRoutineLoadInfo.MAX_ERROR_NUMBER_PROPERTY, String.valueOf(maxErrorNum));
        jobProperties.put(CreateRoutineLoadInfo.MAX_BATCH_INTERVAL_SEC_PROPERTY, String.valueOf(maxBatchIntervalS));
        jobProperties.put(CreateRoutineLoadInfo.MAX_BATCH_ROWS_PROPERTY, String.valueOf(maxBatchRows));
@@ -1921,6 +1930,12 @@ public abstract class RoutineLoadJob
jobProperties.forEach((k, v) -> {
if (k.equals(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) {
isPartialUpdate = Boolean.parseBoolean(v);
+            } else if (k.equals(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY)) {
+                if ("ERROR".equalsIgnoreCase(v)) {
+                    partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.ERROR;
+                } else {
+                    partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
+                }
}
});
try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
index 3cf39955ab8..17a4d0a08e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
@@ -28,6 +28,8 @@ import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
+import org.apache.doris.thrift.TUniqueKeyUpdateMode;
import com.google.common.base.Strings;
@@ -65,7 +67,8 @@ public class NereidsRoutineLoadTaskInfo implements NereidsLoadTaskInfo {
protected boolean emptyFieldAsNull;
protected int sendBatchParallelism;
protected boolean loadToSingleTablet;
- protected boolean isPartialUpdate;
+    protected TUniqueKeyUpdateMode uniquekeyUpdateMode = TUniqueKeyUpdateMode.UPSERT;
+    protected TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
protected boolean memtableOnSinkNode;
protected int timeoutSec;
@@ -77,7 +80,8 @@ public class NereidsRoutineLoadTaskInfo implements NereidsLoadTaskInfo {
            String sequenceCol, double maxFilterRatio, NereidsImportColumnDescs columnDescs,
            Expression precedingFilter, Expression whereExpr, Separator columnSeparator,
            Separator lineDelimiter, byte enclose, byte escape, int sendBatchParallelism,
-            boolean loadToSingleTablet, boolean isPartialUpdate, boolean memtableOnSinkNode) {
+            boolean loadToSingleTablet, boolean isPartialUpdate, TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy,
+            boolean memtableOnSinkNode) {
this.execMemLimit = execMemLimit;
this.jobProperties = jobProperties;
this.maxBatchIntervalS = maxBatchIntervalS;
@@ -95,7 +99,10 @@ public class NereidsRoutineLoadTaskInfo implements NereidsLoadTaskInfo {
this.escape = escape;
this.sendBatchParallelism = sendBatchParallelism;
this.loadToSingleTablet = loadToSingleTablet;
- this.isPartialUpdate = isPartialUpdate;
+ if (isPartialUpdate) {
+            this.uniquekeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
+ }
+ this.partialUpdateNewKeyPolicy = partialUpdateNewKeyPolicy;
this.memtableOnSinkNode = memtableOnSinkNode;
this.timeoutSec = calTimeoutSec();
}
@@ -311,7 +318,22 @@ public class NereidsRoutineLoadTaskInfo implements NereidsLoadTaskInfo {
@Override
public boolean isFixedPartialUpdate() {
- return isPartialUpdate;
+        return uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
+ }
+
+ @Override
+ public TUniqueKeyUpdateMode getUniqueKeyUpdateMode() {
+ return uniquekeyUpdateMode;
+ }
+
+ @Override
+ public boolean isFlexiblePartialUpdate() {
+        return uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS;
+ }
+
+ @Override
+ public TPartialUpdateNewRowPolicy getPartialUpdateNewRowPolicy() {
+ return partialUpdateNewKeyPolicy;
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
index 617e49b0278..5f23ff42711 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
@@ -56,6 +56,7 @@ import org.apache.doris.nereids.trees.plans.commands.load.LoadWhereClause;
import org.apache.doris.nereids.util.PlanUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.resource.workloadgroup.WorkloadGroup;
+import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
@@ -88,6 +89,7 @@ public class CreateRoutineLoadInfo {
public static final String EXEC_MEM_LIMIT_PROPERTY = "exec_mem_limit";
public static final String PARTIAL_COLUMNS = "partial_columns";
+    public static final String PARTIAL_UPDATE_NEW_KEY_POLICY = "partial_update_new_key_behavior";
public static final String WORKLOAD_GROUP = "workload_group";
    public static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
    public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism";
@@ -122,6 +124,7 @@ public class CreateRoutineLoadInfo {
.add(SEND_BATCH_PARALLELISM)
.add(LOAD_TO_SINGLE_TABLET)
.add(PARTIAL_COLUMNS)
+ .add(PARTIAL_UPDATE_NEW_KEY_POLICY)
.add(WORKLOAD_GROUP)
.add(FileFormatProperties.PROP_FORMAT)
.add(JsonFileFormatProperties.PROP_JSON_PATHS)
@@ -166,6 +169,7 @@ public class CreateRoutineLoadInfo {
* support partial columns load(Only Unique Key Columns)
*/
private boolean isPartialUpdate = false;
+    private TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
private String comment = "";
@@ -195,6 +199,15 @@ public class CreateRoutineLoadInfo {
                .createDataSource(typeName, dataSourceProperties, this.isMultiTable);
this.mergeType = mergeType;
        this.isPartialUpdate = this.jobProperties.getOrDefault(PARTIAL_COLUMNS, "false").equalsIgnoreCase("true");
+        if (this.isPartialUpdate && this.jobProperties.containsKey(PARTIAL_UPDATE_NEW_KEY_POLICY)) {
+            String policyStr = this.jobProperties.get(PARTIAL_UPDATE_NEW_KEY_POLICY).toUpperCase();
+            if ("APPEND".equals(policyStr)) {
+                this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
+            } else if ("ERROR".equals(policyStr)) {
+                this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.ERROR;
+            }
+            // validation will be done in checkJobProperties()
+        }
if (comment != null) {
this.comment = comment;
}
@@ -276,6 +289,10 @@ public class CreateRoutineLoadInfo {
return isPartialUpdate;
}
+ public TPartialUpdateNewRowPolicy getPartialUpdateNewKeyPolicy() {
+ return partialUpdateNewKeyPolicy;
+ }
+
public String getComment() {
return comment;
}
@@ -515,6 +532,19 @@ public class CreateRoutineLoadInfo {
}
        timezone = TimeUtils.checkTimeZoneValidAndStandardize(jobProperties.getOrDefault(TIMEZONE, timezone));
+ // check partial_update_new_key_behavior
+ if (jobProperties.containsKey(PARTIAL_UPDATE_NEW_KEY_POLICY)) {
+ if (!isPartialUpdate) {
+ throw new AnalysisException(
+                        PARTIAL_UPDATE_NEW_KEY_POLICY + " can only be set when partial_columns is true");
+            }
+            String policy = jobProperties.get(PARTIAL_UPDATE_NEW_KEY_POLICY).toUpperCase();
+            if (!"APPEND".equals(policy) && !"ERROR".equals(policy)) {
+                throw new AnalysisException(
+                        PARTIAL_UPDATE_NEW_KEY_POLICY + " should be one of {'APPEND', 'ERROR'}, but found " + policy);
+ }
+ }
+
        String format = jobProperties.getOrDefault(FileFormatProperties.PROP_FORMAT, "csv");
        fileFormatProperties = FileFormatProperties.createFileFormatProperties(format);
fileFormatProperties.analyzeFileFormatProperties(jobProperties, false);
diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_partial_update.out b/regression-test/data/load_p0/routine_load/test_routine_load_partial_update.out
new file mode 100644
index 00000000000..a8eed2b2a05
--- /dev/null
+++ b/regression-test/data/load_p0/routine_load/test_routine_load_partial_update.out
@@ -0,0 +1,12 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_initial --
+1 alice 100 20
+2 bob 90 21
+3 charlie 80 22
+
+-- !select_after_partial_update --
+1 alice 150 20
+2 bob 95 21
+3 charlie 80 22
+100 \N 100 \N
+
diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.out b/regression-test/data/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.out
new file mode 100644
index 00000000000..45b434b8cad
--- /dev/null
+++ b/regression-test/data/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.out
@@ -0,0 +1,27 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_initial --
+1 1 1 1
+2 2 2 2
+3 3 3 3
+
+-- !select_after_append --
+1 10 1 1
+2 20 2 2
+3 3 3 3
+4 40 \N \N
+5 50 \N \N
+
+-- !select_after_error_mode --
+1 1 100 1
+2 2 200 2
+3 3 3 3
+4 4 40 4
+5 5 50 5
+
+-- !select_after_error_rejected --
+1 1 100 1
+2 2 200 2
+3 3 3 3
+4 4 40 4
+5 5 50 5
+
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update.groovy
new file mode 100644
index 00000000000..85888b9e439
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update.groovy
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.RoutineLoadTestUtils
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+
+suite("test_routine_load_partial_update", "nonConcurrent") {
+ def kafkaCsvTopic = "test_routine_load_partial_update"
+
+ if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) {
+ def runSql = { String q -> sql q }
+ def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
+ def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+
+ def tableName = "test_routine_load_partial_update"
+ def job = "test_partial_update_job"
+
+ sql """ DROP TABLE IF EXISTS ${tableName} force;"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `id` int NULL,
+ `name` varchar(65533) NULL,
+ `score` int NULL,
+ `age` int NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ COMMENT 'test partial update'
+ DISTRIBUTED BY HASH(`id`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "enable_unique_key_merge_on_write" = "true"
+ );
+ """
+
+ // insert initial data
+ sql """
+ INSERT INTO ${tableName} VALUES
+ (1, 'alice', 100, 20),
+ (2, 'bob', 90, 21),
+ (3, 'charlie', 80, 22)
+ """
+
+ qt_select_initial "SELECT * FROM ${tableName} ORDER BY id"
+
+ try {
+ // create routine load with partial_columns=true
+ // only update id and score columns
+ sql """
+ CREATE ROUTINE LOAD ${job} ON ${tableName}
+ COLUMNS TERMINATED BY ",",
+ COLUMNS (id, score)
+ PROPERTIES
+ (
+ "max_batch_interval" = "10",
+ "partial_columns" = "true"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaCsvTopic}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ // send partial update data to kafka
+ // update score for id=1 from 100 to 150
+ // update score for id=2 from 90 to 95
+ def data = [
+ "1,150",
+ "2,95",
+ "100,100"
+ ]
+
+ data.each { line ->
+ logger.info("Sending to Kafka: ${line}")
+ def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+ producer.send(record).get()
+ }
+ producer.flush()
+
+ // wait for routine load task to finish
+ RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 3)
+
+            // verify partial update: score should be updated, name and age should remain unchanged
+            qt_select_after_partial_update "SELECT * FROM ${tableName} ORDER BY id"
+ } catch (Exception e) {
+ logger.error("Error during test: " + e.getMessage())
+ throw e
+ } finally {
+ sql "STOP ROUTINE LOAD FOR ${job}"
+ }
+ }
+}
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.groovy
new file mode 100644
index 00000000000..a6a97253e98
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.groovy
@@ -0,0 +1,282 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.RoutineLoadTestUtils
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+
+suite("test_routine_load_partial_update_new_key_behavior", "nonConcurrent") {
+ def kafkaCsvTopic = "test_routine_load_partial_update_new_key_behavior"
+
+ if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) {
+ def runSql = { String q -> sql q }
+ def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
+ def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+
+ // Test 1: partial_update_new_key_behavior=APPEND (default)
+ def tableName1 = "test_routine_load_pu_new_key_append"
+ def job1 = "test_new_key_behavior_append"
+
+ sql """ DROP TABLE IF EXISTS ${tableName1} force;"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName1} (
+ `k` int NOT NULL,
+ `c1` int,
+ `c2` int,
+ `c3` int
+ ) ENGINE=OLAP
+ UNIQUE KEY(`k`)
+ DISTRIBUTED BY HASH(`k`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "enable_unique_key_merge_on_write" = "true"
+ );
+ """
+
+ sql """
+ INSERT INTO ${tableName1} VALUES
+ (1, 1, 1, 1),
+ (2, 2, 2, 2),
+ (3, 3, 3, 3)
+ """
+
+ qt_select_initial "SELECT * FROM ${tableName1} ORDER BY k"
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${job1} ON ${tableName1}
+ COLUMNS TERMINATED BY ",",
+ COLUMNS (k, c1)
+ PROPERTIES
+ (
+ "max_batch_interval" = "10",
+ "partial_columns" = "true",
+ "partial_update_new_key_behavior" = "append"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaCsvTopic}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ // send data with existing keys and new keys
+ def data1 = [
+ "1,10", // update existing key
+ "2,20", // update existing key
+                "4,40", // new key - should be appended with default values for c2 and c3
+                "5,50"  // new key - should be appended with default values for c2 and c3
+ ]
+
+ data1.each { line ->
+ logger.info("Sending to Kafka: ${line}")
+ def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+ producer.send(record).get()
+ }
+ producer.flush()
+
+ // wait for routine load task to finish
+ sql "set skip_delete_bitmap=true;"
+ sql "sync;"
+ RoutineLoadTestUtils.waitForTaskFinish(runSql, job1, tableName1, 6)
+ sql "set skip_delete_bitmap=false;"
+ sql "sync;"
+
+ // verify: new keys should be appended
+ qt_select_after_append "SELECT * FROM ${tableName1} ORDER BY k"
+
+ } catch (Exception e) {
+ logger.info("Caught expected exception: ${e.getMessage()}")
+ throw e
+ } finally {
+ sql "STOP ROUTINE LOAD FOR ${job1}"
+ }
+
+ // Test 2: partial_update_new_key_behavior=ERROR
+ def tableName2 = "test_routine_load_pu_new_key_error"
+ def job2 = "test_new_key_behavior_error"
+
+ sql """ DROP TABLE IF EXISTS ${tableName2} force;"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName2} (
+ `k` int NOT NULL,
+ `c1` int,
+ `c2` int,
+ `c3` int
+ ) ENGINE=OLAP
+ UNIQUE KEY(`k`)
+ DISTRIBUTED BY HASH(`k`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "enable_unique_key_merge_on_write" = "true"
+ );
+ """
+
+ sql """
+ INSERT INTO ${tableName2} VALUES
+ (1, 1, 1, 1),
+ (2, 2, 2, 2),
+ (3, 3, 3, 3),
+ (4, 4, 4, 4),
+ (5, 5, 5, 5)
+ """
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${job2} ON ${tableName2}
+ COLUMNS TERMINATED BY ",",
+ COLUMNS (k, c2)
+ PROPERTIES
+ (
+ "max_batch_interval" = "10",
+ "partial_columns" = "true",
+ "partial_update_new_key_behavior" = "error"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaCsvTopic}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ // send data with only existing keys (should succeed)
+ def data2 = [
+ "1,100",
+ "2,200"
+ ]
+
+ data2.each { line ->
+ logger.info("Sending to Kafka: ${line}")
+ def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+ producer.send(record).get()
+ }
+ producer.flush()
+
+ // wait for routine load task to finish
+ sql "set skip_delete_bitmap=true;"
+ sql "sync;"
+ RoutineLoadTestUtils.waitForTaskFinish(runSql, job2, tableName2, 6)
+ sql "set skip_delete_bitmap=false;"
+ sql "sync;"
+
+ // verify: existing keys should be updated
+ qt_select_after_error_mode "SELECT * FROM ${tableName2} ORDER BY k"
+
+ // Now send data with new keys - this should fail the task
+ def data3 = [
+ "10,1000", // new key - should cause error
+ "11,1100" // new key - should cause error
+ ]
+
+ data3.each { line ->
+ logger.info("Sending to Kafka with new keys: ${line}")
+ def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+ producer.send(record).get()
+ }
+ producer.flush()
+
+ RoutineLoadTestUtils.waitForTaskAbort(runSql, job2)
+ def state = sql "SHOW ROUTINE LOAD FOR ${job2}"
+            logger.info("routine load state after new keys: ${state[0][8].toString()}")
+ logger.info("routine load error rows: ${state[0][15].toString()}")
+
+ // the data should not be loaded due to error
+            qt_select_after_error_rejected "SELECT * FROM ${tableName2} ORDER BY k"
+
+ } catch (Exception e) {
+ logger.info("Caught expected exception: ${e.getMessage()}")
+ throw e
+ } finally {
+ sql "STOP ROUTINE LOAD FOR ${job2}"
+ }
+
+ // Test 3: Test invalid property value
+ def tableName3 = "test_routine_load_pu_invalid_prop"
+ sql """ DROP TABLE IF EXISTS ${tableName3} force;"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName3} (
+ `k` int NOT NULL,
+ `c1` int,
+ `c2` int,
+ `c3` int
+ ) ENGINE=OLAP
+ UNIQUE KEY(`k`)
+ DISTRIBUTED BY HASH(`k`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "enable_unique_key_merge_on_write" = "true"
+ );
+ """
+
+ test {
+ sql """
+ CREATE ROUTINE LOAD test_invalid_property ON ${tableName3}
+ COLUMNS TERMINATED BY ",",
+ COLUMNS (k, c3)
+ PROPERTIES
+ (
+ "partial_columns" = "true",
+ "partial_update_new_key_behavior" = "invalid"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaCsvTopic}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+            exception "partial_update_new_key_behavior should be one of {'APPEND', 'ERROR'}"
+ }
+
+ // Test 4: Test setting property without partial_columns
+ def tableName4 = "test_routine_load_pu_without_partial"
+ sql """ DROP TABLE IF EXISTS ${tableName4} force;"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName4} (
+ `k` int NOT NULL,
+ `c1` int,
+ `c2` int,
+ `c3` int
+ ) ENGINE=OLAP
+ UNIQUE KEY(`k`)
+ DISTRIBUTED BY HASH(`k`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "enable_unique_key_merge_on_write" = "true"
+ );
+ """
+
+ test {
+ sql """
+            CREATE ROUTINE LOAD test_without_partial_columns ON ${tableName4}
+ COLUMNS TERMINATED BY ",",
+ COLUMNS (k, c3)
+ PROPERTIES
+ (
+ "partial_update_new_key_behavior" = "append"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaCsvTopic}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+            exception "partial_update_new_key_behavior can only be set when partial_columns is true"
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]