This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 670deb9ba76 [Fix](routine load) Fix routine load partial update (#59209)
670deb9ba76 is described below

commit 670deb9ba760922e69befdcf52760bafce9a2eab
Author: bobhan1 <[email protected]>
AuthorDate: Wed Dec 24 02:47:43 2025 +0800

    [Fix](routine load) Fix routine load partial update (#59209)
    
    ### What problem does this PR solve?
    1. Fix `"partial_columns"="true"` not taking effect in routine load
    2. Add the `"partial_update_new_key_behavior"="APPEND"/"ERROR"` property to
    routine load (see the usage sketch below)
    
    doc: https://github.com/apache/doris-website/pull/3211
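
    For illustration, a minimal sketch of the new property in use (job, table,
    column, broker, and topic names below are placeholders; the syntax mirrors
    the regression tests added by this PR):

    ```sql
    CREATE ROUTINE LOAD example_job ON example_table
    COLUMNS TERMINATED BY ",",
    COLUMNS (k, c1)
    PROPERTIES
    (
        -- partial_columns must be "true" for the new property to be accepted
        "partial_columns" = "true",
        -- "APPEND" (the default) inserts rows with new keys; "ERROR" rejects them
        "partial_update_new_key_behavior" = "ERROR"
    )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker_host:9092",
        "kafka_topic" = "example_topic"
    );
    ```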
---
 .../load/routineload/KafkaRoutineLoadJob.java      |  11 +-
 .../doris/load/routineload/RoutineLoadJob.java     |  15 ++
 .../nereids/load/NereidsRoutineLoadTaskInfo.java   |  30 ++-
 .../plans/commands/info/CreateRoutineLoadInfo.java |  30 +++
 .../test_routine_load_partial_update.out           |  12 +
 ...outine_load_partial_update_new_key_behavior.out |  27 ++
 .../test_routine_load_partial_update.groovy        | 108 ++++++++
 ...ine_load_partial_update_new_key_behavior.groovy | 282 +++++++++++++++++++++
 8 files changed, 510 insertions(+), 5 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 7fccb0ae0e1..d8844834699 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -51,6 +51,7 @@ import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.thrift.TFileCompressType;
+import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionStatus;
 
@@ -790,6 +791,14 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
             if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) {
                 this.isPartialUpdate = BooleanUtils.toBoolean(jobProperties.get(CreateRoutineLoadInfo.PARTIAL_COLUMNS));
             }
+            if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY)) {
+                String policy = jobProperties.get(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY);
+                if ("ERROR".equalsIgnoreCase(policy)) {
+                    this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.ERROR;
+                } else {
+                    this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
+                }
+            }
         }
         LOG.info("modify the properties of kafka routine load job: {}, jobProperties: {}, datasource properties: {}",
                 this.id, jobProperties, dataSourceProperties);
@@ -962,6 +971,6 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         return new NereidsRoutineLoadTaskInfo(execMemLimit, new HashMap<>(jobProperties), maxBatchIntervalS,
                 partitionNamesInfo, mergeType, deleteCondition, sequenceCol, maxFilterRatio, importColumnDescs,
                 precedingFilter, whereExpr, columnSeparator, lineDelimiter, enclose, escape, sendBatchParallelism,
-                loadToSingleTablet, isPartialUpdate, memtableOnSinkNode);
+                loadToSingleTablet, isPartialUpdate, partialUpdateNewKeyPolicy, memtableOnSinkNode);
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index d768da582a2..41b476ddc6f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -68,6 +68,7 @@ import org.apache.doris.resource.computegroup.ComputeGroup;
 import org.apache.doris.task.LoadTaskInfo;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
 import org.apache.doris.thrift.TPipelineFragmentParams;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
@@ -223,6 +224,7 @@ public abstract class RoutineLoadJob
     protected long maxBatchSizeBytes = DEFAULT_MAX_BATCH_SIZE;
 
     protected boolean isPartialUpdate = false;
+    protected TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
 
     protected String sequenceCol;
 
@@ -388,6 +390,9 @@ public abstract class RoutineLoadJob
         jobProperties.put(info.PARTIAL_COLUMNS, info.isPartialUpdate() ? "true" : "false");
         if (info.isPartialUpdate()) {
             this.isPartialUpdate = true;
+            this.partialUpdateNewKeyPolicy = info.getPartialUpdateNewKeyPolicy();
+            jobProperties.put(info.PARTIAL_UPDATE_NEW_KEY_POLICY,
+                    this.partialUpdateNewKeyPolicy == TPartialUpdateNewRowPolicy.ERROR ? "ERROR" : "APPEND");
         }
         jobProperties.put(info.MAX_FILTER_RATIO_PROPERTY, String.valueOf(maxFilterRatio));
 
@@ -1869,6 +1874,10 @@ public abstract class RoutineLoadJob
 
         // job properties defined in CreateRoutineLoadStmt
         jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, String.valueOf(isPartialUpdate));
+        if (isPartialUpdate) {
+            jobProperties.put(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY,
+                    partialUpdateNewKeyPolicy == TPartialUpdateNewRowPolicy.ERROR ? "ERROR" : "APPEND");
+        }
         jobProperties.put(CreateRoutineLoadInfo.MAX_ERROR_NUMBER_PROPERTY, String.valueOf(maxErrorNum));
         jobProperties.put(CreateRoutineLoadInfo.MAX_BATCH_INTERVAL_SEC_PROPERTY, String.valueOf(maxBatchIntervalS));
         jobProperties.put(CreateRoutineLoadInfo.MAX_BATCH_ROWS_PROPERTY, String.valueOf(maxBatchRows));
@@ -1921,6 +1930,12 @@ public abstract class RoutineLoadJob
         jobProperties.forEach((k, v) -> {
             if (k.equals(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) {
                 isPartialUpdate = Boolean.parseBoolean(v);
+            } else if (k.equals(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY)) {
+                if ("ERROR".equalsIgnoreCase(v)) {
+                    partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.ERROR;
+                } else {
+                    partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
+                }
             }
         });
         try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
index 3cf39955ab8..17a4d0a08e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
@@ -28,6 +28,8 @@ import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
 import org.apache.doris.thrift.TFileCompressType;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
+import org.apache.doris.thrift.TUniqueKeyUpdateMode;
 
 import com.google.common.base.Strings;
 
@@ -65,7 +67,8 @@ public class NereidsRoutineLoadTaskInfo implements NereidsLoadTaskInfo {
     protected boolean emptyFieldAsNull;
     protected int sendBatchParallelism;
     protected boolean loadToSingleTablet;
-    protected boolean isPartialUpdate;
+    protected TUniqueKeyUpdateMode uniquekeyUpdateMode = TUniqueKeyUpdateMode.UPSERT;
+    protected TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
     protected boolean memtableOnSinkNode;
     protected int timeoutSec;
 
@@ -77,7 +80,8 @@ public class NereidsRoutineLoadTaskInfo implements NereidsLoadTaskInfo {
             String sequenceCol, double maxFilterRatio, NereidsImportColumnDescs columnDescs,
             Expression precedingFilter, Expression whereExpr, Separator columnSeparator,
             Separator lineDelimiter, byte enclose, byte escape, int sendBatchParallelism,
-            boolean loadToSingleTablet, boolean isPartialUpdate, boolean memtableOnSinkNode) {
+            boolean loadToSingleTablet, boolean isPartialUpdate, TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy,
+            boolean memtableOnSinkNode) {
         this.execMemLimit = execMemLimit;
         this.jobProperties = jobProperties;
         this.maxBatchIntervalS = maxBatchIntervalS;
@@ -95,7 +99,10 @@ public class NereidsRoutineLoadTaskInfo implements NereidsLoadTaskInfo {
         this.escape = escape;
         this.sendBatchParallelism = sendBatchParallelism;
         this.loadToSingleTablet = loadToSingleTablet;
-        this.isPartialUpdate = isPartialUpdate;
+        if (isPartialUpdate) {
+            this.uniquekeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
+        }
+        this.partialUpdateNewKeyPolicy = partialUpdateNewKeyPolicy;
         this.memtableOnSinkNode = memtableOnSinkNode;
         this.timeoutSec = calTimeoutSec();
     }
@@ -311,7 +318,22 @@ public class NereidsRoutineLoadTaskInfo implements NereidsLoadTaskInfo {
 
     @Override
     public boolean isFixedPartialUpdate() {
-        return isPartialUpdate;
+        return uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
+    }
+
+    @Override
+    public TUniqueKeyUpdateMode getUniqueKeyUpdateMode() {
+        return uniquekeyUpdateMode;
+    }
+
+    @Override
+    public boolean isFlexiblePartialUpdate() {
+        return uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS;
+    }
+
+    @Override
+    public TPartialUpdateNewRowPolicy getPartialUpdateNewRowPolicy() {
+        return partialUpdateNewKeyPolicy;
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
index 617e49b0278..5f23ff42711 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
@@ -56,6 +56,7 @@ import org.apache.doris.nereids.trees.plans.commands.load.LoadWhereClause;
 import org.apache.doris.nereids.util.PlanUtils;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.resource.workloadgroup.WorkloadGroup;
+import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
@@ -88,6 +89,7 @@ public class CreateRoutineLoadInfo {
     public static final String EXEC_MEM_LIMIT_PROPERTY = "exec_mem_limit";
 
     public static final String PARTIAL_COLUMNS = "partial_columns";
+    public static final String PARTIAL_UPDATE_NEW_KEY_POLICY = "partial_update_new_key_behavior";
     public static final String WORKLOAD_GROUP = "workload_group";
     public static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
     public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism";
@@ -122,6 +124,7 @@ public class CreateRoutineLoadInfo {
             .add(SEND_BATCH_PARALLELISM)
             .add(LOAD_TO_SINGLE_TABLET)
             .add(PARTIAL_COLUMNS)
+            .add(PARTIAL_UPDATE_NEW_KEY_POLICY)
             .add(WORKLOAD_GROUP)
             .add(FileFormatProperties.PROP_FORMAT)
             .add(JsonFileFormatProperties.PROP_JSON_PATHS)
@@ -166,6 +169,7 @@ public class CreateRoutineLoadInfo {
      * support partial columns load(Only Unique Key Columns)
      */
     private boolean isPartialUpdate = false;
+    private TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
 
     private String comment = "";
 
@@ -195,6 +199,15 @@ public class CreateRoutineLoadInfo {
             .createDataSource(typeName, dataSourceProperties, this.isMultiTable);
         this.mergeType = mergeType;
         this.isPartialUpdate = this.jobProperties.getOrDefault(PARTIAL_COLUMNS, "false").equalsIgnoreCase("true");
+        if (this.isPartialUpdate && this.jobProperties.containsKey(PARTIAL_UPDATE_NEW_KEY_POLICY)) {
+            String policyStr = this.jobProperties.get(PARTIAL_UPDATE_NEW_KEY_POLICY).toUpperCase();
+            if ("APPEND".equals(policyStr)) {
+                this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
+            } else if ("ERROR".equals(policyStr)) {
+                this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.ERROR;
+            }
+            // validation will be done in checkJobProperties()
+        }
         if (comment != null) {
             this.comment = comment;
         }
@@ -276,6 +289,10 @@ public class CreateRoutineLoadInfo {
         return isPartialUpdate;
     }
 
+    public TPartialUpdateNewRowPolicy getPartialUpdateNewKeyPolicy() {
+        return partialUpdateNewKeyPolicy;
+    }
+
     public String getComment() {
         return comment;
     }
@@ -515,6 +532,19 @@ public class CreateRoutineLoadInfo {
         }
         timezone = TimeUtils.checkTimeZoneValidAndStandardize(jobProperties.getOrDefault(TIMEZONE, timezone));
 
+        // check partial_update_new_key_behavior
+        if (jobProperties.containsKey(PARTIAL_UPDATE_NEW_KEY_POLICY)) {
+            if (!isPartialUpdate) {
+                throw new AnalysisException(
+                    PARTIAL_UPDATE_NEW_KEY_POLICY + " can only be set when partial_columns is true");
+            }
+            String policy = jobProperties.get(PARTIAL_UPDATE_NEW_KEY_POLICY).toUpperCase();
+            if (!"APPEND".equals(policy) && !"ERROR".equals(policy)) {
+                throw new AnalysisException(
+                    PARTIAL_UPDATE_NEW_KEY_POLICY + " should be one of {'APPEND', 'ERROR'}, but found " + policy);
+            }
+        }
+
         String format = jobProperties.getOrDefault(FileFormatProperties.PROP_FORMAT, "csv");
         fileFormatProperties = FileFormatProperties.createFileFormatProperties(format);
         fileFormatProperties.analyzeFileFormatProperties(jobProperties, false);
diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_partial_update.out b/regression-test/data/load_p0/routine_load/test_routine_load_partial_update.out
new file mode 100644
index 00000000000..a8eed2b2a05
--- /dev/null
+++ b/regression-test/data/load_p0/routine_load/test_routine_load_partial_update.out
@@ -0,0 +1,12 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_initial --
+1      alice   100     20
+2      bob     90      21
+3      charlie 80      22
+
+-- !select_after_partial_update --
+1      alice   150     20
+2      bob     95      21
+3      charlie 80      22
+100    \N      100     \N
+
diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.out b/regression-test/data/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.out
new file mode 100644
index 00000000000..45b434b8cad
--- /dev/null
+++ b/regression-test/data/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.out
@@ -0,0 +1,27 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_initial --
+1      1       1       1
+2      2       2       2
+3      3       3       3
+
+-- !select_after_append --
+1      10      1       1
+2      20      2       2
+3      3       3       3
+4      40      \N      \N
+5      50      \N      \N
+
+-- !select_after_error_mode --
+1      1       100     1
+2      2       200     2
+3      3       3       3
+4      4       40      4
+5      5       50      5
+
+-- !select_after_error_rejected --
+1      1       100     1
+2      2       200     2
+3      3       3       3
+4      4       40      4
+5      5       50      5
+
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update.groovy
new file mode 100644
index 00000000000..85888b9e439
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update.groovy
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.RoutineLoadTestUtils
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+
+suite("test_routine_load_partial_update", "nonConcurrent") {
+    def kafkaCsvTopic = "test_routine_load_partial_update"
+
+    if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) {
+        def runSql = { String q -> sql q }
+        def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
+        def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+
+        def tableName = "test_routine_load_partial_update"
+        def job = "test_partial_update_job"
+
+        sql """ DROP TABLE IF EXISTS ${tableName} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `id` int NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL,
+                `age` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            COMMENT 'test partial update'
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "enable_unique_key_merge_on_write" = "true"
+            );
+        """
+
+        // insert initial data
+        sql """
+            INSERT INTO ${tableName} VALUES
+            (1, 'alice', 100, 20),
+            (2, 'bob', 90, 21),
+            (3, 'charlie', 80, 22)
+        """
+
+        qt_select_initial "SELECT * FROM ${tableName} ORDER BY id"
+
+        try {
+            // create routine load with partial_columns=true
+            // only update id and score columns
+            sql """
+                CREATE ROUTINE LOAD ${job} ON ${tableName}
+                COLUMNS TERMINATED BY ",",
+                COLUMNS (id, score)
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "partial_columns" = "true"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaCsvTopic}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            // send partial update data to kafka
+            // update score for id=1 from 100 to 150
+            // update score for id=2 from 90 to 95
+            def data = [
+                "1,150",
+                "2,95",
+                "100,100"
+            ]
+
+            data.each { line ->
+                logger.info("Sending to Kafka: ${line}")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record).get()
+            }
+            producer.flush()
+
+            // wait for routine load task to finish
+            RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 3)
+
+            // verify partial update: score should be updated, name and age should remain unchanged
+            qt_select_after_partial_update "SELECT * FROM ${tableName} ORDER BY id"
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job}"
+        }
+    }
+}
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.groovy
new file mode 100644
index 00000000000..a6a97253e98
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.groovy
@@ -0,0 +1,282 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.RoutineLoadTestUtils
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+
+suite("test_routine_load_partial_update_new_key_behavior", "nonConcurrent") {
+    def kafkaCsvTopic = "test_routine_load_partial_update_new_key_behavior"
+
+    if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) {
+        def runSql = { String q -> sql q }
+        def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
+        def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+
+        // Test 1: partial_update_new_key_behavior=APPEND (default)
+        def tableName1 = "test_routine_load_pu_new_key_append"
+        def job1 = "test_new_key_behavior_append"
+
+        sql """ DROP TABLE IF EXISTS ${tableName1} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName1} (
+                `k` int NOT NULL,
+                `c1` int,
+                `c2` int,
+                `c3` int
+            ) ENGINE=OLAP
+            UNIQUE KEY(`k`)
+            DISTRIBUTED BY HASH(`k`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "enable_unique_key_merge_on_write" = "true"
+            );
+        """
+
+        sql """
+            INSERT INTO ${tableName1} VALUES
+            (1, 1, 1, 1),
+            (2, 2, 2, 2),
+            (3, 3, 3, 3)
+        """
+
+        qt_select_initial "SELECT * FROM ${tableName1} ORDER BY k"
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${job1} ON ${tableName1}
+                COLUMNS TERMINATED BY ",",
+                COLUMNS (k, c1)
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "partial_columns" = "true",
+                    "partial_update_new_key_behavior" = "append"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaCsvTopic}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            // send data with existing keys and new keys
+            def data1 = [
+                "1,10",  // update existing key
+                "2,20",  // update existing key
+                "4,40",  // new key - should be appended with default values for c2 and c3
+                "5,50"   // new key - should be appended with default values for c2 and c3
+            ]
+
+            data1.each { line ->
+                logger.info("Sending to Kafka: ${line}")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record).get()
+            }
+            producer.flush()
+
+            // wait for routine load task to finish
+            sql "set skip_delete_bitmap=true;"
+            sql "sync;"
+            RoutineLoadTestUtils.waitForTaskFinish(runSql, job1, tableName1, 6)
+            sql "set skip_delete_bitmap=false;"
+            sql "sync;"
+
+            // verify: new keys should be appended
+            qt_select_after_append "SELECT * FROM ${tableName1} ORDER BY k"
+
+        } catch (Exception e) {
+            logger.info("Caught expected exception: ${e.getMessage()}")
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job1}"
+        }
+
+        // Test 2: partial_update_new_key_behavior=ERROR
+        def tableName2 = "test_routine_load_pu_new_key_error"
+        def job2 = "test_new_key_behavior_error"
+
+        sql """ DROP TABLE IF EXISTS ${tableName2} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName2} (
+                `k` int NOT NULL,
+                `c1` int,
+                `c2` int,
+                `c3` int
+            ) ENGINE=OLAP
+            UNIQUE KEY(`k`)
+            DISTRIBUTED BY HASH(`k`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "enable_unique_key_merge_on_write" = "true"
+            );
+        """
+
+        sql """
+            INSERT INTO ${tableName2} VALUES
+            (1, 1, 1, 1),
+            (2, 2, 2, 2),
+            (3, 3, 3, 3),
+            (4, 4, 4, 4),
+            (5, 5, 5, 5)
+        """
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${job2} ON ${tableName2}
+                COLUMNS TERMINATED BY ",",
+                COLUMNS (k, c2)
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "partial_columns" = "true",
+                    "partial_update_new_key_behavior" = "error"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaCsvTopic}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            // send data with only existing keys (should succeed)
+            def data2 = [
+                "1,100",
+                "2,200"
+            ]
+
+            data2.each { line ->
+                logger.info("Sending to Kafka: ${line}")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record).get()
+            }
+            producer.flush()
+
+            // wait for routine load task to finish
+            sql "set skip_delete_bitmap=true;"
+            sql "sync;"
+            RoutineLoadTestUtils.waitForTaskFinish(runSql, job2, tableName2, 6)
+            sql "set skip_delete_bitmap=false;"
+            sql "sync;"
+
+            // verify: existing keys should be updated
+            qt_select_after_error_mode "SELECT * FROM ${tableName2} ORDER BY k"
+
+            // Now send data with new keys - this should fail the task
+            def data3 = [
+                "10,1000",  // new key - should cause error
+                "11,1100"   // new key - should cause error
+            ]
+
+            data3.each { line ->
+                logger.info("Sending to Kafka with new keys: ${line}")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record).get()
+            }
+            producer.flush()
+
+            RoutineLoadTestUtils.waitForTaskAbort(runSql, job2)
+            def state = sql "SHOW ROUTINE LOAD FOR ${job2}"
+            logger.info("routine load state after new keys: ${state[0][8].toString()}")
+            logger.info("routine load error rows: ${state[0][15].toString()}")
+
+            // the data should not be loaded due to error
+            qt_select_after_error_rejected "SELECT * FROM ${tableName2} ORDER BY k"
+
+        } catch (Exception e) {
+            logger.info("Caught expected exception: ${e.getMessage()}")
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job2}"
+        }
+
+        // Test 3: Test invalid property value
+        def tableName3 = "test_routine_load_pu_invalid_prop"
+        sql """ DROP TABLE IF EXISTS ${tableName3} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName3} (
+                `k` int NOT NULL,
+                `c1` int,
+                `c2` int,
+                `c3` int
+            ) ENGINE=OLAP
+            UNIQUE KEY(`k`)
+            DISTRIBUTED BY HASH(`k`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "enable_unique_key_merge_on_write" = "true"
+            );
+        """
+
+        test {
+            sql """
+                CREATE ROUTINE LOAD test_invalid_property ON ${tableName3}
+                COLUMNS TERMINATED BY ",",
+                COLUMNS (k, c3)
+                PROPERTIES
+                (
+                    "partial_columns" = "true",
+                    "partial_update_new_key_behavior" = "invalid"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaCsvTopic}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            exception "partial_update_new_key_behavior should be one of {'APPEND', 'ERROR'}"
+        }
+
+        // Test 4: Test setting property without partial_columns
+        def tableName4 = "test_routine_load_pu_without_partial"
+        sql """ DROP TABLE IF EXISTS ${tableName4} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName4} (
+                `k` int NOT NULL,
+                `c1` int,
+                `c2` int,
+                `c3` int
+            ) ENGINE=OLAP
+            UNIQUE KEY(`k`)
+            DISTRIBUTED BY HASH(`k`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "enable_unique_key_merge_on_write" = "true"
+            );
+        """
+
+        test {
+            sql """
+                CREATE ROUTINE LOAD test_without_partial_columns ON ${tableName4}
+                COLUMNS TERMINATED BY ",",
+                COLUMNS (k, c3)
+                PROPERTIES
+                (
+                    "partial_update_new_key_behavior" = "append"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaCsvTopic}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            exception "partial_update_new_key_behavior can only be set when partial_columns is true"
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
