This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 509ebe159eb [feature](routine-load) Support flexible partial update for routine load (#59896)
509ebe159eb is described below

commit 509ebe159ebaac855b78d4829f4e9636c2aa3971
Author: Yongqiang YANG <[email protected]>
AuthorDate: Tue Jan 20 22:24:11 2026 -0800

    [feature](routine-load) Support flexible partial update for routine load (#59896)
    
    Add support for the `unique_key_update_mode` property in routine load
    to enable flexible partial column updates. This allows different rows
    in the same batch to update different columns, unlike fixed partial
    update, where all rows must update the same set of columns.
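
    For illustration, a minimal job using the new mode looks like the
    following sketch (table, topic, and broker names are placeholders;
    the property syntax mirrors the regression tests added in this
    commit):

        CREATE ROUTINE LOAD flex_update_job ON tbl
        PROPERTIES
        (
            "format" = "json",
            "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
        )
        FROM KAFKA
        (
            "kafka_broker_list" = "broker_host:9092",
            "kafka_topic" = "flex_update_topic",
            "property.kafka_default_offsets" = "OFFSET_BEGINNING"
        );

        -- Each JSON row may then carry a different column subset, e.g.:
        --   {"id": 1, "score": 150}
        --   {"id": 2, "age": 30}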
    
    Changes:
    - Add `unique_key_update_mode` property to CreateRoutineLoadInfo with
    values: UPSERT (default), UPDATE_FIXED_COLUMNS, UPDATE_FLEXIBLE_COLUMNS
    - Add validation for flexible partial update constraints (JSON format
    only, no jsonpaths, no fuzzy_parse, no COLUMNS clause, no WHERE clause,
    table must have skip_bitmap column enabled)
    - Update RoutineLoadJob to persist and restore the update mode
    - Update KafkaRoutineLoadJob to pass update mode to task info
    - Support ALTER ROUTINE LOAD to change unique_key_update_mode (see
    the ALTER sketch after this list)
    - Add regression tests covering basic usage and error cases
    - Fix HashMap ordering issue in gsonPostProcess for backward
    compatibility
    - Add validation when ALTER changes mode to UPDATE_FLEXIBLE_COLUMNS
    - Add comprehensive ALTER test cases for flexible partial update
    validation
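
    An illustrative ALTER sequence (job and table names are
    placeholders). A routine load job is typically paused before it can
    be altered, and the target table must satisfy the constraints
    listed above; the hidden skip_bitmap column can be added via the
    ENABLE FEATURE statement from the error message:

        -- add the hidden skip_bitmap column if the table lacks it
        ALTER TABLE tbl ENABLE FEATURE "UPDATE_FLEXIBLE_COLUMNS";

        PAUSE ROUTINE LOAD FOR flex_update_job;
        ALTER ROUTINE LOAD FOR flex_update_job
        PROPERTIES ("unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS");
        RESUME ROUTINE LOAD FOR flex_update_job;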
---
 .../java/org/apache/doris/catalog/OlapTable.java   |   27 +
 .../load/routineload/KafkaRoutineLoadJob.java      |    6 +-
 .../doris/load/routineload/RoutineLoadJob.java     |  118 +-
 .../nereids/load/NereidsRoutineLoadTaskInfo.java   |    8 +-
 .../nereids/load/NereidsStreamLoadPlanner.java     |   17 +-
 .../plans/commands/AlterRoutineLoadCommand.java    |    8 +
 .../plans/commands/info/CreateRoutineLoadInfo.java |  111 +-
 .../doris/load/routineload/RoutineLoadJobTest.java |  125 ++
 .../test_routine_load_flexible_partial_update.out  |   82 ++
 .../regression/util/RoutineLoadTestUtils.groovy    |   25 +-
 ...est_routine_load_flexible_partial_update.groovy | 1381 ++++++++++++++++++++
 ...ine_load_partial_update_new_key_behavior.groovy |    2 +-
 12 files changed, 1874 insertions(+), 36 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 39383ceda2c..e2b743f666c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -2977,6 +2977,33 @@ public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProc
         return hasSkipBitmapColumn();
     }
 
+    /**
+     * Validate that the table supports flexible partial update.
+     * Checks the following constraints:
+     * 1. Must be MoW unique key table
+     * 2. Must have skip_bitmap column
+     * 3. Must have light_schema_change enabled
+     * 4. Cannot have variant columns
+     * @throws UserException if any constraint is not satisfied
+     */
+    public void validateForFlexiblePartialUpdate() throws UserException {
+        if (!getEnableUniqueKeyMergeOnWrite()) {
+            throw new UserException("Flexible partial update is only supported in unique table MoW");
+        }
+        if (!hasSkipBitmapColumn()) {
+            throw new UserException("Flexible partial update can only support table with skip bitmap hidden column."
+                    + " But table " + getName() + " doesn't have it. You can use `ALTER TABLE " + getName()
+                    + " ENABLE FEATURE \"UPDATE_FLEXIBLE_COLUMNS\";` to add it to the table.");
+        }
+        if (!getEnableLightSchemaChange()) {
+            throw new UserException("Flexible partial update can only support table with light_schema_change enabled."
+                    + " But table " + getName() + "'s property light_schema_change is false");
+        }
+        if (hasVariantColumns()) {
+            throw new UserException("Flexible partial update can only support table without variant columns.");
+        }
+    }
+
     public boolean getEnableUniqueKeyMergeOnWrite() {
         if (tableProperty == null) {
             return false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 82cd83cfe6e..7cd2a71d2f2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -729,7 +729,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
 
     private void modifyPropertiesInternal(Map<String, String> jobProperties,
                                           KafkaDataSourceProperties dataSourceProperties)
-            throws DdlException {
+            throws UserException {
         if (null != dataSourceProperties) {
             List<Pair<Integer, Long>> kafkaPartitionOffsets = Lists.newArrayList();
             Map<String, String> customKafkaProperties = Maps.newHashMap();
@@ -830,7 +830,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     public void replayModifyProperties(AlterRoutineLoadJobOperationLog log) {
         try {
             modifyPropertiesInternal(log.getJobProperties(), (KafkaDataSourceProperties) log.getDataSourceProperties());
-        } catch (DdlException e) {
+        } catch (UserException e) {
             // should not happen
            LOG.error("failed to replay modify kafka routine load job: {}", id, e);
         }
@@ -974,6 +974,6 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         return new NereidsRoutineLoadTaskInfo(execMemLimit, new HashMap<>(jobProperties), maxBatchIntervalS,
                 partitionNamesInfo, mergeType, deleteCondition, sequenceCol, maxFilterRatio, importColumnDescs,
                 precedingFilter, whereExpr, columnSeparator, lineDelimiter, enclose, escape, sendBatchParallelism,
-                loadToSingleTablet, isPartialUpdate, partialUpdateNewKeyPolicy, memtableOnSinkNode);
+                loadToSingleTablet, uniqueKeyUpdateMode, partialUpdateNewKeyPolicy, memtableOnSinkNode);
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 00a55b8729d..f036c3ba192 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -71,6 +71,7 @@ import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
 import org.apache.doris.thrift.TPipelineFragmentParams;
 import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.thrift.TUniqueKeyUpdateMode;
 import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
 import org.apache.doris.transaction.TransactionException;
 import org.apache.doris.transaction.TransactionState;
@@ -225,6 +226,7 @@ public abstract class RoutineLoadJob
 
     protected boolean isPartialUpdate = false;
     protected TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
+    protected TUniqueKeyUpdateMode uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPSERT;
 
     protected String sequenceCol;
 
@@ -387,11 +389,14 @@ public abstract class RoutineLoadJob
         jobProperties.put(info.STRICT_MODE, String.valueOf(info.isStrictMode()));
         jobProperties.put(info.SEND_BATCH_PARALLELISM, String.valueOf(this.sendBatchParallelism));
         jobProperties.put(info.LOAD_TO_SINGLE_TABLET, String.valueOf(this.loadToSingleTablet));
-        jobProperties.put(info.PARTIAL_COLUMNS, info.isPartialUpdate() ? "true" : "false");
-        if (info.isPartialUpdate()) {
-            this.isPartialUpdate = true;
+        // Set unique key update mode
+        this.uniqueKeyUpdateMode = info.getUniqueKeyUpdateMode();
+        this.isPartialUpdate = (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS);
+        jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, uniqueKeyUpdateMode.name());
+        jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, isPartialUpdate ? "true" : "false");
+        if (uniqueKeyUpdateMode != TUniqueKeyUpdateMode.UPSERT) {
             this.partialUpdateNewKeyPolicy = info.getPartialUpdateNewKeyPolicy();
-            jobProperties.put(info.PARTIAL_UPDATE_NEW_KEY_POLICY,
+            jobProperties.put(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY,
                     this.partialUpdateNewKeyPolicy == TPartialUpdateNewRowPolicy.ERROR ? "ERROR" : "APPEND");
         }
         jobProperties.put(info.MAX_FILTER_RATIO_PROPERTY, String.valueOf(maxFilterRatio));
@@ -570,6 +575,10 @@ public abstract class RoutineLoadJob
         return mergeType;
     }
 
+    public TUniqueKeyUpdateMode getUniqueKeyUpdateMode() {
+        return uniqueKeyUpdateMode;
+    }
+
     @Override
     public Expr getDeleteCondition() {
         return deleteCondition;
@@ -1059,7 +1068,8 @@ public abstract class RoutineLoadJob
                 throw new UserException("txn does not exist: " + txnId);
             }
             txnState.addTableIndexes(planner.getDestTable());
-            if (isPartialUpdate) {
+            if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS
+                    || uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS) {
                 txnState.setSchemaForPartialUpdate((OlapTable) table);
             }
 
@@ -1932,9 +1942,29 @@ public abstract class RoutineLoadJob
         if (tableId == 0) {
             isMultiTable = true;
         }
+        // Process UNIQUE_KEY_UPDATE_MODE first to ensure correct backward compatibility
+        // with PARTIAL_COLUMNS (HashMap iteration order is not guaranteed)
+        if (jobProperties.containsKey(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)) {
+            String modeValue = jobProperties.get(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE);
+            TUniqueKeyUpdateMode mode = CreateRoutineLoadInfo.parseUniqueKeyUpdateMode(modeValue);
+            if (mode != null) {
+                uniqueKeyUpdateMode = mode;
+                isPartialUpdate = (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS);
+            } else {
+                uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPSERT;
+            }
+        }
+        // Process remaining properties
         jobProperties.forEach((k, v) -> {
             if (k.equals(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) {
-                isPartialUpdate = Boolean.parseBoolean(v);
+                // Backward compatibility: only use partial_columns if unique_key_update_mode is not set
+                // unique_key_update_mode takes precedence
+                if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPSERT) {
+                    isPartialUpdate = Boolean.parseBoolean(v);
+                    if (isPartialUpdate) {
+                        uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
+                    }
+                }
             } else if (k.equals(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY)) {
                 if ("ERROR".equalsIgnoreCase(v)) {
                     partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.ERROR;
@@ -1979,7 +2009,7 @@ public abstract class RoutineLoadJob
     public abstract NereidsRoutineLoadTaskInfo toNereidsRoutineLoadTaskInfo() throws UserException;
 
     // for ALTER ROUTINE LOAD
-    protected void modifyCommonJobProperties(Map<String, String> jobProperties) {
+    protected void modifyCommonJobProperties(Map<String, String> jobProperties) throws UserException {
         if (jobProperties.containsKey(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY)) {
             this.desireTaskConcurrentNum = Integer.parseInt(
                     jobProperties.remove(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY));
@@ -2010,5 +2040,79 @@ public abstract class RoutineLoadJob
             this.maxBatchSizeBytes = Long.parseLong(
                     jobProperties.remove(CreateRoutineLoadInfo.MAX_BATCH_SIZE_PROPERTY));
         }
+
+        if (jobProperties.containsKey(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)) {
+            String modeStr = jobProperties.remove(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE);
+            TUniqueKeyUpdateMode newMode = CreateRoutineLoadInfo.parseAndValidateUniqueKeyUpdateMode(modeStr);
+            // Validate flexible partial update constraints when changing to UPDATE_FLEXIBLE_COLUMNS
+            if (newMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS) {
+                validateFlexiblePartialUpdateForAlter();
+            }
+            this.uniqueKeyUpdateMode = newMode;
+            this.isPartialUpdate = (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS);
+            this.jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, uniqueKeyUpdateMode.name());
+            this.jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, String.valueOf(isPartialUpdate));
+        }
+
+        if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) {
+            // Backward compatibility: only use partial_columns if unique_key_update_mode is UPSERT (not explicitly set)
+            // unique_key_update_mode takes precedence
+            this.isPartialUpdate = Boolean.parseBoolean(
+                    jobProperties.remove(CreateRoutineLoadInfo.PARTIAL_COLUMNS));
+            if (this.isPartialUpdate && uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPSERT) {
+                this.uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
+            }
+            this.jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, String.valueOf(isPartialUpdate));
+            this.jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, uniqueKeyUpdateMode.name());
+        }
+    }
+
+    /**
+     * Validate flexible partial update constraints when altering routine load job.
+     */
+    private void validateFlexiblePartialUpdateForAlter() throws UserException {
+        // Multi-table load does not support flexible partial update
+        if (isMultiTable) {
+            throw new DdlException("Flexible partial update is not supported in multi-table load");
+        }
+
+        // Get the table to check table-level constraints
+        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+        if (db == null) {
+            throw new DdlException("Database not found: " + dbId);
+        }
+        Table table = db.getTableNullable(tableId);
+        if (table == null) {
+            throw new DdlException("Table not found: " + tableId);
+        }
+        if (!(table instanceof OlapTable)) {
+            throw new DdlException("Flexible partial update is only supported for OLAP tables");
+        }
+        OlapTable olapTable = (OlapTable) table;
+
+        // Validate table-level constraints (MoW, skip_bitmap, light_schema_change, variant columns)
+        olapTable.validateForFlexiblePartialUpdate();
+
+        // Routine load specific validations
+        // Must use JSON format
+        String format = this.jobProperties.getOrDefault(FileFormatProperties.PROP_FORMAT, "csv");
+        if (!"json".equalsIgnoreCase(format)) {
+            throw new DdlException("Flexible partial update only supports JSON format, but current job uses: "
+                    + format);
+        }
+        // Cannot use fuzzy_parse
+        if (Boolean.parseBoolean(this.jobProperties.getOrDefault(
+                JsonFileFormatProperties.PROP_FUZZY_PARSE, "false"))) {
+            throw new DdlException("Flexible partial update does not support fuzzy_parse");
+        }
+        // Cannot use jsonpaths
+        String jsonPaths = getJsonPaths();
+        if (jsonPaths != null && !jsonPaths.isEmpty()) {
+            throw new DdlException("Flexible partial update does not support jsonpaths");
+        }
+        // Cannot specify COLUMNS mapping
+        if (columnDescs != null && !columnDescs.descs.isEmpty()) {
+            throw new DdlException("Flexible partial update does not support COLUMNS specification");
+        }
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
index 17a4d0a08e4..7f8ff5b029e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
@@ -80,8 +80,8 @@ public class NereidsRoutineLoadTaskInfo implements NereidsLoadTaskInfo {
             String sequenceCol, double maxFilterRatio, NereidsImportColumnDescs columnDescs,
             Expression precedingFilter, Expression whereExpr, Separator columnSeparator,
             Separator lineDelimiter, byte enclose, byte escape, int sendBatchParallelism,
-            boolean loadToSingleTablet, boolean isPartialUpdate, TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy,
-            boolean memtableOnSinkNode) {
+            boolean loadToSingleTablet, TUniqueKeyUpdateMode uniqueKeyUpdateMode,
+            TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, boolean memtableOnSinkNode) {
         this.execMemLimit = execMemLimit;
         this.jobProperties = jobProperties;
         this.maxBatchIntervalS = maxBatchIntervalS;
@@ -99,9 +99,7 @@ public class NereidsRoutineLoadTaskInfo implements NereidsLoadTaskInfo {
         this.escape = escape;
         this.sendBatchParallelism = sendBatchParallelism;
         this.loadToSingleTablet = loadToSingleTablet;
-        if (isPartialUpdate) {
-            this.uniquekeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
-        }
+        this.uniquekeyUpdateMode = uniqueKeyUpdateMode;
         this.partialUpdateNewKeyPolicy = partialUpdateNewKeyPolicy;
         this.memtableOnSinkNode = memtableOnSinkNode;
         this.timeoutSec = calTimeoutSec();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
index abb03414b07..d4cc7c01afe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
@@ -145,20 +145,9 @@ public class NereidsStreamLoadPlanner {
             }
         }
 
-        if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS && !destTable.hasSkipBitmapColumn()) {
-            String tblName = destTable.getName();
-            throw new UserException("Flexible partial update can only support table with skip bitmap hidden column."
-                    + " But table " + tblName + " doesn't have it. You can use `ALTER TABLE " + tblName
-                    + " ENABLE FEATURE \"UPDATE_FLEXIBLE_COLUMNS\";` to add it to the table.");
-        }
-        if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS
-                && !destTable.getEnableLightSchemaChange()) {
-            throw new UserException("Flexible partial update can only support table with light_schema_change enabled."
-                    + " But table " + destTable.getName() + "'s property light_schema_change is false");
-        }
-        if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS
-                && destTable.hasVariantColumns()) {
-            throw new UserException("Flexible partial update can only support table without variant columns.");
+        if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS) {
+            // Validate table-level constraints for flexible partial update
+            destTable.validateForFlexiblePartialUpdate();
         }
         HashSet<String> partialUpdateInputColumns = new HashSet<>();
         if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java
index 1512fd32611..367480c5d93 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java
@@ -70,6 +70,7 @@ public class AlterRoutineLoadCommand extends AlterCommand {
             .add(CreateRoutineLoadInfo.MAX_BATCH_ROWS_PROPERTY)
             .add(CreateRoutineLoadInfo.MAX_BATCH_SIZE_PROPERTY)
             .add(CreateRoutineLoadInfo.PARTIAL_COLUMNS)
+            .add(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)
             .add(CreateRoutineLoadInfo.STRICT_MODE)
             .add(CreateRoutineLoadInfo.TIMEZONE)
             .add(CreateRoutineLoadInfo.WORKLOAD_GROUP)
@@ -279,6 +280,13 @@ public class AlterRoutineLoadCommand extends AlterCommand {
                     String.valueOf(isPartialUpdate));
         }
 
+        if (jobProperties.containsKey(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)) {
+            String modeStr = jobProperties.get(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE);
+            // Validate the mode string
+            CreateRoutineLoadInfo.parseAndValidateUniqueKeyUpdateMode(modeStr);
+            analyzedJobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, modeStr.toUpperCase());
+        }
+
         if (jobProperties.containsKey(CreateRoutineLoadInfo.WORKLOAD_GROUP)) {
             String workloadGroup = jobProperties.get(CreateRoutineLoadInfo.WORKLOAD_GROUP);
             if (!StringUtil.isEmpty(workloadGroup)) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
index 5f23ff42711..4a32cb3d4e3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
@@ -57,6 +57,7 @@ import org.apache.doris.nereids.util.PlanUtils;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.resource.workloadgroup.WorkloadGroup;
 import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
+import org.apache.doris.thrift.TUniqueKeyUpdateMode;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
@@ -90,6 +91,7 @@ public class CreateRoutineLoadInfo {
 
     public static final String PARTIAL_COLUMNS = "partial_columns";
     public static final String PARTIAL_UPDATE_NEW_KEY_POLICY = "partial_update_new_key_behavior";
+    public static final String UNIQUE_KEY_UPDATE_MODE = "unique_key_update_mode";
     public static final String WORKLOAD_GROUP = "workload_group";
     public static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
     public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism";
@@ -125,6 +127,7 @@ public class CreateRoutineLoadInfo {
             .add(LOAD_TO_SINGLE_TABLET)
             .add(PARTIAL_COLUMNS)
             .add(PARTIAL_UPDATE_NEW_KEY_POLICY)
+            .add(UNIQUE_KEY_UPDATE_MODE)
             .add(WORKLOAD_GROUP)
             .add(FileFormatProperties.PROP_FORMAT)
             .add(JsonFileFormatProperties.PROP_JSON_PATHS)
@@ -170,6 +173,7 @@ public class CreateRoutineLoadInfo {
      */
     private boolean isPartialUpdate = false;
     private TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
+    private TUniqueKeyUpdateMode uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPSERT;
 
     private String comment = "";
 
@@ -198,8 +202,27 @@ public class CreateRoutineLoadInfo {
         this.dataSourceProperties = RoutineLoadDataSourcePropertyFactory
             .createDataSource(typeName, dataSourceProperties, this.isMultiTable);
         this.mergeType = mergeType;
-        this.isPartialUpdate = this.jobProperties.getOrDefault(PARTIAL_COLUMNS, "false").equalsIgnoreCase("true");
-        if (this.isPartialUpdate && this.jobProperties.containsKey(PARTIAL_UPDATE_NEW_KEY_POLICY)) {
+        // Parse unique_key_update_mode first (takes precedence)
+        if (this.jobProperties.containsKey(UNIQUE_KEY_UPDATE_MODE)) {
+            String modeStr = this.jobProperties.get(UNIQUE_KEY_UPDATE_MODE);
+            TUniqueKeyUpdateMode mode = parseUniqueKeyUpdateMode(modeStr);
+            if (mode != null) {
+                this.uniqueKeyUpdateMode = mode;
+                if (mode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) {
+                    this.isPartialUpdate = true;
+                }
+            }
+            // validation will be done in checkJobProperties() if mode is null
+        } else {
+            // Backward compatibility: partial_columns=true maps to UPDATE_FIXED_COLUMNS
+            this.isPartialUpdate = this.jobProperties.getOrDefault(PARTIAL_COLUMNS, "false").equalsIgnoreCase("true");
+            if (this.isPartialUpdate) {
+                this.uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
+            }
+        }
+        // Parse partial_update_new_key_behavior
+        if ((this.isPartialUpdate || this.uniqueKeyUpdateMode != TUniqueKeyUpdateMode.UPSERT)
+                && this.jobProperties.containsKey(PARTIAL_UPDATE_NEW_KEY_POLICY)) {
             String policyStr = this.jobProperties.get(PARTIAL_UPDATE_NEW_KEY_POLICY).toUpperCase();
             if ("APPEND".equals(policyStr)) {
                 this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
@@ -213,6 +236,40 @@ public class CreateRoutineLoadInfo {
         }
     }
 
+    /**
+     * Parse unique_key_update_mode string to TUniqueKeyUpdateMode enum.
+     * Returns null if the mode string is invalid.
+     */
+    public static TUniqueKeyUpdateMode parseUniqueKeyUpdateMode(String modeStr) {
+        if (modeStr == null) {
+            return null;
+        }
+        switch (modeStr.toUpperCase()) {
+            case "UPSERT":
+                return TUniqueKeyUpdateMode.UPSERT;
+            case "UPDATE_FIXED_COLUMNS":
+                return TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
+            case "UPDATE_FLEXIBLE_COLUMNS":
+                return TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS;
+            default:
+                return null;
+        }
+    }
+
+    /**
+     * Validate unique_key_update_mode string value.
+     * @throws AnalysisException if the mode string is invalid
+     */
+    public static TUniqueKeyUpdateMode parseAndValidateUniqueKeyUpdateMode(String modeStr) throws AnalysisException {
+        TUniqueKeyUpdateMode mode = parseUniqueKeyUpdateMode(modeStr);
+        if (mode == null) {
+            throw new AnalysisException(UNIQUE_KEY_UPDATE_MODE
+                    + " should be one of {'UPSERT', 'UPDATE_FIXED_COLUMNS', 'UPDATE_FLEXIBLE_COLUMNS'}, but found "
+                    + modeStr);
+        }
+        return mode;
+    }
+
     public String getName() {
         return name;
     }
@@ -293,6 +350,10 @@ public class CreateRoutineLoadInfo {
         return partialUpdateNewKeyPolicy;
     }
 
+    public TUniqueKeyUpdateMode getUniqueKeyUpdateMode() {
+        return uniqueKeyUpdateMode;
+    }
+
     public String getComment() {
         return comment;
     }
@@ -357,7 +418,7 @@ public class CreateRoutineLoadInfo {
         name = labelNameInfo.getLabel();
 
         Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
-        if (isPartialUpdate && isMultiTable) {
+        if ((isPartialUpdate || uniqueKeyUpdateMode != TUniqueKeyUpdateMode.UPSERT) && isMultiTable) {
             throw new AnalysisException("Partial update is not supported in multi-table load.");
         }
         if (isMultiTable) {
@@ -379,6 +440,39 @@ public class CreateRoutineLoadInfo {
         if (isPartialUpdate && !((OlapTable) table).getEnableUniqueKeyMergeOnWrite()) {
             throw new AnalysisException("load by PARTIAL_COLUMNS is only supported in unique table MoW");
         }
+        // Validate flexible partial update constraints
+        if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS) {
+            validateFlexiblePartialUpdate((OlapTable) table);
+        }
+    }
+
+    private void validateFlexiblePartialUpdate(OlapTable table) throws AnalysisException {
+        // Validate table-level constraints (MoW, skip_bitmap, light_schema_change, variant columns)
+        try {
+            table.validateForFlexiblePartialUpdate();
+        } catch (UserException e) {
+            throw new AnalysisException(e.getMessage(), e);
+        }
+        // Routine load specific validations
+        // Must use JSON format
+        String format = jobProperties.getOrDefault(FileFormatProperties.PROP_FORMAT, "csv");
+        if (!"json".equalsIgnoreCase(format)) {
+            throw new AnalysisException("Flexible partial update only supports JSON format, but found: " + format);
+        }
+        // Cannot use fuzzy_parse
+        if (Boolean.parseBoolean(jobProperties.getOrDefault(JsonFileFormatProperties.PROP_FUZZY_PARSE, "false"))) {
+            throw new AnalysisException("Flexible partial update does not support fuzzy_parse");
+        }
+        // Cannot use jsonpaths
+        String jsonPaths = jobProperties.get(JsonFileFormatProperties.PROP_JSON_PATHS);
+        if (jsonPaths != null && !jsonPaths.isEmpty()) {
+            throw new AnalysisException("Flexible partial update does not support jsonpaths");
+        }
+        // Cannot specify COLUMNS mapping
+        if (loadPropertyMap != null && loadPropertyMap.values().stream()
+                .anyMatch(p -> p instanceof LoadColumnClause)) {
+            throw new AnalysisException("Flexible partial update does not support COLUMNS specification");
+        }
+        }
     }
 
     /**
@@ -532,11 +626,18 @@ public class CreateRoutineLoadInfo {
         }
         timezone = TimeUtils.checkTimeZoneValidAndStandardize(jobProperties.getOrDefault(TIMEZONE, timezone));
 
+        // check unique_key_update_mode
+        // Note: unique_key_update_mode takes precedence over partial_columns for backward compatibility
+        if (jobProperties.containsKey(UNIQUE_KEY_UPDATE_MODE)) {
+            String modeStr = jobProperties.get(UNIQUE_KEY_UPDATE_MODE);
+            parseAndValidateUniqueKeyUpdateMode(modeStr);
+        }
+
         // check partial_update_new_key_behavior
         if (jobProperties.containsKey(PARTIAL_UPDATE_NEW_KEY_POLICY)) {
-            if (!isPartialUpdate) {
+            if (!isPartialUpdate && uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPSERT) {
                 throw new AnalysisException(
-                    PARTIAL_UPDATE_NEW_KEY_POLICY + " can only be set when partial_columns is true");
+                    PARTIAL_UPDATE_NEW_KEY_POLICY + " can only be set when partial update is enabled");
             }
             String policy = jobProperties.get(PARTIAL_UPDATE_NEW_KEY_POLICY).toUpperCase();
             if (!"APPEND".equals(policy) && !"ERROR".equals(policy)) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
index 8b02ad7076a..d2246406428 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
@@ -28,7 +28,9 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.datasource.kafka.KafkaUtil;
+import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo;
 import org.apache.doris.thrift.TKafkaRLTaskProgress;
+import org.apache.doris.thrift.TUniqueKeyUpdateMode;
 import org.apache.doris.transaction.TransactionException;
 import org.apache.doris.transaction.TransactionState;
 
@@ -368,4 +370,127 @@ public class RoutineLoadJobTest {
         Assert.assertEquals(expect, showCreateInfo);
     }
 
+    @Test
+    public void testParseUniqueKeyUpdateMode() {
+        // Test valid mode strings
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPSERT,
+                CreateRoutineLoadInfo.parseUniqueKeyUpdateMode("UPSERT"));
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPSERT,
+                CreateRoutineLoadInfo.parseUniqueKeyUpdateMode("upsert"));
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS,
+                CreateRoutineLoadInfo.parseUniqueKeyUpdateMode("UPDATE_FIXED_COLUMNS"));
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS,
+                CreateRoutineLoadInfo.parseUniqueKeyUpdateMode("update_fixed_columns"));
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS,
+                CreateRoutineLoadInfo.parseUniqueKeyUpdateMode("UPDATE_FLEXIBLE_COLUMNS"));
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS,
+                CreateRoutineLoadInfo.parseUniqueKeyUpdateMode("Update_Flexible_Columns"));
+
+        // Test invalid mode strings
+        Assert.assertNull(CreateRoutineLoadInfo.parseUniqueKeyUpdateMode(null));
+        Assert.assertNull(CreateRoutineLoadInfo.parseUniqueKeyUpdateMode("INVALID"));
+        Assert.assertNull(CreateRoutineLoadInfo.parseUniqueKeyUpdateMode(""));
+        Assert.assertNull(CreateRoutineLoadInfo.parseUniqueKeyUpdateMode("PARTIAL_UPDATE"));
+    }
+
+    @Test
+    public void testParseAndValidateUniqueKeyUpdateMode() throws Exception {
+        // Test valid mode strings
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPSERT,
+                CreateRoutineLoadInfo.parseAndValidateUniqueKeyUpdateMode("UPSERT"));
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS,
+                CreateRoutineLoadInfo.parseAndValidateUniqueKeyUpdateMode("UPDATE_FIXED_COLUMNS"));
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS,
+                CreateRoutineLoadInfo.parseAndValidateUniqueKeyUpdateMode("UPDATE_FLEXIBLE_COLUMNS"));
+
+        // Test invalid mode string throws exception
+        try {
+            CreateRoutineLoadInfo.parseAndValidateUniqueKeyUpdateMode("INVALID_MODE");
+            Assert.fail("Expected AnalysisException");
+        } catch (Exception e) {
+            Assert.assertTrue(e.getMessage().contains("unique_key_update_mode"));
+            Assert.assertTrue(e.getMessage().contains("INVALID_MODE"));
+        }
+    }
+
+    @Test
+    public void testUniqueKeyUpdateModeInJobProperties() {
+        // Test that uniqueKeyUpdateMode is properly stored in jobProperties
+        KafkaRoutineLoadJob job = new KafkaRoutineLoadJob();
+        Map<String, String> jobProperties = Maps.newHashMap();
+        jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, "UPDATE_FLEXIBLE_COLUMNS");
+        Deencapsulation.setField(job, "jobProperties", jobProperties);
+        Deencapsulation.setField(job, "uniqueKeyUpdateMode", TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS);
+
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS, job.getUniqueKeyUpdateMode());
+    }
+
+    @Test
+    public void testBackwardCompatibilityPartialColumnsToUniqueKeyUpdateMode() throws Exception {
+        // Test backward compatibility: partial_columns=true should map to UPDATE_FIXED_COLUMNS
+        // This tests the logic in gsonPostProcess without calling the full method
+        KafkaRoutineLoadJob job = new KafkaRoutineLoadJob();
+        Map<String, String> jobProperties = Maps.newHashMap();
+        jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, "true");
+        // Note: UNIQUE_KEY_UPDATE_MODE is NOT set - testing backward compatibility
+        Deencapsulation.setField(job, "jobProperties", jobProperties);
+        Deencapsulation.setField(job, "uniqueKeyUpdateMode", TUniqueKeyUpdateMode.UPSERT);
+
+        // Simulate the backward compatibility logic from gsonPostProcess
+        TUniqueKeyUpdateMode uniqueKeyUpdateMode = Deencapsulation.getField(job, "uniqueKeyUpdateMode");
+        boolean isPartialUpdate = false;
+
+        // Process PARTIAL_COLUMNS when UNIQUE_KEY_UPDATE_MODE is not set
+        if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPSERT) {
+            String partialColumnsValue = jobProperties.get(CreateRoutineLoadInfo.PARTIAL_COLUMNS);
+            isPartialUpdate = Boolean.parseBoolean(partialColumnsValue);
+            if (isPartialUpdate) {
+                uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
+            }
+        }
+
+        // Verify the backward compatibility logic
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS, uniqueKeyUpdateMode);
+        Assert.assertTrue(isPartialUpdate);
+    }
+
+    @Test
+    public void testUniqueKeyUpdateModeTakesPrecedenceOverPartialColumns() throws Exception {
+        // Test that unique_key_update_mode takes precedence over partial_columns
+        // This tests the logic in gsonPostProcess without calling the full method
+        KafkaRoutineLoadJob job = new KafkaRoutineLoadJob();
+        Map<String, String> jobProperties = Maps.newHashMap();
+        jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, "UPDATE_FLEXIBLE_COLUMNS");
+        jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, "true");
+        Deencapsulation.setField(job, "jobProperties", jobProperties);
+
+        // Simulate the precedence logic from gsonPostProcess
+        TUniqueKeyUpdateMode uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPSERT;
+        boolean isPartialUpdate = false;
+
+        // Process UNIQUE_KEY_UPDATE_MODE first (takes precedence)
+        if (jobProperties.containsKey(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)) {
+            String modeValue = jobProperties.get(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE);
+            TUniqueKeyUpdateMode mode = CreateRoutineLoadInfo.parseUniqueKeyUpdateMode(modeValue);
+            if (mode != null) {
+                uniqueKeyUpdateMode = mode;
+                isPartialUpdate = (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS);
+            }
+        }
+
+        // Process PARTIAL_COLUMNS only if UNIQUE_KEY_UPDATE_MODE results in UPSERT
+        if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPSERT) {
+            String partialColumnsValue = jobProperties.get(CreateRoutineLoadInfo.PARTIAL_COLUMNS);
+            isPartialUpdate = Boolean.parseBoolean(partialColumnsValue);
+            if (isPartialUpdate) {
+                uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
+            }
+        }
+
+        // unique_key_update_mode should take precedence
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS, uniqueKeyUpdateMode);
+        // isPartialUpdate should be false for UPDATE_FLEXIBLE_COLUMNS
+        Assert.assertFalse(isPartialUpdate);
+    }
+
 }
diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_flexible_partial_update.out b/regression-test/data/load_p0/routine_load/test_routine_load_flexible_partial_update.out
new file mode 100644
index 00000000000..0d4f162d308
--- /dev/null
+++ b/regression-test/data/load_p0/routine_load/test_routine_load_flexible_partial_update.out
@@ -0,0 +1,82 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_initial1 --
+1      alice   100     20
+2      bob     90      21
+3      charlie 80      22
+4      david   70      23
+5      eve     60      24
+
+-- !select_after_flex_update1 --
+1      alice   150     20
+2      bob     90      30
+3      chuck   95      22
+4      david   70      23
+5      eve     60      24
+6      frank   \N      \N
+
+-- !select_initial2 --
+1      10      20      30      40
+2      100     200     300     400
+3      1000    2000    3000    4000
+
+-- !select_after_flex_update2 --
+1      11      20      30      40
+2      100     222     333     400
+3      1000    2000    3000    4000
+4      \N      9876    4444    1234
+
+-- !select_initial7 --
+1      alice   100     20
+2      bob     90      21
+3      charlie 80      22
+
+-- !select_after_flex_where --
+1      alice   100     20
+2      bob     95      21
+3      chuck   80      22
+4      diana   70      \N
+
+-- !select_initial11 --
+1      alice   100     20
+2      bob     90      21
+3      charlie 80      22
+
+-- !select_after_fixed_update --
+1      alice   150     20
+2      bob     95      21
+3      charlie 80      22
+4      \N      85      \N
+
+-- !select_initial12 --
+1      alice   100     20
+2      bob     90      21
+3      charlie 80      22
+
+-- !select_after_alter_flex --
+1      alice   200     20
+2      bob     90      35
+3      charlie 80      22
+4      diana   \N      \N
+
+-- !select_initial18 --
+1      alice   100     20
+2      bob     90      21
+
+-- !select_after_alter_flex_where --
+1      alice   100     20
+2      bob     95      21
+3      charlie 80      \N
+
+-- !select_initial20 --
+1      alice   100     20
+2      bob     90      21
+
+-- !select_after_alter_upsert --
+1      \N      200     \N
+2      bob     90      21
+3      charlie 80      22
+
+-- !select_initial21 --
+1      alice   100     20
+2      bob     90      21
+
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
index f3f6641d3f6..c1bd321607b 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
@@ -107,6 +107,18 @@ class RoutineLoadTestUtils {
     }
 
     static int waitForTaskFinish(Closure sqlRunner, String job, String tableName, int expectedMinRows = 0, int maxAttempts = 60) {
+        return waitForTaskFinishInternal(sqlRunner, job, tableName, expectedMinRows, maxAttempts, false)
+    }
+
+    /**
+     * Wait for routine load task to finish for MOW (Merge-on-Write) unique key tables.
+     * Uses skip_delete_bitmap=true to properly count rows during partial update operations.
+     */
+    static int waitForTaskFinishMoW(Closure sqlRunner, String job, String tableName, int expectedMinRows = 0, int maxAttempts = 60) {
+        return waitForTaskFinishInternal(sqlRunner, job, tableName, expectedMinRows, maxAttempts, true)
+    }
+
+    private static int waitForTaskFinishInternal(Closure sqlRunner, String job, String tableName, int expectedMinRows, int maxAttempts, boolean isMoW) {
         def count = 0
         while (true) {
             def res = sqlRunner.call("show routine load for ${job}")
@@ -114,7 +126,18 @@ class RoutineLoadTestUtils {
             def statistic = res[0][14].toString()
             logger.info("Routine load state: ${routineLoadState}")
             logger.info("Routine load statistic: ${statistic}")
-            def rowCount = sqlRunner.call("select count(*) from ${tableName}")
+            def rowCount
+            if (isMoW) {
+                // For MOW tables, use skip_delete_bitmap to properly count rows
+                sqlRunner.call("set skip_delete_bitmap=true")
+                sqlRunner.call("set skip_delete_sign=true")
+                sqlRunner.call("sync")
+                rowCount = sqlRunner.call("select count(*) from ${tableName}")
+                sqlRunner.call("set skip_delete_bitmap=false")
+                sqlRunner.call("set skip_delete_sign=false")
+            } else {
+                rowCount = sqlRunner.call("select count(*) from ${tableName}")
+            }
             if (routineLoadState == "RUNNING" && rowCount[0][0] > expectedMinRows) {
                 break
             }
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy
new file mode 100644
index 00000000000..510eeb0ecab
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy
@@ -0,0 +1,1381 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.RoutineLoadTestUtils
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+
+suite("test_routine_load_flexible_partial_update", "nonConcurrent") {
+
+    if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) {
+        def runSql = { String q -> sql q }
+        def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
+        def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+
+        // Test 1: Basic flexible partial update
+        def kafkaJsonTopic1 = "test_routine_load_flexible_partial_update_basic"
+        def tableName1 = "test_routine_load_flex_update_basic"
+        def job1 = "test_flex_partial_update_job_basic"
+
+        sql """ DROP TABLE IF EXISTS ${tableName1} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName1} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL,
+                `age` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            COMMENT 'test flexible partial update'
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        // verify skip bitmap column is enabled
+        def show_res = sql "show create table ${tableName1}"
+        assertTrue(show_res.toString().contains('"enable_unique_key_skip_bitmap_column" = "true"'))
+
+        // insert initial data
+        sql """
+            INSERT INTO ${tableName1} VALUES
+            (1, 'alice', 100, 20),
+            (2, 'bob', 90, 21),
+            (3, 'charlie', 80, 22),
+            (4, 'david', 70, 23),
+            (5, 'eve', 60, 24)
+        """
+
+        qt_select_initial1 "SELECT id, name, score, age FROM ${tableName1} ORDER BY id"
+
+        try {
+            // create routine load with flexible partial update
+            sql """
+                CREATE ROUTINE LOAD ${job1} ON ${tableName1}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic1}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            // send JSON data with different columns per row
+            // Row 1: update only score for id=1
+            // Row 2: update only age for id=2
+            // Row 3: update both name and score for id=3
+            // Row 4: insert new row with only id and name
+            def data = [
+                '{"id": 1, "score": 150}',
+                '{"id": 2, "age": 30}',
+                '{"id": 3, "name": "chuck", "score": 95}',
+                '{"id": 6, "name": "frank"}'
+            ]
+
+            data.each { line ->
+                logger.info("Sending to Kafka: ${line}")
+                def record = new ProducerRecord<>(kafkaJsonTopic1, null, line)
+                producer.send(record).get()
+            }
+            producer.flush()
+
+            // wait for routine load task to finish
+            // With skip_delete_bitmap=true, count = initial + kafka_messages = 5 + 4 = 9
+            RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job1, tableName1, 8)
+
+            // verify flexible partial update results
+            qt_select_after_flex_update1 "SELECT id, name, score, age FROM ${tableName1} ORDER BY id"
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job1}"
+        }
+
+        // Test 2: Flexible partial update with default values
+        def kafkaJsonTopic2 = "test_routine_load_flexible_partial_update_default"
+        def tableName2 = "test_routine_load_flex_update_default"
+        def job2 = "test_flex_partial_update_job_default"
+
+        sql """ DROP TABLE IF EXISTS ${tableName2} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName2} (
+                `id` int NOT NULL,
+                `v1` bigint NULL,
+                `v2` bigint NULL DEFAULT "9876",
+                `v3` bigint NOT NULL,
+                `v4` bigint NOT NULL DEFAULT "1234"
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            COMMENT 'test flexible partial update with default values'
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        // insert initial data
+        sql """
+            INSERT INTO ${tableName2} VALUES
+            (1, 10, 20, 30, 40),
+            (2, 100, 200, 300, 400),
+            (3, 1000, 2000, 3000, 4000)
+        """
+
+        qt_select_initial2 "SELECT id, v1, v2, v3, v4 FROM ${tableName2} ORDER BY id"
+
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${job2} ON ${tableName2}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic2}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            // send JSON data with different columns per row
+            def data2 = [
+                '{"id": 1, "v1": 11}',
+                '{"id": 2, "v2": 222, "v3": 333}',
+                '{"id": 4, "v3": 4444}'
+            ]
+
+            data2.each { line ->
+                logger.info("Sending to Kafka: ${line}")
+                def record = new ProducerRecord<>(kafkaJsonTopic2, null, line)
+                producer.send(record).get()
+            }
+            producer.flush()
+
+            // With skip_delete_bitmap=true, count = initial + kafka_messages = 3 + 3 = 6
+            RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job2, tableName2, 5)
+
+            qt_select_after_flex_update2 "SELECT id, v1, v2, v3, v4 FROM ${tableName2} ORDER BY id"
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job2}"
+        }
+
+        // Test 3: Error case - CSV format not supported
+        def kafkaCsvTopic3 = "test_routine_load_flexible_partial_update_csv_error"
+        def tableName3 = "test_routine_load_flex_update_csv_error"
+        def job3 = "test_flex_partial_update_job_csv_error"
+
+        sql """ DROP TABLE IF EXISTS ${tableName3} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName3} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        test {
+            sql """
+                CREATE ROUTINE LOAD ${job3} ON ${tableName3}
+                COLUMNS TERMINATED BY ","
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaCsvTopic3}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            exception "Flexible partial update only supports JSON format"
+        }
+
+        // Test 4: Error case - jsonpaths not supported with flexible partial update
+        def tableName4 = "test_routine_load_flex_update_jsonpaths_error"
+        def job4 = "test_flex_partial_update_job_jsonpaths_error"
+
+        sql """ DROP TABLE IF EXISTS ${tableName4} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName4} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        test {
+            sql """
+                CREATE ROUTINE LOAD ${job4} ON ${tableName4}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "jsonpaths" = '[\"\$.id\", \"\$.name\", \"\$.score\"]',
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "test_topic",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            exception "Flexible partial update does not support jsonpaths"
+        }
+
+        // Test 5: Error case - fuzzy_parse not supported
+        def kafkaJsonTopic5 = "test_routine_load_flexible_partial_update_fuzzy_error"
+        def tableName5 = "test_routine_load_flex_update_fuzzy_error"
+        def job5 = "test_flex_partial_update_job_fuzzy_error"
+
+        sql """ DROP TABLE IF EXISTS ${tableName5} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName5} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        test {
+            sql """
+                CREATE ROUTINE LOAD ${job5} ON ${tableName5}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "fuzzy_parse" = "true",
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic5}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            exception "Flexible partial update does not support fuzzy_parse"
+        }
+
+        // Test 6: Error case - COLUMNS clause not supported
+        def kafkaJsonTopic6 = "test_routine_load_flexible_partial_update_columns_error"
+        def tableName6 = "test_routine_load_flex_update_columns_error"
+        def job6 = "test_flex_partial_update_job_columns_error"
+
+        sql """ DROP TABLE IF EXISTS ${tableName6} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName6} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        test {
+            sql """
+                CREATE ROUTINE LOAD ${job6} ON ${tableName6}
+                COLUMNS (id, name, score)
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic6}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            exception "Flexible partial update does not support COLUMNS 
specification"
+        }
+
+        // Test 7: Success case - WHERE clause works with flexible partial update
+        def kafkaJsonTopic7 = "test_routine_load_flexible_partial_update_where"
+        def tableName7 = "test_routine_load_flex_update_where"
+        def job7 = "test_flex_partial_update_job_where"
+
+        sql """ DROP TABLE IF EXISTS ${tableName7} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName7} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL,
+                `age` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        // insert initial data
+        sql """
+            INSERT INTO ${tableName7} VALUES
+            (1, 'alice', 100, 20),
+            (2, 'bob', 90, 21),
+            (3, 'charlie', 80, 22)
+        """
+
+        qt_select_initial7 "SELECT id, name, score, age FROM ${tableName7} ORDER BY id"
+
+        try {
+            // create routine load with WHERE clause and flexible partial update
+            sql """
+                CREATE ROUTINE LOAD ${job7} ON ${tableName7}
+                WHERE id > 1
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic7}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            // send JSON data - WHERE clause filters id > 1, so id=1 row should NOT be processed
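+            // each row below carries the key plus only the columns it intends to update (flexible partial update)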
+            def data7 = [
+                '{"id": 1, "score": 999}',
+                '{"id": 2, "score": 95}',
+                '{"id": 3, "name": "chuck"}',
+                '{"id": 4, "name": "diana", "score": 70}'
+            ]
+
+            data7.each { line ->
+                logger.info("Sending to Kafka: ${line}")
+                def record = new ProducerRecord<>(kafkaJsonTopic7, null, line)
+                producer.send(record).get()
+            }
+            producer.flush()
+
+            // With skip_delete_bitmap=true and WHERE id > 1:
+            // - id=1: 1 version (not updated, filtered by WHERE)
+            // - id=2: 2 versions (original + partial update)
+            // - id=3: 2 versions (original + partial update)
+            // - id=4: 1 version (new row)
+            // Total: 6 rows, so expectedMinRows = 5 (waits for count > 5)
+            RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job7, tableName7, 5)
+
+            // verify: id=1 should NOT be updated (filtered by WHERE), id=2,3,4 should be updated
+            qt_select_after_flex_where "SELECT id, name, score, age FROM ${tableName7} ORDER BY id"
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job7}"
+        }
+
+        // Test 8: Error case - table without skip_bitmap column
+        def kafkaJsonTopic8 = "test_routine_load_flexible_partial_update_no_skip_bitmap"
+        def tableName8 = "test_routine_load_flex_update_no_skip_bitmap"
+        def job8 = "test_flex_partial_update_job_no_skip_bitmap"
+
+        sql """ DROP TABLE IF EXISTS ${tableName8} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName8} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true",
+                "enable_unique_key_merge_on_write" = "true",
+                "enable_unique_key_skip_bitmap_column" = "false"
+            );
+        """
+
+        test {
+            sql """
+                CREATE ROUTINE LOAD ${job8} ON ${tableName8}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic8}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            exception "Flexible partial update can only support table with 
skip bitmap hidden column"
+        }
+
+        // Test 9: Error case - table with variant column
+        def kafkaJsonTopic9 = "test_routine_load_flexible_partial_update_variant"
+        def tableName9 = "test_routine_load_flex_update_variant"
+        def job9 = "test_flex_partial_update_job_variant"
+
+        sql """ DROP TABLE IF EXISTS ${tableName9} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName9} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `data` variant NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true",
+                "enable_unique_key_merge_on_write" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        test {
+            sql """
+                CREATE ROUTINE LOAD ${job9} ON ${tableName9}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic9}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            exception "Flexible partial update can only support table without 
variant columns"
+        }
+
+        // Test 10: Error case - invalid unique_key_update_mode value
+        def kafkaJsonTopic10 = "test_routine_load_flexible_partial_update_invalid_mode"
+        def tableName10 = "test_routine_load_flex_update_invalid_mode"
+        def job10 = "test_flex_partial_update_job_invalid_mode"
+
+        sql """ DROP TABLE IF EXISTS ${tableName10} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName10} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true",
+                "enable_unique_key_merge_on_write" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        test {
+            sql """
+                CREATE ROUTINE LOAD ${job10} ON ${tableName10}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "unique_key_update_mode" = "INVALID_MODE"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic10}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            exception "unique_key_update_mode should be one of"
+        }
+
+        // Test 11: UPDATE_FIXED_COLUMNS mode (backward compatibility)
+        def kafkaJsonTopic11 = "test_routine_load_fixed_columns_mode"
+        def tableName11 = "test_routine_load_fixed_columns_mode"
+        def job11 = "test_fixed_columns_mode_job"
+
+        sql """ DROP TABLE IF EXISTS ${tableName11} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName11} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL,
+                `age` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true",
+                "enable_unique_key_merge_on_write" = "true"
+            );
+        """
+
+        // insert initial data
+        sql """
+            INSERT INTO ${tableName11} VALUES
+            (1, 'alice', 100, 20),
+            (2, 'bob', 90, 21),
+            (3, 'charlie', 80, 22)
+        """
+
+        qt_select_initial11 "SELECT id, name, score, age FROM ${tableName11} ORDER BY id"
+
+        try {
+            // create routine load with UPDATE_FIXED_COLUMNS mode
+            sql """
+                CREATE ROUTINE LOAD ${job11} ON ${tableName11}
+                COLUMNS (id, score)
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "unique_key_update_mode" = "UPDATE_FIXED_COLUMNS"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic11}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
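+            // every row updates the same fixed column set declared in the COLUMNS clause (id, score)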
+            def data11 = [
+                '{"id": 1, "score": 150}',
+                '{"id": 2, "score": 95}',
+                '{"id": 4, "score": 85}'
+            ]
+
+            data11.each { line ->
+                logger.info("Sending to Kafka: ${line}")
+                def record = new ProducerRecord<>(kafkaJsonTopic11, null, line)
+                producer.send(record).get()
+            }
+            producer.flush()
+
+            // With skip_delete_bitmap=true, count = initial + kafka_messages = 3 + 3 = 6
+            RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job11, tableName11, 5)
+
+            qt_select_after_fixed_update "SELECT id, name, score, age FROM ${tableName11} ORDER BY id"
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job11}"
+        }
+
+        // Test 12: ALTER ROUTINE LOAD to change unique_key_update_mode
+        def kafkaJsonTopic12 = "test_routine_load_alter_flex_mode"
+        def tableName12 = "test_routine_load_alter_flex_mode"
+        def job12 = "test_alter_flex_mode_job"
+
+        sql """ DROP TABLE IF EXISTS ${tableName12} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName12} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL,
+                `age` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        // insert initial data
+        sql """
+            INSERT INTO ${tableName12} VALUES
+            (1, 'alice', 100, 20),
+            (2, 'bob', 90, 21),
+            (3, 'charlie', 80, 22)
+        """
+
+        qt_select_initial12 "SELECT id, name, score, age FROM ${tableName12} ORDER BY id"
+
+        try {
+            // create routine load with UPSERT mode (default)
+            sql """
+                CREATE ROUTINE LOAD ${job12} ON ${tableName12}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic12}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            // pause the job before altering
+            sql "PAUSE ROUTINE LOAD FOR ${job12}"
+
+            // alter to UPDATE_FLEXIBLE_COLUMNS mode
+            sql """
+                ALTER ROUTINE LOAD FOR ${job12}
+                PROPERTIES
+                (
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                );
+            """
+
+            // verify the property was changed
+            def res = sql "SHOW ROUTINE LOAD FOR ${job12}"
+            def jobProperties = res[0][11].toString()
+            logger.info("Altered routine load job properties: 
${jobProperties}")
+            assertTrue(jobProperties.contains("UPDATE_FLEXIBLE_COLUMNS"))
+
+            // resume the job
+            sql "RESUME ROUTINE LOAD FOR ${job12}"
+
+            // send JSON data with different columns per row
+            def data12 = [
+                '{"id": 1, "score": 200}',
+                '{"id": 2, "age": 35}',
+                '{"id": 4, "name": "diana"}'
+            ]
+
+            data12.each { line ->
+                logger.info("Sending to Kafka: ${line}")
+                def record = new ProducerRecord<>(kafkaJsonTopic12, null, line)
+                producer.send(record).get()
+            }
+            producer.flush()
+
+            // With skip_delete_bitmap=true, count = initial + kafka_messages = 3 + 3 = 6
+            RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job12, tableName12, 5)
+
+            // verify flexible partial update results after alter
+            qt_select_after_alter_flex "SELECT id, name, score, age FROM ${tableName12} ORDER BY id"
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job12}"
+        }
+
+        // Test 13: ALTER ROUTINE LOAD - error when trying to change to flex mode with invalid settings
+        def kafkaJsonTopic13 = "test_routine_load_alter_flex_error"
+        def tableName13 = "test_routine_load_alter_flex_error"
+        def job13 = "test_alter_flex_error_job"
+
+        sql """ DROP TABLE IF EXISTS ${tableName13} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName13} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true",
+                "enable_unique_key_merge_on_write" = "true",
+                "enable_unique_key_skip_bitmap_column" = "false"
+            );
+        """
+
+        try {
+            // create routine load with UPSERT mode (default)
+            sql """
+                CREATE ROUTINE LOAD ${job13} ON ${tableName13}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic13}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            // pause the job before altering
+            sql "PAUSE ROUTINE LOAD FOR ${job13}"
+
+            // try to alter to UPDATE_FLEXIBLE_COLUMNS mode - should fail because table doesn't have skip_bitmap
+            test {
+                sql """
+                    ALTER ROUTINE LOAD FOR ${job13}
+                    PROPERTIES
+                    (
+                        "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                    );
+                """
+                exception "Flexible partial update can only support table with 
skip bitmap hidden column"
+            }
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job13}"
+        }
+
+        // Test 14: ALTER to flex mode fails when using CSV format
+        def kafkaJsonTopic14 = "test_routine_load_alter_flex_csv_error"
+        def tableName14 = "test_routine_load_alter_flex_csv_error"
+        def job14 = "test_alter_flex_csv_error_job"
+
+        sql """ DROP TABLE IF EXISTS ${tableName14} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName14} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        try {
+            // create routine load with CSV format
+            sql """
+                CREATE ROUTINE LOAD ${job14} ON ${tableName14}
+                COLUMNS TERMINATED BY ","
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "csv"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic14}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            sql "PAUSE ROUTINE LOAD FOR ${job14}"
+
+            // try to alter to UPDATE_FLEXIBLE_COLUMNS mode - should fail because CSV format
+            test {
+                sql """
+                    ALTER ROUTINE LOAD FOR ${job14}
+                    PROPERTIES
+                    (
+                        "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                    );
+                """
+                exception "Flexible partial update only supports JSON format"
+            }
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job14}"
+        }
+
+        // Test 15: ALTER to flex mode fails when using fuzzy_parse
+        def kafkaJsonTopic15 = "test_routine_load_alter_flex_fuzzy_error"
+        def tableName15 = "test_routine_load_alter_flex_fuzzy_error"
+        def job15 = "test_alter_flex_fuzzy_error_job"
+
+        sql """ DROP TABLE IF EXISTS ${tableName15} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName15} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        try {
+            // create routine load with fuzzy_parse enabled
+            sql """
+                CREATE ROUTINE LOAD ${job15} ON ${tableName15}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "fuzzy_parse" = "true"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic15}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            sql "PAUSE ROUTINE LOAD FOR ${job15}"
+
+            // try to alter to UPDATE_FLEXIBLE_COLUMNS mode - should fail because fuzzy_parse
+            test {
+                sql """
+                    ALTER ROUTINE LOAD FOR ${job15}
+                    PROPERTIES
+                    (
+                        "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                    );
+                """
+                exception "Flexible partial update does not support 
fuzzy_parse"
+            }
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job15}"
+        }
+
+        // Test 16: ALTER to flex mode fails with jsonpaths
+        def kafkaJsonTopic16 = "test_routine_load_alter_flex_jsonpaths_error"
+        def tableName16 = "test_routine_load_alter_flex_jsonpaths_error"
+        def job16 = "test_alter_flex_jsonpaths_error_job"
+
+        sql """ DROP TABLE IF EXISTS ${tableName16} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName16} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        try {
+            // create routine load with jsonpaths (UPSERT mode)
+            sql """
+                CREATE ROUTINE LOAD ${job16} ON ${tableName16}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "jsonpaths" = '[\"\$.id\", \"\$.name\", \"\$.score\"]'
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic16}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            sql "PAUSE ROUTINE LOAD FOR ${job16}"
+
+            // alter to UPDATE_FLEXIBLE_COLUMNS mode - should fail because jsonpaths is set
+            test {
+                sql """
+                    ALTER ROUTINE LOAD FOR ${job16}
+                    PROPERTIES
+                    (
+                        "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                    );
+                """
+                exception "Flexible partial update does not support jsonpaths"
+            }
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job16}"
+        }
+
+        // Test 17: ALTER to flex mode fails when COLUMNS clause is specified
+        def kafkaJsonTopic17 = "test_routine_load_alter_flex_columns_error"
+        def tableName17 = "test_routine_load_alter_flex_columns_error"
+        def job17 = "test_alter_flex_columns_error_job"
+
+        sql """ DROP TABLE IF EXISTS ${tableName17} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName17} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        try {
+            // create routine load with COLUMNS clause
+            sql """
+                CREATE ROUTINE LOAD ${job17} ON ${tableName17}
+                COLUMNS (id, name, score)
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic17}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            sql "PAUSE ROUTINE LOAD FOR ${job17}"
+
+            // try to alter to UPDATE_FLEXIBLE_COLUMNS mode - should fail because COLUMNS clause
+            test {
+                sql """
+                    ALTER ROUTINE LOAD FOR ${job17}
+                    PROPERTIES
+                    (
+                        "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                    );
+                """
+                exception "Flexible partial update does not support COLUMNS 
specification"
+            }
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job17}"
+        }
+
+        // Test 18: ALTER to flex mode succeeds with WHERE clause
+        def kafkaJsonTopic18 = "test_routine_load_alter_flex_where"
+        def tableName18 = "test_routine_load_alter_flex_where"
+        def job18 = "test_alter_flex_where_job"
+
+        sql """ DROP TABLE IF EXISTS ${tableName18} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName18} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL,
+                `age` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        // insert initial data
+        sql """
+            INSERT INTO ${tableName18} VALUES
+            (1, 'alice', 100, 20),
+            (2, 'bob', 90, 21)
+        """
+
+        qt_select_initial18 "SELECT id, name, score, age FROM ${tableName18} ORDER BY id"
+
+        try {
+            // create routine load with WHERE clause (UPSERT mode)
+            sql """
+                CREATE ROUTINE LOAD ${job18} ON ${tableName18}
+                WHERE id > 1
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic18}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            sql "PAUSE ROUTINE LOAD FOR ${job18}"
+
+            // alter to UPDATE_FLEXIBLE_COLUMNS mode - should succeed
+            sql """
+                ALTER ROUTINE LOAD FOR ${job18}
+                PROPERTIES
+                (
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                );
+            """
+
+            // verify the property was changed
+            def res = sql "SHOW ROUTINE LOAD FOR ${job18}"
+            def jobProperties = res[0][11].toString()
+            logger.info("Altered routine load job properties: 
${jobProperties}")
+            assertTrue(jobProperties.contains("UPDATE_FLEXIBLE_COLUMNS"))
+
+            sql "RESUME ROUTINE LOAD FOR ${job18}"
+
+            // send JSON data - WHERE clause filters id > 1
+            def data18 = [
+                '{"id": 1, "score": 999}',
+                '{"id": 2, "score": 95}',
+                '{"id": 3, "name": "charlie", "score": 80}'
+            ]
+
+            data18.each { line ->
+                logger.info("Sending to Kafka: ${line}")
+                def record = new ProducerRecord<>(kafkaJsonTopic18, null, line)
+                producer.send(record).get()
+            }
+            producer.flush()
+
+            // With skip_delete_bitmap=true and WHERE id > 1:
+            // - id=1: 1 version (not updated, filtered by WHERE)
+            // - id=2: 2 versions (original + partial update)
+            // - id=3: 1 version (new row)
+            // Total: 4 rows, so expectedMinRows = 3 (waits for count > 3)
+            RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job18, tableName18, 3)
+
+            // verify: id=1 should NOT be updated (filtered by WHERE), id=2,3 should be updated
+            qt_select_after_alter_flex_where "SELECT id, name, score, age FROM ${tableName18} ORDER BY id"
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job18}"
+        }
+
+        // Test 19: ALTER to flex mode fails on non-MoW table
+        def kafkaJsonTopic19 = "test_routine_load_alter_flex_non_mow_error"
+        def tableName19 = "test_routine_load_alter_flex_non_mow_error"
+        def job19 = "test_alter_flex_non_mow_error_job"
+
+        sql """ DROP TABLE IF EXISTS ${tableName19} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName19} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "enable_unique_key_merge_on_write" = "false"
+            );
+        """
+
+        try {
+            // create routine load
+            sql """
+                CREATE ROUTINE LOAD ${job19} ON ${tableName19}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic19}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            sql "PAUSE ROUTINE LOAD FOR ${job19}"
+
+            // try to alter to UPDATE_FLEXIBLE_COLUMNS mode - should fail because non-MoW
+            test {
+                sql """
+                    ALTER ROUTINE LOAD FOR ${job19}
+                    PROPERTIES
+                    (
+                        "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                    );
+                """
+                exception "Flexible partial update is only supported in unique 
table MoW"
+            }
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job19}"
+        }
+
+        // Test 20: ALTER from flex mode to UPSERT mode (success case)
+        def kafkaJsonTopic20 = "test_routine_load_alter_flex_to_upsert"
+        def tableName20 = "test_routine_load_alter_flex_to_upsert"
+        def job20 = "test_alter_flex_to_upsert_job"
+
+        sql """ DROP TABLE IF EXISTS ${tableName20} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName20} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL,
+                `age` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        // insert initial data
+        sql """
+            INSERT INTO ${tableName20} VALUES
+            (1, 'alice', 100, 20),
+            (2, 'bob', 90, 21)
+        """
+
+        qt_select_initial20 "SELECT id, name, score, age FROM ${tableName20} ORDER BY id"
+
+        try {
+            // create routine load with flex mode
+            sql """
+                CREATE ROUTINE LOAD ${job20} ON ${tableName20}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic20}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            sql "PAUSE ROUTINE LOAD FOR ${job20}"
+
+            // alter to UPSERT mode
+            sql """
+                ALTER ROUTINE LOAD FOR ${job20}
+                PROPERTIES
+                (
+                    "unique_key_update_mode" = "UPSERT"
+                );
+            """
+
+            // verify the property was changed
+            def res = sql "SHOW ROUTINE LOAD FOR ${job20}"
+            def jobProperties = res[0][11].toString()
+            logger.info("Altered routine load job properties: 
${jobProperties}")
+            assertTrue(jobProperties.contains("UPSERT"))
+
+            sql "RESUME ROUTINE LOAD FOR ${job20}"
+
+            // send JSON data - with UPSERT mode, missing columns should be NULL
+            def data20 = [
+                '{"id": 1, "score": 200}',
+                '{"id": 3, "name": "charlie", "score": 80, "age": 22}'
+            ]
+
+            data20.each { line ->
+                logger.info("Sending to Kafka: ${line}")
+                def record = new ProducerRecord<>(kafkaJsonTopic20, null, line)
+                producer.send(record).get()
+            }
+            producer.flush()
+
+            // With skip_delete_bitmap=true, count = initial + kafka_messages = 2 + 2 = 4
+            RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job20, tableName20, 3)
+
+            // with UPSERT, id=1 should have NULL for name and age (full row replacement)
+            qt_select_after_alter_upsert "SELECT id, name, score, age FROM ${tableName20} ORDER BY id"
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job20}"
+        }
+
+        // Test 21: ALTER from flex mode to UPDATE_FIXED_COLUMNS mode (success case)
+        def kafkaJsonTopic21 = "test_routine_load_alter_flex_to_fixed"
+        def tableName21 = "test_routine_load_alter_flex_to_fixed"
+        def job21 = "test_alter_flex_to_fixed_job"
+
+        sql """ DROP TABLE IF EXISTS ${tableName21} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName21} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL,
+                `age` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        // insert initial data
+        sql """
+            INSERT INTO ${tableName21} VALUES
+            (1, 'alice', 100, 20),
+            (2, 'bob', 90, 21)
+        """
+
+        qt_select_initial21 "SELECT id, name, score, age FROM ${tableName21} ORDER BY id"
+
+        try {
+            // create routine load with flex mode
+            sql """
+                CREATE ROUTINE LOAD ${job21} ON ${tableName21}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic21}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            sql "PAUSE ROUTINE LOAD FOR ${job21}"
+
+            // alter to UPDATE_FIXED_COLUMNS mode - need to add COLUMNS clause via ALTER
+            // Note: This changes from flexible to fixed partial update
+            sql """
+                ALTER ROUTINE LOAD FOR ${job21}
+                PROPERTIES
+                (
+                    "unique_key_update_mode" = "UPDATE_FIXED_COLUMNS"
+                );
+            """
+
+            // verify the property was changed
+            def res = sql "SHOW ROUTINE LOAD FOR ${job21}"
+            def jobProperties = res[0][11].toString()
+            logger.info("Altered routine load job properties: 
${jobProperties}")
+            assertTrue(jobProperties.contains("UPDATE_FIXED_COLUMNS"))
+
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job21}"
+        }
+    }
+}
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.groovy
index a6a97253e98..eab01074ff3 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.groovy
@@ -276,7 +276,7 @@ suite("test_routine_load_partial_update_new_key_behavior", "nonConcurrent") {
                     "property.kafka_default_offsets" = "OFFSET_BEGINNING"
                 );
             """
-            exception "partial_update_new_key_behavior can only be set when 
partial_columns is true"
+            exception "partial_update_new_key_behavior can only be set when 
partial update is enabled"
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
