This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 509ebe159eb [feature](routine-load) Support flexible partial update for routine load (#59896)
509ebe159eb is described below
commit 509ebe159ebaac855b78d4829f4e9636c2aa3971
Author: Yongqiang YANG <[email protected]>
AuthorDate: Tue Jan 20 22:24:11 2026 -0800
[feature](routine-load) Support flexible partial update for routine load (#59896)
Add support for the `unique_key_update_mode` property in routine load to
enable flexible partial column updates. This allows different rows in
the same batch to update different columns, unlike fixed partial update,
where all rows must update the same columns.
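
For example (syntax as exercised by the regression tests added in this
change; the job, table, broker, and topic names below are placeholders):

    CREATE ROUTINE LOAD example_job ON example_tbl
    PROPERTIES
    (
        "format" = "json",
        "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
    )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092",
        "kafka_topic" = "example_topic"
    );

Each JSON message then carries only the key plus the columns it wants to
change, e.g. {"id": 1, "score": 150} and {"id": 2, "age": 30} within the
same batch.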
Changes:
- Add `unique_key_update_mode` property to CreateRoutineLoadInfo with
values: UPSERT (default), UPDATE_FIXED_COLUMNS, UPDATE_FLEXIBLE_COLUMNS
- Add validation for flexible partial update constraints (JSON format
only, no jsonpaths, no fuzzy_parse, no COLUMNS clause, and the table
must have the skip_bitmap column enabled)
- Update RoutineLoadJob to persist and restore the update mode
- Update KafkaRoutineLoadJob to pass update mode to task info
- Support ALTER ROUTINE LOAD to change unique_key_update_mode
- Add regression tests covering basic usage and error cases
- Fix HashMap ordering issue in gsonPostProcess for backward
compatibility
- Add validation when ALTER changes mode to UPDATE_FLEXIBLE_COLUMNS
- Add comprehensive ALTER test cases for flexible partial update
validation
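
For example, an existing job can be switched to the new mode with ALTER
(the job must be paused first), again using placeholder names:

    PAUSE ROUTINE LOAD FOR example_job;
    ALTER ROUTINE LOAD FOR example_job
    PROPERTIES ("unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS");
    RESUME ROUTINE LOAD FOR example_job;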
---
.../java/org/apache/doris/catalog/OlapTable.java | 27 +
.../load/routineload/KafkaRoutineLoadJob.java | 6 +-
.../doris/load/routineload/RoutineLoadJob.java | 118 +-
.../nereids/load/NereidsRoutineLoadTaskInfo.java | 8 +-
.../nereids/load/NereidsStreamLoadPlanner.java | 17 +-
.../plans/commands/AlterRoutineLoadCommand.java | 8 +
.../plans/commands/info/CreateRoutineLoadInfo.java | 111 +-
.../doris/load/routineload/RoutineLoadJobTest.java | 125 ++
.../test_routine_load_flexible_partial_update.out | 82 ++
.../regression/util/RoutineLoadTestUtils.groovy | 25 +-
...est_routine_load_flexible_partial_update.groovy | 1381 ++++++++++++++++++++
...ine_load_partial_update_new_key_behavior.groovy | 2 +-
12 files changed, 1874 insertions(+), 36 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 39383ceda2c..e2b743f666c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -2977,6 +2977,33 @@ public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProc
return hasSkipBitmapColumn();
}
+ /**
+ * Validate that the table supports flexible partial update.
+ * Checks the following constraints:
+ * 1. Must be MoW unique key table
+ * 2. Must have skip_bitmap column
+ * 3. Must have light_schema_change enabled
+ * 4. Cannot have variant columns
+ * @throws UserException if any constraint is not satisfied
+ */
+    public void validateForFlexiblePartialUpdate() throws UserException {
+        if (!getEnableUniqueKeyMergeOnWrite()) {
+            throw new UserException("Flexible partial update is only supported in unique table MoW");
+        }
+        if (!hasSkipBitmapColumn()) {
+            throw new UserException("Flexible partial update can only support table with skip bitmap hidden column."
+                    + " But table " + getName() + " doesn't have it. You can use `ALTER TABLE " + getName()
+                    + " ENABLE FEATURE \"UPDATE_FLEXIBLE_COLUMNS\";` to add it to the table.");
+        }
+        if (!getEnableLightSchemaChange()) {
+            throw new UserException("Flexible partial update can only support table with light_schema_change enabled."
+                    + " But table " + getName() + "'s property light_schema_change is false");
+        }
+        if (hasVariantColumns()) {
+            throw new UserException("Flexible partial update can only support table without variant columns.");
+        }
+    }
+
public boolean getEnableUniqueKeyMergeOnWrite() {
if (tableProperty == null) {
return false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 82cd83cfe6e..7cd2a71d2f2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -729,7 +729,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     private void modifyPropertiesInternal(Map<String, String> jobProperties,
                                           KafkaDataSourceProperties dataSourceProperties)
-            throws DdlException {
+            throws UserException {
         if (null != dataSourceProperties) {
             List<Pair<Integer, Long>> kafkaPartitionOffsets = Lists.newArrayList();
             Map<String, String> customKafkaProperties = Maps.newHashMap();
@@ -830,7 +830,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     public void replayModifyProperties(AlterRoutineLoadJobOperationLog log) {
         try {
             modifyPropertiesInternal(log.getJobProperties(),
                     (KafkaDataSourceProperties) log.getDataSourceProperties());
-        } catch (DdlException e) {
+        } catch (UserException e) {
             // should not happen
             LOG.error("failed to replay modify kafka routine load job: {}", id, e);
         }
@@ -974,6 +974,6 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         return new NereidsRoutineLoadTaskInfo(execMemLimit, new HashMap<>(jobProperties), maxBatchIntervalS,
                 partitionNamesInfo, mergeType, deleteCondition, sequenceCol, maxFilterRatio, importColumnDescs,
                 precedingFilter, whereExpr, columnSeparator, lineDelimiter, enclose, escape, sendBatchParallelism,
-                loadToSingleTablet, isPartialUpdate, partialUpdateNewKeyPolicy, memtableOnSinkNode);
+                loadToSingleTablet, uniqueKeyUpdateMode, partialUpdateNewKeyPolicy, memtableOnSinkNode);
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 00a55b8729d..f036c3ba192 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -71,6 +71,7 @@ import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
import org.apache.doris.thrift.TPipelineFragmentParams;
import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.thrift.TUniqueKeyUpdateMode;
import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
import org.apache.doris.transaction.TransactionException;
import org.apache.doris.transaction.TransactionState;
@@ -225,6 +226,7 @@ public abstract class RoutineLoadJob
     protected boolean isPartialUpdate = false;
     protected TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
+    protected TUniqueKeyUpdateMode uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPSERT;
     protected String sequenceCol;
@@ -387,11 +389,14 @@ public abstract class RoutineLoadJob
         jobProperties.put(info.STRICT_MODE, String.valueOf(info.isStrictMode()));
         jobProperties.put(info.SEND_BATCH_PARALLELISM, String.valueOf(this.sendBatchParallelism));
         jobProperties.put(info.LOAD_TO_SINGLE_TABLET, String.valueOf(this.loadToSingleTablet));
-        jobProperties.put(info.PARTIAL_COLUMNS, info.isPartialUpdate() ? "true" : "false");
-        if (info.isPartialUpdate()) {
-            this.isPartialUpdate = true;
+        // Set unique key update mode
+        this.uniqueKeyUpdateMode = info.getUniqueKeyUpdateMode();
+        this.isPartialUpdate = (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS);
+        jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, uniqueKeyUpdateMode.name());
+        jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, isPartialUpdate ? "true" : "false");
+        if (uniqueKeyUpdateMode != TUniqueKeyUpdateMode.UPSERT) {
             this.partialUpdateNewKeyPolicy = info.getPartialUpdateNewKeyPolicy();
-            jobProperties.put(info.PARTIAL_UPDATE_NEW_KEY_POLICY,
+            jobProperties.put(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY,
                     this.partialUpdateNewKeyPolicy == TPartialUpdateNewRowPolicy.ERROR ? "ERROR" : "APPEND");
         }
         jobProperties.put(info.MAX_FILTER_RATIO_PROPERTY, String.valueOf(maxFilterRatio));
@@ -570,6 +575,10 @@ public abstract class RoutineLoadJob
return mergeType;
}
+ public TUniqueKeyUpdateMode getUniqueKeyUpdateMode() {
+ return uniqueKeyUpdateMode;
+ }
+
@Override
public Expr getDeleteCondition() {
return deleteCondition;
@@ -1059,7 +1068,8 @@ public abstract class RoutineLoadJob
throw new UserException("txn does not exist: " + txnId);
}
txnState.addTableIndexes(planner.getDestTable());
- if (isPartialUpdate) {
+        if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS
+                || uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS) {
txnState.setSchemaForPartialUpdate((OlapTable) table);
}
@@ -1932,9 +1942,29 @@ public abstract class RoutineLoadJob
if (tableId == 0) {
isMultiTable = true;
}
+        // Process UNIQUE_KEY_UPDATE_MODE first to ensure correct backward compatibility
+        // with PARTIAL_COLUMNS (HashMap iteration order is not guaranteed)
+        if (jobProperties.containsKey(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)) {
+            String modeValue = jobProperties.get(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE);
+            TUniqueKeyUpdateMode mode = CreateRoutineLoadInfo.parseUniqueKeyUpdateMode(modeValue);
+            if (mode != null) {
+                uniqueKeyUpdateMode = mode;
+                isPartialUpdate = (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS);
+            } else {
+                uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPSERT;
+            }
+        }
+ // Process remaining properties
jobProperties.forEach((k, v) -> {
if (k.equals(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) {
-                isPartialUpdate = Boolean.parseBoolean(v);
+                // Backward compatibility: only use partial_columns if unique_key_update_mode is not set
+                // unique_key_update_mode takes precedence
+                if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPSERT) {
+                    isPartialUpdate = Boolean.parseBoolean(v);
+                    if (isPartialUpdate) {
+                        uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
+                    }
+                }
             } else if (k.equals(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY)) {
                 if ("ERROR".equalsIgnoreCase(v)) {
                     partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.ERROR;
@@ -1979,7 +2009,7 @@ public abstract class RoutineLoadJob
     public abstract NereidsRoutineLoadTaskInfo toNereidsRoutineLoadTaskInfo() throws UserException;

     // for ALTER ROUTINE LOAD
-    protected void modifyCommonJobProperties(Map<String, String> jobProperties) {
+    protected void modifyCommonJobProperties(Map<String, String> jobProperties) throws UserException {
         if (jobProperties.containsKey(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY)) {
             this.desireTaskConcurrentNum = Integer.parseInt(
                     jobProperties.remove(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY));
@@ -2010,5 +2040,79 @@ public abstract class RoutineLoadJob
             this.maxBatchSizeBytes = Long.parseLong(
                     jobProperties.remove(CreateRoutineLoadInfo.MAX_BATCH_SIZE_PROPERTY));
         }
+
+        if (jobProperties.containsKey(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)) {
+            String modeStr = jobProperties.remove(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE);
+            TUniqueKeyUpdateMode newMode = CreateRoutineLoadInfo.parseAndValidateUniqueKeyUpdateMode(modeStr);
+            // Validate flexible partial update constraints when changing to UPDATE_FLEXIBLE_COLUMNS
+            if (newMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS) {
+                validateFlexiblePartialUpdateForAlter();
+            }
+            this.uniqueKeyUpdateMode = newMode;
+            this.isPartialUpdate = (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS);
+            this.jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, uniqueKeyUpdateMode.name());
+            this.jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, String.valueOf(isPartialUpdate));
+        }
+
+        if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) {
+            // Backward compatibility: only use partial_columns if unique_key_update_mode is UPSERT (not explicitly set)
+            // unique_key_update_mode takes precedence
+            this.isPartialUpdate = Boolean.parseBoolean(
+                    jobProperties.remove(CreateRoutineLoadInfo.PARTIAL_COLUMNS));
+            if (this.isPartialUpdate && uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPSERT) {
+                this.uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
+            }
+            this.jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, String.valueOf(isPartialUpdate));
+            this.jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, uniqueKeyUpdateMode.name());
+        }
+    }
+
+    /**
+     * Validate flexible partial update constraints when altering routine load job.
+     */
+    private void validateFlexiblePartialUpdateForAlter() throws UserException {
+        // Multi-table load does not support flexible partial update
+        if (isMultiTable) {
+            throw new DdlException("Flexible partial update is not supported in multi-table load");
+        }
+
+        // Get the table to check table-level constraints
+        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+        if (db == null) {
+            throw new DdlException("Database not found: " + dbId);
+        }
+        Table table = db.getTableNullable(tableId);
+        if (table == null) {
+            throw new DdlException("Table not found: " + tableId);
+        }
+        if (!(table instanceof OlapTable)) {
+            throw new DdlException("Flexible partial update is only supported for OLAP tables");
+        }
+        OlapTable olapTable = (OlapTable) table;
+
+        // Validate table-level constraints (MoW, skip_bitmap, light_schema_change, variant columns)
+        olapTable.validateForFlexiblePartialUpdate();
+
+        // Routine load specific validations
+        // Must use JSON format
+        String format = this.jobProperties.getOrDefault(FileFormatProperties.PROP_FORMAT, "csv");
+        if (!"json".equalsIgnoreCase(format)) {
+            throw new DdlException("Flexible partial update only supports JSON format, but current job uses: "
+                    + format);
+        }
+        // Cannot use fuzzy_parse
+        if (Boolean.parseBoolean(this.jobProperties.getOrDefault(
+                JsonFileFormatProperties.PROP_FUZZY_PARSE, "false"))) {
+            throw new DdlException("Flexible partial update does not support fuzzy_parse");
+        }
+        // Cannot use jsonpaths
+        String jsonPaths = getJsonPaths();
+        if (jsonPaths != null && !jsonPaths.isEmpty()) {
+            throw new DdlException("Flexible partial update does not support jsonpaths");
+        }
+        // Cannot specify COLUMNS mapping
+        if (columnDescs != null && !columnDescs.descs.isEmpty()) {
+            throw new DdlException("Flexible partial update does not support COLUMNS specification");
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
index 17a4d0a08e4..7f8ff5b029e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
@@ -80,8 +80,8 @@ public class NereidsRoutineLoadTaskInfo implements NereidsLoadTaskInfo {
             String sequenceCol, double maxFilterRatio, NereidsImportColumnDescs columnDescs,
             Expression precedingFilter, Expression whereExpr, Separator columnSeparator,
             Separator lineDelimiter, byte enclose, byte escape, int sendBatchParallelism,
-            boolean loadToSingleTablet, boolean isPartialUpdate, TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy,
-            boolean memtableOnSinkNode) {
+            boolean loadToSingleTablet, TUniqueKeyUpdateMode uniqueKeyUpdateMode,
+            TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, boolean memtableOnSinkNode) {
         this.execMemLimit = execMemLimit;
         this.jobProperties = jobProperties;
         this.maxBatchIntervalS = maxBatchIntervalS;
@@ -99,9 +99,7 @@ public class NereidsRoutineLoadTaskInfo implements NereidsLoadTaskInfo {
         this.escape = escape;
         this.sendBatchParallelism = sendBatchParallelism;
         this.loadToSingleTablet = loadToSingleTablet;
-        if (isPartialUpdate) {
-            this.uniquekeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
-        }
+        this.uniquekeyUpdateMode = uniqueKeyUpdateMode;
         this.partialUpdateNewKeyPolicy = partialUpdateNewKeyPolicy;
         this.memtableOnSinkNode = memtableOnSinkNode;
         this.timeoutSec = calTimeoutSec();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
index abb03414b07..d4cc7c01afe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
@@ -145,20 +145,9 @@ public class NereidsStreamLoadPlanner {
}
}
-        if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS && !destTable.hasSkipBitmapColumn()) {
-            String tblName = destTable.getName();
-            throw new UserException("Flexible partial update can only support table with skip bitmap hidden column."
-                    + " But table " + tblName + " doesn't have it. You can use `ALTER TABLE " + tblName
-                    + " ENABLE FEATURE \"UPDATE_FLEXIBLE_COLUMNS\";` to add it to the table.");
-        }
-        if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS
-                && !destTable.getEnableLightSchemaChange()) {
-            throw new UserException("Flexible partial update can only support table with light_schema_change enabled."
-                    + " But table " + destTable.getName() + "'s property light_schema_change is false");
-        }
-        if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS
-                && destTable.hasVariantColumns()) {
-            throw new UserException("Flexible partial update can only support table without variant columns.");
+        if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS) {
+            // Validate table-level constraints for flexible partial update
+            destTable.validateForFlexiblePartialUpdate();
         }
HashSet<String> partialUpdateInputColumns = new HashSet<>();
if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java
index 1512fd32611..367480c5d93 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java
@@ -70,6 +70,7 @@ public class AlterRoutineLoadCommand extends AlterCommand {
.add(CreateRoutineLoadInfo.MAX_BATCH_ROWS_PROPERTY)
.add(CreateRoutineLoadInfo.MAX_BATCH_SIZE_PROPERTY)
.add(CreateRoutineLoadInfo.PARTIAL_COLUMNS)
+ .add(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)
.add(CreateRoutineLoadInfo.STRICT_MODE)
.add(CreateRoutineLoadInfo.TIMEZONE)
.add(CreateRoutineLoadInfo.WORKLOAD_GROUP)
@@ -279,6 +280,13 @@ public class AlterRoutineLoadCommand extends AlterCommand {
String.valueOf(isPartialUpdate));
}
+        if (jobProperties.containsKey(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)) {
+            String modeStr = jobProperties.get(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE);
+            // Validate the mode string
+            CreateRoutineLoadInfo.parseAndValidateUniqueKeyUpdateMode(modeStr);
+            analyzedJobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, modeStr.toUpperCase());
+        }
+
if (jobProperties.containsKey(CreateRoutineLoadInfo.WORKLOAD_GROUP)) {
            String workloadGroup = jobProperties.get(CreateRoutineLoadInfo.WORKLOAD_GROUP);
if (!StringUtil.isEmpty(workloadGroup)) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
index 5f23ff42711..4a32cb3d4e3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
@@ -57,6 +57,7 @@ import org.apache.doris.nereids.util.PlanUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.resource.workloadgroup.WorkloadGroup;
import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
+import org.apache.doris.thrift.TUniqueKeyUpdateMode;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
@@ -90,6 +91,7 @@ public class CreateRoutineLoadInfo {
     public static final String PARTIAL_COLUMNS = "partial_columns";
     public static final String PARTIAL_UPDATE_NEW_KEY_POLICY = "partial_update_new_key_behavior";
+    public static final String UNIQUE_KEY_UPDATE_MODE = "unique_key_update_mode";
     public static final String WORKLOAD_GROUP = "workload_group";
     public static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
     public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism";
@@ -125,6 +127,7 @@ public class CreateRoutineLoadInfo {
.add(LOAD_TO_SINGLE_TABLET)
.add(PARTIAL_COLUMNS)
.add(PARTIAL_UPDATE_NEW_KEY_POLICY)
+ .add(UNIQUE_KEY_UPDATE_MODE)
.add(WORKLOAD_GROUP)
.add(FileFormatProperties.PROP_FORMAT)
.add(JsonFileFormatProperties.PROP_JSON_PATHS)
@@ -170,6 +173,7 @@ public class CreateRoutineLoadInfo {
      */
     private boolean isPartialUpdate = false;
     private TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
+    private TUniqueKeyUpdateMode uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPSERT;
     private String comment = "";
@@ -198,8 +202,27 @@ public class CreateRoutineLoadInfo {
         this.dataSourceProperties = RoutineLoadDataSourcePropertyFactory
                 .createDataSource(typeName, dataSourceProperties, this.isMultiTable);
         this.mergeType = mergeType;
-        this.isPartialUpdate = this.jobProperties.getOrDefault(PARTIAL_COLUMNS, "false").equalsIgnoreCase("true");
-        if (this.isPartialUpdate && this.jobProperties.containsKey(PARTIAL_UPDATE_NEW_KEY_POLICY)) {
+        // Parse unique_key_update_mode first (takes precedence)
+        if (this.jobProperties.containsKey(UNIQUE_KEY_UPDATE_MODE)) {
+            String modeStr = this.jobProperties.get(UNIQUE_KEY_UPDATE_MODE);
+            TUniqueKeyUpdateMode mode = parseUniqueKeyUpdateMode(modeStr);
+            if (mode != null) {
+                this.uniqueKeyUpdateMode = mode;
+                if (mode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) {
+                    this.isPartialUpdate = true;
+                }
+            }
+            // validation will be done in checkJobProperties() if mode is null
+        } else {
+            // Backward compatibility: partial_columns=true maps to UPDATE_FIXED_COLUMNS
+            this.isPartialUpdate = this.jobProperties.getOrDefault(PARTIAL_COLUMNS, "false").equalsIgnoreCase("true");
+            if (this.isPartialUpdate) {
+                this.uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
+            }
+        }
+        // Parse partial_update_new_key_behavior
+        if ((this.isPartialUpdate || this.uniqueKeyUpdateMode != TUniqueKeyUpdateMode.UPSERT)
+                && this.jobProperties.containsKey(PARTIAL_UPDATE_NEW_KEY_POLICY)) {
             String policyStr = this.jobProperties.get(PARTIAL_UPDATE_NEW_KEY_POLICY).toUpperCase();
             if ("APPEND".equals(policyStr)) {
                 this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
@@ -213,6 +236,40 @@ public class CreateRoutineLoadInfo {
}
}
+ /**
+ * Parse unique_key_update_mode string to TUniqueKeyUpdateMode enum.
+ * Returns null if the mode string is invalid.
+ */
+    public static TUniqueKeyUpdateMode parseUniqueKeyUpdateMode(String modeStr) {
+ if (modeStr == null) {
+ return null;
+ }
+ switch (modeStr.toUpperCase()) {
+ case "UPSERT":
+ return TUniqueKeyUpdateMode.UPSERT;
+ case "UPDATE_FIXED_COLUMNS":
+ return TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
+ case "UPDATE_FLEXIBLE_COLUMNS":
+ return TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Validate unique_key_update_mode string value.
+ * @throws AnalysisException if the mode string is invalid
+ */
+    public static TUniqueKeyUpdateMode parseAndValidateUniqueKeyUpdateMode(String modeStr) throws AnalysisException {
+ TUniqueKeyUpdateMode mode = parseUniqueKeyUpdateMode(modeStr);
+ if (mode == null) {
+            throw new AnalysisException(UNIQUE_KEY_UPDATE_MODE
+                    + " should be one of {'UPSERT', 'UPDATE_FIXED_COLUMNS', 'UPDATE_FLEXIBLE_COLUMNS'}, but found "
+                    + modeStr);
+ }
+ return mode;
+ }
+
public String getName() {
return name;
}
@@ -293,6 +350,10 @@ public class CreateRoutineLoadInfo {
return partialUpdateNewKeyPolicy;
}
+ public TUniqueKeyUpdateMode getUniqueKeyUpdateMode() {
+ return uniqueKeyUpdateMode;
+ }
+
public String getComment() {
return comment;
}
@@ -357,7 +418,7 @@ public class CreateRoutineLoadInfo {
         name = labelNameInfo.getLabel();
         Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);

-        if (isPartialUpdate && isMultiTable) {
+        if ((isPartialUpdate || uniqueKeyUpdateMode != TUniqueKeyUpdateMode.UPSERT) && isMultiTable) {
             throw new AnalysisException("Partial update is not supported in multi-table load.");
         }
if (isMultiTable) {
@@ -379,6 +440,39 @@ public class CreateRoutineLoadInfo {
         if (isPartialUpdate && !((OlapTable) table).getEnableUniqueKeyMergeOnWrite()) {
             throw new AnalysisException("load by PARTIAL_COLUMNS is only supported in unique table MoW");
         }
+        // Validate flexible partial update constraints
+        if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS) {
+            validateFlexiblePartialUpdate((OlapTable) table);
+        }
     }
+
+    private void validateFlexiblePartialUpdate(OlapTable table) throws AnalysisException {
+        // Validate table-level constraints (MoW, skip_bitmap, light_schema_change, variant columns)
+        try {
+            table.validateForFlexiblePartialUpdate();
+        } catch (UserException e) {
+            throw new AnalysisException(e.getMessage(), e);
+        }
+        // Routine load specific validations
+        // Must use JSON format
+        String format = jobProperties.getOrDefault(FileFormatProperties.PROP_FORMAT, "csv");
+        if (!"json".equalsIgnoreCase(format)) {
+            throw new AnalysisException("Flexible partial update only supports JSON format, but found: " + format);
+        }
+        // Cannot use fuzzy_parse
+        if (Boolean.parseBoolean(jobProperties.getOrDefault(JsonFileFormatProperties.PROP_FUZZY_PARSE, "false"))) {
+            throw new AnalysisException("Flexible partial update does not support fuzzy_parse");
+        }
+        // Cannot use jsonpaths
+        String jsonPaths = jobProperties.get(JsonFileFormatProperties.PROP_JSON_PATHS);
+        if (jsonPaths != null && !jsonPaths.isEmpty()) {
+            throw new AnalysisException("Flexible partial update does not support jsonpaths");
+        }
+        // Cannot specify COLUMNS mapping
+        if (loadPropertyMap != null && loadPropertyMap.values().stream()
+                .anyMatch(p -> p instanceof LoadColumnClause)) {
+            throw new AnalysisException("Flexible partial update does not support COLUMNS specification");
+        }
     }
/**
@@ -532,11 +626,18 @@ public class CreateRoutineLoadInfo {
         }
         timezone = TimeUtils.checkTimeZoneValidAndStandardize(jobProperties.getOrDefault(TIMEZONE, timezone));

+        // check unique_key_update_mode
+        // Note: unique_key_update_mode takes precedence over partial_columns for backward compatibility
+        if (jobProperties.containsKey(UNIQUE_KEY_UPDATE_MODE)) {
+            String modeStr = jobProperties.get(UNIQUE_KEY_UPDATE_MODE);
+            parseAndValidateUniqueKeyUpdateMode(modeStr);
+        }
+
         // check partial_update_new_key_behavior
         if (jobProperties.containsKey(PARTIAL_UPDATE_NEW_KEY_POLICY)) {
-            if (!isPartialUpdate) {
+            if (!isPartialUpdate && uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPSERT) {
                 throw new AnalysisException(
-                        PARTIAL_UPDATE_NEW_KEY_POLICY + " can only be set when partial_columns is true");
+                        PARTIAL_UPDATE_NEW_KEY_POLICY + " can only be set when partial update is enabled");
             }
             String policy = jobProperties.get(PARTIAL_UPDATE_NEW_KEY_POLICY).toUpperCase();
             if (!"APPEND".equals(policy) && !"ERROR".equals(policy)) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
index 8b02ad7076a..d2246406428 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
@@ -28,7 +28,9 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.datasource.kafka.KafkaUtil;
+import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo;
 import org.apache.doris.thrift.TKafkaRLTaskProgress;
+import org.apache.doris.thrift.TUniqueKeyUpdateMode;
 import org.apache.doris.transaction.TransactionException;
 import org.apache.doris.transaction.TransactionState;
@@ -368,4 +370,127 @@ public class RoutineLoadJobTest {
Assert.assertEquals(expect, showCreateInfo);
}
+    @Test
+    public void testParseUniqueKeyUpdateMode() {
+        // Test valid mode strings
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPSERT,
+                CreateRoutineLoadInfo.parseUniqueKeyUpdateMode("UPSERT"));
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPSERT,
+                CreateRoutineLoadInfo.parseUniqueKeyUpdateMode("upsert"));
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS,
+                CreateRoutineLoadInfo.parseUniqueKeyUpdateMode("UPDATE_FIXED_COLUMNS"));
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS,
+                CreateRoutineLoadInfo.parseUniqueKeyUpdateMode("update_fixed_columns"));
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS,
+                CreateRoutineLoadInfo.parseUniqueKeyUpdateMode("UPDATE_FLEXIBLE_COLUMNS"));
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS,
+                CreateRoutineLoadInfo.parseUniqueKeyUpdateMode("Update_Flexible_Columns"));
+
+        // Test invalid mode strings
+        Assert.assertNull(CreateRoutineLoadInfo.parseUniqueKeyUpdateMode(null));
+        Assert.assertNull(CreateRoutineLoadInfo.parseUniqueKeyUpdateMode("INVALID"));
+        Assert.assertNull(CreateRoutineLoadInfo.parseUniqueKeyUpdateMode(""));
+        Assert.assertNull(CreateRoutineLoadInfo.parseUniqueKeyUpdateMode("PARTIAL_UPDATE"));
+    }
+
+    @Test
+    public void testParseAndValidateUniqueKeyUpdateMode() throws Exception {
+        // Test valid mode strings
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPSERT,
+                CreateRoutineLoadInfo.parseAndValidateUniqueKeyUpdateMode("UPSERT"));
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS,
+                CreateRoutineLoadInfo.parseAndValidateUniqueKeyUpdateMode("UPDATE_FIXED_COLUMNS"));
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS,
+                CreateRoutineLoadInfo.parseAndValidateUniqueKeyUpdateMode("UPDATE_FLEXIBLE_COLUMNS"));
+
+        // Test invalid mode string throws exception
+        try {
+            CreateRoutineLoadInfo.parseAndValidateUniqueKeyUpdateMode("INVALID_MODE");
+            Assert.fail("Expected AnalysisException");
+        } catch (Exception e) {
+            Assert.assertTrue(e.getMessage().contains("unique_key_update_mode"));
+            Assert.assertTrue(e.getMessage().contains("INVALID_MODE"));
+        }
+    }
+
+    @Test
+    public void testUniqueKeyUpdateModeInJobProperties() {
+        // Test that uniqueKeyUpdateMode is properly stored in jobProperties
+        KafkaRoutineLoadJob job = new KafkaRoutineLoadJob();
+        Map<String, String> jobProperties = Maps.newHashMap();
+        jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, "UPDATE_FLEXIBLE_COLUMNS");
+        Deencapsulation.setField(job, "jobProperties", jobProperties);
+        Deencapsulation.setField(job, "uniqueKeyUpdateMode", TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS);
+
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS, job.getUniqueKeyUpdateMode());
+    }
+
+    @Test
+    public void testBackwardCompatibilityPartialColumnsToUniqueKeyUpdateMode() throws Exception {
+        // Test backward compatibility: partial_columns=true should map to UPDATE_FIXED_COLUMNS
+        // This tests the logic in gsonPostProcess without calling the full method
+        KafkaRoutineLoadJob job = new KafkaRoutineLoadJob();
+        Map<String, String> jobProperties = Maps.newHashMap();
+        jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, "true");
+        // Note: UNIQUE_KEY_UPDATE_MODE is NOT set - testing backward compatibility
+        Deencapsulation.setField(job, "jobProperties", jobProperties);
+        Deencapsulation.setField(job, "uniqueKeyUpdateMode", TUniqueKeyUpdateMode.UPSERT);
+
+        // Simulate the backward compatibility logic from gsonPostProcess
+        TUniqueKeyUpdateMode uniqueKeyUpdateMode = Deencapsulation.getField(job, "uniqueKeyUpdateMode");
+        boolean isPartialUpdate = false;
+
+        // Process PARTIAL_COLUMNS when UNIQUE_KEY_UPDATE_MODE is not set
+        if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPSERT) {
+            String partialColumnsValue = jobProperties.get(CreateRoutineLoadInfo.PARTIAL_COLUMNS);
+            isPartialUpdate = Boolean.parseBoolean(partialColumnsValue);
+            if (isPartialUpdate) {
+                uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
+            }
+        }
+
+        // Verify the backward compatibility logic
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS, uniqueKeyUpdateMode);
+        Assert.assertTrue(isPartialUpdate);
+    }
+
+    @Test
+    public void testUniqueKeyUpdateModeTakesPrecedenceOverPartialColumns() throws Exception {
+        // Test that unique_key_update_mode takes precedence over partial_columns
+        // This tests the logic in gsonPostProcess without calling the full method
+        KafkaRoutineLoadJob job = new KafkaRoutineLoadJob();
+        Map<String, String> jobProperties = Maps.newHashMap();
+        jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, "UPDATE_FLEXIBLE_COLUMNS");
+        jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, "true");
+        Deencapsulation.setField(job, "jobProperties", jobProperties);
+
+        // Simulate the precedence logic from gsonPostProcess
+        TUniqueKeyUpdateMode uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPSERT;
+        boolean isPartialUpdate = false;
+
+        // Process UNIQUE_KEY_UPDATE_MODE first (takes precedence)
+        if (jobProperties.containsKey(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)) {
+            String modeValue = jobProperties.get(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE);
+            TUniqueKeyUpdateMode mode = CreateRoutineLoadInfo.parseUniqueKeyUpdateMode(modeValue);
+            if (mode != null) {
+                uniqueKeyUpdateMode = mode;
+                isPartialUpdate = (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS);
+            }
+        }
+
+        // Process PARTIAL_COLUMNS only if UNIQUE_KEY_UPDATE_MODE results in UPSERT
+        if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPSERT) {
+            String partialColumnsValue = jobProperties.get(CreateRoutineLoadInfo.PARTIAL_COLUMNS);
+            isPartialUpdate = Boolean.parseBoolean(partialColumnsValue);
+            if (isPartialUpdate) {
+                uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
+            }
+        }
+
+        // unique_key_update_mode should take precedence
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS, uniqueKeyUpdateMode);
+        // isPartialUpdate should be false for UPDATE_FLEXIBLE_COLUMNS
+        Assert.assertFalse(isPartialUpdate);
+    }
+
}
diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_flexible_partial_update.out b/regression-test/data/load_p0/routine_load/test_routine_load_flexible_partial_update.out
new file mode 100644
index 00000000000..0d4f162d308
--- /dev/null
+++ b/regression-test/data/load_p0/routine_load/test_routine_load_flexible_partial_update.out
@@ -0,0 +1,82 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_initial1 --
+1 alice 100 20
+2 bob 90 21
+3 charlie 80 22
+4 david 70 23
+5 eve 60 24
+
+-- !select_after_flex_update1 --
+1 alice 150 20
+2 bob 90 30
+3 chuck 95 22
+4 david 70 23
+5 eve 60 24
+6 frank \N \N
+
+-- !select_initial2 --
+1 10 20 30 40
+2 100 200 300 400
+3 1000 2000 3000 4000
+
+-- !select_after_flex_update2 --
+1 11 20 30 40
+2 100 222 333 400
+3 1000 2000 3000 4000
+4 \N 9876 4444 1234
+
+-- !select_initial7 --
+1 alice 100 20
+2 bob 90 21
+3 charlie 80 22
+
+-- !select_after_flex_where --
+1 alice 100 20
+2 bob 95 21
+3 chuck 80 22
+4 diana 70 \N
+
+-- !select_initial11 --
+1 alice 100 20
+2 bob 90 21
+3 charlie 80 22
+
+-- !select_after_fixed_update --
+1 alice 150 20
+2 bob 95 21
+3 charlie 80 22
+4 \N 85 \N
+
+-- !select_initial12 --
+1 alice 100 20
+2 bob 90 21
+3 charlie 80 22
+
+-- !select_after_alter_flex --
+1 alice 200 20
+2 bob 90 35
+3 charlie 80 22
+4 diana \N \N
+
+-- !select_initial18 --
+1 alice 100 20
+2 bob 90 21
+
+-- !select_after_alter_flex_where --
+1 alice 100 20
+2 bob 95 21
+3 charlie 80 \N
+
+-- !select_initial20 --
+1 alice 100 20
+2 bob 90 21
+
+-- !select_after_alter_upsert --
+1 \N 200 \N
+2 bob 90 21
+3 charlie 80 22
+
+-- !select_initial21 --
+1 alice 100 20
+2 bob 90 21
+
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
index f3f6641d3f6..c1bd321607b 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
@@ -107,6 +107,18 @@ class RoutineLoadTestUtils {
     }

     static int waitForTaskFinish(Closure sqlRunner, String job, String tableName, int expectedMinRows = 0, int maxAttempts = 60) {
+        return waitForTaskFinishInternal(sqlRunner, job, tableName, expectedMinRows, maxAttempts, false)
+    }
+
+    /**
+     * Wait for routine load task to finish for MOW (Merge-on-Write) unique key tables.
+     * Uses skip_delete_bitmap=true to properly count rows during partial update operations.
+     */
+    static int waitForTaskFinishMoW(Closure sqlRunner, String job, String tableName, int expectedMinRows = 0, int maxAttempts = 60) {
+        return waitForTaskFinishInternal(sqlRunner, job, tableName, expectedMinRows, maxAttempts, true)
+    }
+
+    private static int waitForTaskFinishInternal(Closure sqlRunner, String job, String tableName, int expectedMinRows, int maxAttempts, boolean isMoW) {
         def count = 0
         while (true) {
             def res = sqlRunner.call("show routine load for ${job}")
@@ -114,7 +126,18 @@ class RoutineLoadTestUtils {
             def statistic = res[0][14].toString()
             logger.info("Routine load state: ${routineLoadState}")
             logger.info("Routine load statistic: ${statistic}")
-            def rowCount = sqlRunner.call("select count(*) from ${tableName}")
+            def rowCount
+            if (isMoW) {
+                // For MOW tables, use skip_delete_bitmap to properly count rows
+                sqlRunner.call("set skip_delete_bitmap=true")
+                sqlRunner.call("set skip_delete_sign=true")
+                sqlRunner.call("sync")
+                rowCount = sqlRunner.call("select count(*) from ${tableName}")
+                sqlRunner.call("set skip_delete_bitmap=false")
+                sqlRunner.call("set skip_delete_sign=false")
+            } else {
+                rowCount = sqlRunner.call("select count(*) from ${tableName}")
+            }
             if (routineLoadState == "RUNNING" && rowCount[0][0] > expectedMinRows) {
                 break
}
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy
new file mode 100644
index 00000000000..510eeb0ecab
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy
@@ -0,0 +1,1381 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.RoutineLoadTestUtils
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+
+suite("test_routine_load_flexible_partial_update", "nonConcurrent") {
+
+ if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) {
+ def runSql = { String q -> sql q }
+ def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
+ def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+
+ // Test 1: Basic flexible partial update
+ def kafkaJsonTopic1 = "test_routine_load_flexible_partial_update_basic"
+ def tableName1 = "test_routine_load_flex_update_basic"
+ def job1 = "test_flex_partial_update_job_basic"
+
+ sql """ DROP TABLE IF EXISTS ${tableName1} force;"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName1} (
+ `id` int NOT NULL,
+ `name` varchar(65533) NULL,
+ `score` int NULL,
+ `age` int NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ COMMENT 'test flexible partial update'
+ DISTRIBUTED BY HASH(`id`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "light_schema_change" = "true",
+ "enable_unique_key_skip_bitmap_column" = "true"
+ );
+ """
+
+ // verify skip bitmap column is enabled
+ def show_res = sql "show create table ${tableName1}"
+        assertTrue(show_res.toString().contains('"enable_unique_key_skip_bitmap_column" = "true"'))
+
+ // insert initial data
+ sql """
+ INSERT INTO ${tableName1} VALUES
+ (1, 'alice', 100, 20),
+ (2, 'bob', 90, 21),
+ (3, 'charlie', 80, 22),
+ (4, 'david', 70, 23),
+ (5, 'eve', 60, 24)
+ """
+
+        qt_select_initial1 "SELECT id, name, score, age FROM ${tableName1} ORDER BY id"
+
+ try {
+ // create routine load with flexible partial update
+ sql """
+ CREATE ROUTINE LOAD ${job1} ON ${tableName1}
+ PROPERTIES
+ (
+ "max_batch_interval" = "10",
+ "format" = "json",
+ "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaJsonTopic1}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ // send JSON data with different columns per row
+ // Row 1: update only score for id=1
+ // Row 2: update only age for id=2
+ // Row 3: update both name and score for id=3
+ // Row 4: insert new row with only id and name
+ def data = [
+ '{"id": 1, "score": 150}',
+ '{"id": 2, "age": 30}',
+ '{"id": 3, "name": "chuck", "score": 95}',
+ '{"id": 6, "name": "frank"}'
+ ]
+
+ data.each { line ->
+ logger.info("Sending to Kafka: ${line}")
+ def record = new ProducerRecord<>(kafkaJsonTopic1, null, line)
+ producer.send(record).get()
+ }
+ producer.flush()
+
+ // wait for routine load task to finish
+            // With skip_delete_bitmap=true, count = initial + kafka_messages = 5 + 4 = 9
+            RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job1, tableName1, 8)
+
+ // verify flexible partial update results
+            qt_select_after_flex_update1 "SELECT id, name, score, age FROM ${tableName1} ORDER BY id"
+ } catch (Exception e) {
+ logger.error("Error during test: " + e.getMessage())
+ throw e
+ } finally {
+ sql "STOP ROUTINE LOAD FOR ${job1}"
+ }
+
+ // Test 2: Flexible partial update with default values
+        def kafkaJsonTopic2 = "test_routine_load_flexible_partial_update_default"
+ def tableName2 = "test_routine_load_flex_update_default"
+ def job2 = "test_flex_partial_update_job_default"
+
+ sql """ DROP TABLE IF EXISTS ${tableName2} force;"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName2} (
+ `id` int NOT NULL,
+ `v1` bigint NULL,
+ `v2` bigint NULL DEFAULT "9876",
+ `v3` bigint NOT NULL,
+ `v4` bigint NOT NULL DEFAULT "1234"
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ COMMENT 'test flexible partial update with default values'
+ DISTRIBUTED BY HASH(`id`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "light_schema_change" = "true",
+ "enable_unique_key_skip_bitmap_column" = "true"
+ );
+ """
+
+ // insert initial data
+ sql """
+ INSERT INTO ${tableName2} VALUES
+ (1, 10, 20, 30, 40),
+ (2, 100, 200, 300, 400),
+ (3, 1000, 2000, 3000, 4000)
+ """
+
+        qt_select_initial2 "SELECT id, v1, v2, v3, v4 FROM ${tableName2} ORDER BY id"
+
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${job2} ON ${tableName2}
+ PROPERTIES
+ (
+ "max_batch_interval" = "10",
+ "format" = "json",
+ "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaJsonTopic2}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ // send JSON data with different columns per row
+ def data2 = [
+ '{"id": 1, "v1": 11}',
+ '{"id": 2, "v2": 222, "v3": 333}',
+ '{"id": 4, "v3": 4444}'
+ ]
+
+ data2.each { line ->
+ logger.info("Sending to Kafka: ${line}")
+ def record = new ProducerRecord<>(kafkaJsonTopic2, null, line)
+ producer.send(record).get()
+ }
+ producer.flush()
+
+            // With skip_delete_bitmap=true, count = initial + kafka_messages = 3 + 3 = 6
+            RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job2, tableName2, 5)
+
+            qt_select_after_flex_update2 "SELECT id, v1, v2, v3, v4 FROM ${tableName2} ORDER BY id"
+ } catch (Exception e) {
+ logger.error("Error during test: " + e.getMessage())
+ throw e
+ } finally {
+ sql "STOP ROUTINE LOAD FOR ${job2}"
+ }
+
+ // Test 3: Error case - CSV format not supported
+        def kafkaCsvTopic3 = "test_routine_load_flexible_partial_update_csv_error"
+ def tableName3 = "test_routine_load_flex_update_csv_error"
+ def job3 = "test_flex_partial_update_job_csv_error"
+
+ sql """ DROP TABLE IF EXISTS ${tableName3} force;"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName3} (
+ `id` int NOT NULL,
+ `name` varchar(65533) NULL,
+ `score` int NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "light_schema_change" = "true",
+ "enable_unique_key_skip_bitmap_column" = "true"
+ );
+ """
+
+ test {
+ sql """
+ CREATE ROUTINE LOAD ${job3} ON ${tableName3}
+ COLUMNS TERMINATED BY ","
+ PROPERTIES
+ (
+ "max_batch_interval" = "10",
+ "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaCsvTopic3}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ exception "Flexible partial update only supports JSON format"
+ }
+
+        // Test 4: Error case - jsonpaths not supported with flexible partial update
+ def tableName4 = "test_routine_load_flex_update_jsonpaths_error"
+ def job4 = "test_flex_partial_update_job_jsonpaths_error"
+
+ sql """ DROP TABLE IF EXISTS ${tableName4} force;"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName4} (
+ `id` int NOT NULL,
+ `name` varchar(65533) NULL,
+ `score` int NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "light_schema_change" = "true",
+ "enable_unique_key_skip_bitmap_column" = "true"
+ );
+ """
+
+ test {
+ sql """
+ CREATE ROUTINE LOAD ${job4} ON ${tableName4}
+ PROPERTIES
+ (
+ "max_batch_interval" = "10",
+ "format" = "json",
+ "jsonpaths" = '[\"\$.id\", \"\$.name\", \"\$.score\"]',
+ "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "test_topic",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ exception "Flexible partial update does not support jsonpaths"
+ }
+
+ // Test 5: Error case - fuzzy_parse not supported
+        def kafkaJsonTopic5 = "test_routine_load_flexible_partial_update_fuzzy_error"
+ def tableName5 = "test_routine_load_flex_update_fuzzy_error"
+ def job5 = "test_flex_partial_update_job_fuzzy_error"
+
+ sql """ DROP TABLE IF EXISTS ${tableName5} force;"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName5} (
+ `id` int NOT NULL,
+ `name` varchar(65533) NULL,
+ `score` int NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "light_schema_change" = "true",
+ "enable_unique_key_skip_bitmap_column" = "true"
+ );
+ """
+
+ test {
+ sql """
+ CREATE ROUTINE LOAD ${job5} ON ${tableName5}
+ PROPERTIES
+ (
+ "max_batch_interval" = "10",
+ "format" = "json",
+ "fuzzy_parse" = "true",
+ "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaJsonTopic5}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ exception "Flexible partial update does not support fuzzy_parse"
+ }
+
+ // Test 6: Error case - COLUMNS clause not supported
+        def kafkaJsonTopic6 = "test_routine_load_flexible_partial_update_columns_error"
+ def tableName6 = "test_routine_load_flex_update_columns_error"
+ def job6 = "test_flex_partial_update_job_columns_error"
+
+ sql """ DROP TABLE IF EXISTS ${tableName6} force;"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName6} (
+ `id` int NOT NULL,
+ `name` varchar(65533) NULL,
+ `score` int NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "light_schema_change" = "true",
+ "enable_unique_key_skip_bitmap_column" = "true"
+ );
+ """
+
+ test {
+ sql """
+ CREATE ROUTINE LOAD ${job6} ON ${tableName6}
+ COLUMNS (id, name, score)
+ PROPERTIES
+ (
+ "max_batch_interval" = "10",
+ "format" = "json",
+ "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaJsonTopic6}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ exception "Flexible partial update does not support COLUMNS
specification"
+ }
+
+        // Test 7: Success case - WHERE clause works with flexible partial update
+ def kafkaJsonTopic7 = "test_routine_load_flexible_partial_update_where"
+ def tableName7 = "test_routine_load_flex_update_where"
+ def job7 = "test_flex_partial_update_job_where"
+
+ sql """ DROP TABLE IF EXISTS ${tableName7} force;"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName7} (
+ `id` int NOT NULL,
+ `name` varchar(65533) NULL,
+ `score` int NULL,
+ `age` int NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "light_schema_change" = "true",
+ "enable_unique_key_skip_bitmap_column" = "true"
+ );
+ """
+
+ // insert initial data
+ sql """
+ INSERT INTO ${tableName7} VALUES
+ (1, 'alice', 100, 20),
+ (2, 'bob', 90, 21),
+ (3, 'charlie', 80, 22)
+ """
+
+        qt_select_initial7 "SELECT id, name, score, age FROM ${tableName7} ORDER BY id"
+
+ try {
+            // create routine load with WHERE clause and flexible partial update
+ sql """
+ CREATE ROUTINE LOAD ${job7} ON ${tableName7}
+ WHERE id > 1
+ PROPERTIES
+ (
+ "max_batch_interval" = "10",
+ "format" = "json",
+ "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaJsonTopic7}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+            // send JSON data - WHERE clause filters id > 1, so id=1 row should NOT be processed
+ def data7 = [
+ '{"id": 1, "score": 999}',
+ '{"id": 2, "score": 95}',
+ '{"id": 3, "name": "chuck"}',
+ '{"id": 4, "name": "diana", "score": 70}'
+ ]
+
+ data7.each { line ->
+ logger.info("Sending to Kafka: ${line}")
+ def record = new ProducerRecord<>(kafkaJsonTopic7, null, line)
+ producer.send(record).get()
+ }
+ producer.flush()
+
+ // With skip_delete_bitmap=true and WHERE id > 1:
+ // - id=1: 1 version (not updated, filtered by WHERE)
+ // - id=2: 2 versions (original + partial update)
+ // - id=3: 2 versions (original + partial update)
+ // - id=4: 1 version (new row)
+ // Total: 6 rows, so expectedMinRows = 5 (waits for count > 5)
+            RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job7, tableName7, 5)
+
+            // verify: id=1 should NOT be updated (filtered by WHERE), id=2,3,4 should be updated
+            qt_select_after_flex_where "SELECT id, name, score, age FROM ${tableName7} ORDER BY id"
+ } catch (Exception e) {
+ logger.error("Error during test: " + e.getMessage())
+ throw e
+ } finally {
+ sql "STOP ROUTINE LOAD FOR ${job7}"
+ }
+
+ // Test 8: Error case - table without skip_bitmap column
+        def kafkaJsonTopic8 = "test_routine_load_flexible_partial_update_no_skip_bitmap"
+ def tableName8 = "test_routine_load_flex_update_no_skip_bitmap"
+ def job8 = "test_flex_partial_update_job_no_skip_bitmap"
+
+ sql """ DROP TABLE IF EXISTS ${tableName8} force;"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName8} (
+ `id` int NOT NULL,
+ `name` varchar(65533) NULL,
+ `score` int NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "enable_unique_key_skip_bitmap_column" = "false"
+ );
+ """
+
+ test {
+ sql """
+ CREATE ROUTINE LOAD ${job8} ON ${tableName8}
+ PROPERTIES
+ (
+ "max_batch_interval" = "10",
+ "format" = "json",
+ "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaJsonTopic8}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ exception "Flexible partial update can only support table with
skip bitmap hidden column"
+ }
+
+ // Test 9: Error case - table with variant column
+        def kafkaJsonTopic9 = "test_routine_load_flexible_partial_update_variant"
+ def tableName9 = "test_routine_load_flex_update_variant"
+ def job9 = "test_flex_partial_update_job_variant"
+
+ sql """ DROP TABLE IF EXISTS ${tableName9} force;"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName9} (
+ `id` int NOT NULL,
+ `name` varchar(65533) NULL,
+ `data` variant NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "enable_unique_key_skip_bitmap_column" = "true"
+ );
+ """
+
+ test {
+ sql """
+ CREATE ROUTINE LOAD ${job9} ON ${tableName9}
+ PROPERTIES
+ (
+ "max_batch_interval" = "10",
+ "format" = "json",
+ "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaJsonTopic9}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ exception "Flexible partial update can only support table without
variant columns"
+ }
+
+ // Test 10: Error case - invalid unique_key_update_mode value
+        def kafkaJsonTopic10 = "test_routine_load_flexible_partial_update_invalid_mode"
+ def tableName10 = "test_routine_load_flex_update_invalid_mode"
+ def job10 = "test_flex_partial_update_job_invalid_mode"
+
+ sql """ DROP TABLE IF EXISTS ${tableName10} force;"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName10} (
+ `id` int NOT NULL,
+ `name` varchar(65533) NULL,
+ `score` int NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "enable_unique_key_skip_bitmap_column" = "true"
+ );
+ """
+
+ test {
+ sql """
+ CREATE ROUTINE LOAD ${job10} ON ${tableName10}
+ PROPERTIES
+ (
+ "max_batch_interval" = "10",
+ "format" = "json",
+ "unique_key_update_mode" = "INVALID_MODE"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaJsonTopic10}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ exception "unique_key_update_mode should be one of"
+ }
+
+ // Test 11: UPDATE_FIXED_COLUMNS mode (backward compatibility)
+ def kafkaJsonTopic11 = "test_routine_load_fixed_columns_mode"
+ def tableName11 = "test_routine_load_fixed_columns_mode"
+ def job11 = "test_fixed_columns_mode_job"
+
+ sql """ DROP TABLE IF EXISTS ${tableName11} force;"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName11} (
+ `id` int NOT NULL,
+ `name` varchar(65533) NULL,
+ `score` int NULL,
+ `age` int NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
+ "enable_unique_key_merge_on_write" = "true"
+ );
+ """
+
+ // insert initial data
+ sql """
+ INSERT INTO ${tableName11} VALUES
+ (1, 'alice', 100, 20),
+ (2, 'bob', 90, 21),
+ (3, 'charlie', 80, 22)
+ """
+
+ qt_select_initial11 "SELECT id, name, score, age FROM ${tableName11} ORDER BY id"
+
+ try {
+ // create routine load with UPDATE_FIXED_COLUMNS mode
+ sql """
+ CREATE ROUTINE LOAD ${job11} ON ${tableName11}
+ COLUMNS (id, score)
+ PROPERTIES
+ (
+ "max_batch_interval" = "10",
+ "format" = "json",
+ "unique_key_update_mode" = "UPDATE_FIXED_COLUMNS"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaJsonTopic11}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ def data11 = [
+ '{"id": 1, "score": 150}',
+ '{"id": 2, "score": 95}',
+ '{"id": 4, "score": 85}'
+ ]
+
+ data11.each { line ->
+ logger.info("Sending to Kafka: ${line}")
+ def record = new ProducerRecord<>(kafkaJsonTopic11, null, line)
+ producer.send(record).get()
+ }
+ producer.flush()
+
+ // With skip_delete_bitmap=true, count = initial + kafka_messages = 3 + 3 = 6,
+ // so expectedMinRows = 5 (waits for count > 5)
+ RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job11, tableName11, 5)
+
+ qt_select_after_fixed_update "SELECT id, name, score, age FROM ${tableName11} ORDER BY id"
+ } catch (Exception e) {
+ logger.error("Error during test: " + e.getMessage())
+ throw e
+ } finally {
+ sql "STOP ROUTINE LOAD FOR ${job11}"
+ }
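+
+ // Illustrative walk-through (added commentary, not an assertion of this commit;
+ // the paired .out file is authoritative). Under UPDATE_FIXED_COLUMNS with
+ // COLUMNS (id, score), only score changes for existing keys, and the new key
+ // id=4 is appended with NULL in the unlisted nullable columns, assuming the
+ // default new-key behavior:
+ //   id=1 -> (1, 'alice', 150, 20)
+ //   id=2 -> (2, 'bob', 95, 21)
+ //   id=4 -> (4, NULL, 85, NULL)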
+
+ // Test 12: ALTER ROUTINE LOAD to change unique_key_update_mode
+ def kafkaJsonTopic12 = "test_routine_load_alter_flex_mode"
+ def tableName12 = "test_routine_load_alter_flex_mode"
+ def job12 = "test_alter_flex_mode_job"
+
+ sql """ DROP TABLE IF EXISTS ${tableName12} force;"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName12} (
+ `id` int NOT NULL,
+ `name` varchar(65533) NULL,
+ `score` int NULL,
+ `age` int NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "light_schema_change" = "true",
+ "enable_unique_key_skip_bitmap_column" = "true"
+ );
+ """
+
+ // insert initial data
+ sql """
+ INSERT INTO ${tableName12} VALUES
+ (1, 'alice', 100, 20),
+ (2, 'bob', 90, 21),
+ (3, 'charlie', 80, 22)
+ """
+
+ qt_select_initial12 "SELECT id, name, score, age FROM ${tableName12} ORDER BY id"
+
+ try {
+ // create routine load with UPSERT mode (default)
+ sql """
+ CREATE ROUTINE LOAD ${job12} ON ${tableName12}
+ PROPERTIES
+ (
+ "max_batch_interval" = "10",
+ "format" = "json"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaJsonTopic12}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ // pause the job before altering
+ sql "PAUSE ROUTINE LOAD FOR ${job12}"
+
+ // alter to UPDATE_FLEXIBLE_COLUMNS mode
+ sql """
+ ALTER ROUTINE LOAD FOR ${job12}
+ PROPERTIES
+ (
+ "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+ );
+ """
+
+ // verify the property was changed
+ def res = sql "SHOW ROUTINE LOAD FOR ${job12}"
+ def jobProperties = res[0][11].toString()
+ logger.info("Altered routine load job properties:
${jobProperties}")
+ assertTrue(jobProperties.contains("UPDATE_FLEXIBLE_COLUMNS"))
+
+ // resume the job
+ sql "RESUME ROUTINE LOAD FOR ${job12}"
+
+ // send JSON data with different columns per row
+ def data12 = [
+ '{"id": 1, "score": 200}',
+ '{"id": 2, "age": 35}',
+ '{"id": 4, "name": "diana"}'
+ ]
+
+ data12.each { line ->
+ logger.info("Sending to Kafka: ${line}")
+ def record = new ProducerRecord<>(kafkaJsonTopic12, null, line)
+ producer.send(record).get()
+ }
+ producer.flush()
+
+ // With skip_delete_bitmap=true, count = initial + kafka_messages = 3 + 3 = 6,
+ // so expectedMinRows = 5 (waits for count > 5)
+ RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job12, tableName12, 5)
+
+ // verify flexible partial update results after alter
+ qt_select_after_alter_flex "SELECT id, name, score, age FROM ${tableName12} ORDER BY id"
+ } catch (Exception e) {
+ logger.error("Error during test: " + e.getMessage())
+ throw e
+ } finally {
+ sql "STOP ROUTINE LOAD FOR ${job12}"
+ }
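+
+ // Illustrative walk-through (added commentary; the .out file is authoritative).
+ // After the ALTER, each JSON row updates only the columns it carries:
+ //   id=1 -> score becomes 200; name/age stay 'alice'/20
+ //   id=2 -> age becomes 35; name/score stay 'bob'/90
+ //   id=4 -> new key with name 'diana'; unsupplied columns take NULL, assuming
+ //           the default new-key behavior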
+
+ // Test 13: ALTER ROUTINE LOAD - error when trying to change to flex mode with invalid settings
+ def kafkaJsonTopic13 = "test_routine_load_alter_flex_error"
+ def tableName13 = "test_routine_load_alter_flex_error"
+ def job13 = "test_alter_flex_error_job"
+
+ sql """ DROP TABLE IF EXISTS ${tableName13} force;"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName13} (
+ `id` int NOT NULL,
+ `name` varchar(65533) NULL,
+ `score` int NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "enable_unique_key_skip_bitmap_column" = "false"
+ );
+ """
+
+ try {
+ // create routine load with UPSERT mode (default)
+ sql """
+ CREATE ROUTINE LOAD ${job13} ON ${tableName13}
+ PROPERTIES
+ (
+ "max_batch_interval" = "10",
+ "format" = "json"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaJsonTopic13}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ // pause the job before altering
+ sql "PAUSE ROUTINE LOAD FOR ${job13}"
+
+ // try to alter to UPDATE_FLEXIBLE_COLUMNS mode - should fail because table doesn't have skip_bitmap
+ test {
+ sql """
+ ALTER ROUTINE LOAD FOR ${job13}
+ PROPERTIES
+ (
+ "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+ );
+ """
+ exception "Flexible partial update can only support table with
skip bitmap hidden column"
+ }
+ } catch (Exception e) {
+ logger.error("Error during test: " + e.getMessage())
+ throw e
+ } finally {
+ sql "STOP ROUTINE LOAD FOR ${job13}"
+ }
+
+ // Test 14: ALTER to flex mode fails when using CSV format
+ def kafkaJsonTopic14 = "test_routine_load_alter_flex_csv_error"
+ def tableName14 = "test_routine_load_alter_flex_csv_error"
+ def job14 = "test_alter_flex_csv_error_job"
+
+ sql """ DROP TABLE IF EXISTS ${tableName14} force;"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName14} (
+ `id` int NOT NULL,
+ `name` varchar(65533) NULL,
+ `score` int NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "light_schema_change" = "true",
+ "enable_unique_key_skip_bitmap_column" = "true"
+ );
+ """
+
+ try {
+ // create routine load with CSV format
+ sql """
+ CREATE ROUTINE LOAD ${job14} ON ${tableName14}
+ COLUMNS TERMINATED BY ","
+ PROPERTIES
+ (
+ "max_batch_interval" = "10",
+ "format" = "csv"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaJsonTopic14}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ sql "PAUSE ROUTINE LOAD FOR ${job14}"
+
+ // try to alter to UPDATE_FLEXIBLE_COLUMNS mode - should fail because the format is CSV
+ test {
+ sql """
+ ALTER ROUTINE LOAD FOR ${job14}
+ PROPERTIES
+ (
+ "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+ );
+ """
+ exception "Flexible partial update only supports JSON format"
+ }
+ } catch (Exception e) {
+ logger.error("Error during test: " + e.getMessage())
+ throw e
+ } finally {
+ sql "STOP ROUTINE LOAD FOR ${job14}"
+ }
+
+ // Test 15: ALTER to flex mode fails when using fuzzy_parse
+ def kafkaJsonTopic15 = "test_routine_load_alter_flex_fuzzy_error"
+ def tableName15 = "test_routine_load_alter_flex_fuzzy_error"
+ def job15 = "test_alter_flex_fuzzy_error_job"
+
+ sql """ DROP TABLE IF EXISTS ${tableName15} force;"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName15} (
+ `id` int NOT NULL,
+ `name` varchar(65533) NULL,
+ `score` int NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "light_schema_change" = "true",
+ "enable_unique_key_skip_bitmap_column" = "true"
+ );
+ """
+
+ try {
+ // create routine load with fuzzy_parse enabled
+ sql """
+ CREATE ROUTINE LOAD ${job15} ON ${tableName15}
+ PROPERTIES
+ (
+ "max_batch_interval" = "10",
+ "format" = "json",
+ "fuzzy_parse" = "true"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaJsonTopic15}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ sql "PAUSE ROUTINE LOAD FOR ${job15}"
+
+ // try to alter to UPDATE_FLEXIBLE_COLUMNS mode - should fail because fuzzy_parse is enabled
+ test {
+ sql """
+ ALTER ROUTINE LOAD FOR ${job15}
+ PROPERTIES
+ (
+ "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+ );
+ """
+ exception "Flexible partial update does not support
fuzzy_parse"
+ }
+ } catch (Exception e) {
+ logger.error("Error during test: " + e.getMessage())
+ throw e
+ } finally {
+ sql "STOP ROUTINE LOAD FOR ${job15}"
+ }
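+
+ // Rationale (inferred, not stated by the error message): fuzzy_parse speeds up
+ // JSON parsing by assuming every row shares the same field layout, which is
+ // exactly what flexible per-row column sets cannot guarantee.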
+
+ // Test 16: ALTER to flex mode fails with jsonpaths
+ def kafkaJsonTopic16 = "test_routine_load_alter_flex_jsonpaths_error"
+ def tableName16 = "test_routine_load_alter_flex_jsonpaths_error"
+ def job16 = "test_alter_flex_jsonpaths_error_job"
+
+ sql """ DROP TABLE IF EXISTS ${tableName16} force;"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName16} (
+ `id` int NOT NULL,
+ `name` varchar(65533) NULL,
+ `score` int NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "light_schema_change" = "true",
+ "enable_unique_key_skip_bitmap_column" = "true"
+ );
+ """
+
+ try {
+ // create routine load with jsonpaths (UPSERT mode)
+ sql """
+ CREATE ROUTINE LOAD ${job16} ON ${tableName16}
+ PROPERTIES
+ (
+ "max_batch_interval" = "10",
+ "format" = "json",
+ "jsonpaths" = '[\"\$.id\", \"\$.name\", \"\$.score\"]'
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaJsonTopic16}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ sql "PAUSE ROUTINE LOAD FOR ${job16}"
+
+ // alter to UPDATE_FLEXIBLE_COLUMNS mode - should fail because jsonpaths is set
+ test {
+ sql """
+ ALTER ROUTINE LOAD FOR ${job16}
+ PROPERTIES
+ (
+ "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+ );
+ """
+ exception "Flexible partial update does not support jsonpaths"
+ }
+ } catch (Exception e) {
+ logger.error("Error during test: " + e.getMessage())
+ throw e
+ } finally {
+ sql "STOP ROUTINE LOAD FOR ${job16}"
+ }
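+
+ // Rationale (inferred): jsonpaths pins a fixed extraction list at job level,
+ // while flexible mode must read whichever keys each JSON row actually carries.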
+
+ // Test 17: ALTER to flex mode fails when COLUMNS clause is specified
+ def kafkaJsonTopic17 = "test_routine_load_alter_flex_columns_error"
+ def tableName17 = "test_routine_load_alter_flex_columns_error"
+ def job17 = "test_alter_flex_columns_error_job"
+
+ sql """ DROP TABLE IF EXISTS ${tableName17} force;"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName17} (
+ `id` int NOT NULL,
+ `name` varchar(65533) NULL,
+ `score` int NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "light_schema_change" = "true",
+ "enable_unique_key_skip_bitmap_column" = "true"
+ );
+ """
+
+ try {
+ // create routine load with COLUMNS clause
+ sql """
+ CREATE ROUTINE LOAD ${job17} ON ${tableName17}
+ COLUMNS (id, name, score)
+ PROPERTIES
+ (
+ "max_batch_interval" = "10",
+ "format" = "json"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaJsonTopic17}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ sql "PAUSE ROUTINE LOAD FOR ${job17}"
+
+ // try to alter to UPDATE_FLEXIBLE_COLUMNS mode - should fail because a COLUMNS clause is specified
+ test {
+ sql """
+ ALTER ROUTINE LOAD FOR ${job17}
+ PROPERTIES
+ (
+ "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+ );
+ """
+ exception "Flexible partial update does not support COLUMNS
specification"
+ }
+ } catch (Exception e) {
+ logger.error("Error during test: " + e.getMessage())
+ throw e
+ } finally {
+ sql "STOP ROUTINE LOAD FOR ${job17}"
+ }
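+
+ // Rationale (inferred): flexible mode derives the target column set per row
+ // from each JSON object's keys, so a static COLUMNS mapping fixed at job
+ // creation would contradict it.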
+
+ // Test 18: ALTER to flex mode succeeds with WHERE clause
+ def kafkaJsonTopic18 = "test_routine_load_alter_flex_where"
+ def tableName18 = "test_routine_load_alter_flex_where"
+ def job18 = "test_alter_flex_where_job"
+
+ sql """ DROP TABLE IF EXISTS ${tableName18} force;"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName18} (
+ `id` int NOT NULL,
+ `name` varchar(65533) NULL,
+ `score` int NULL,
+ `age` int NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "light_schema_change" = "true",
+ "enable_unique_key_skip_bitmap_column" = "true"
+ );
+ """
+
+ // insert initial data
+ sql """
+ INSERT INTO ${tableName18} VALUES
+ (1, 'alice', 100, 20),
+ (2, 'bob', 90, 21)
+ """
+
+ qt_select_initial18 "SELECT id, name, score, age FROM ${tableName18} ORDER BY id"
+
+ try {
+ // create routine load with WHERE clause (UPSERT mode)
+ sql """
+ CREATE ROUTINE LOAD ${job18} ON ${tableName18}
+ WHERE id > 1
+ PROPERTIES
+ (
+ "max_batch_interval" = "10",
+ "format" = "json"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaJsonTopic18}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ sql "PAUSE ROUTINE LOAD FOR ${job18}"
+
+ // alter to UPDATE_FLEXIBLE_COLUMNS mode - should succeed
+ sql """
+ ALTER ROUTINE LOAD FOR ${job18}
+ PROPERTIES
+ (
+ "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+ );
+ """
+
+ // verify the property was changed
+ def res = sql "SHOW ROUTINE LOAD FOR ${job18}"
+ def jobProperties = res[0][11].toString()
+ logger.info("Altered routine load job properties:
${jobProperties}")
+ assertTrue(jobProperties.contains("UPDATE_FLEXIBLE_COLUMNS"))
+
+ sql "RESUME ROUTINE LOAD FOR ${job18}"
+
+ // send JSON data - WHERE clause filters id > 1
+ def data18 = [
+ '{"id": 1, "score": 999}',
+ '{"id": 2, "score": 95}',
+ '{"id": 3, "name": "charlie", "score": 80}'
+ ]
+
+ data18.each { line ->
+ logger.info("Sending to Kafka: ${line}")
+ def record = new ProducerRecord<>(kafkaJsonTopic18, null, line)
+ producer.send(record).get()
+ }
+ producer.flush()
+
+ // With skip_delete_bitmap=true and WHERE id > 1:
+ // - id=1: 1 version (not updated, filtered by WHERE)
+ // - id=2: 2 versions (original + partial update)
+ // - id=3: 1 version (new row)
+ // Total: 4 rows, so expectedMinRows = 3 (waits for count > 3)
+ RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job18, tableName18, 3)
+
+ // verify: id=1 should NOT be updated (filtered by WHERE), id=2,3 should be updated
+ qt_select_after_alter_flex_where "SELECT id, name, score, age FROM ${tableName18} ORDER BY id"
+ } catch (Exception e) {
+ logger.error("Error during test: " + e.getMessage())
+ throw e
+ } finally {
+ sql "STOP ROUTINE LOAD FOR ${job18}"
+ }
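+
+ // Sketch of the WHERE/flex interaction (inferred from the filtered counts above):
+ // the predicate is evaluated on the parsed rows before any update is applied, so
+ // the id=1 message never reaches the table and (1, 'alice', 100, 20) survives,
+ // while id=2 is partially updated and id=3 is appended as a new key.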
+
+ // Test 19: ALTER to flex mode fails on non-MoW table
+ def kafkaJsonTopic19 = "test_routine_load_alter_flex_non_mow_error"
+ def tableName19 = "test_routine_load_alter_flex_non_mow_error"
+ def job19 = "test_alter_flex_non_mow_error_job"
+
+ sql """ DROP TABLE IF EXISTS ${tableName19} force;"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName19} (
+ `id` int NOT NULL,
+ `name` varchar(65533) NULL,
+ `score` int NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "enable_unique_key_merge_on_write" = "false"
+ );
+ """
+
+ try {
+ // create routine load
+ sql """
+ CREATE ROUTINE LOAD ${job19} ON ${tableName19}
+ PROPERTIES
+ (
+ "max_batch_interval" = "10",
+ "format" = "json"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaJsonTopic19}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ sql "PAUSE ROUTINE LOAD FOR ${job19}"
+
+ // try to alter to UPDATE_FLEXIBLE_COLUMNS mode - should fail because the table is not MoW
+ test {
+ sql """
+ ALTER ROUTINE LOAD FOR ${job19}
+ PROPERTIES
+ (
+ "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+ );
+ """
+ exception "Flexible partial update is only supported in unique
table MoW"
+ }
+ } catch (Exception e) {
+ logger.error("Error during test: " + e.getMessage())
+ throw e
+ } finally {
+ sql "STOP ROUTINE LOAD FOR ${job19}"
+ }
+
+ // Test 20: ALTER from flex mode to UPSERT mode (success case)
+ def kafkaJsonTopic20 = "test_routine_load_alter_flex_to_upsert"
+ def tableName20 = "test_routine_load_alter_flex_to_upsert"
+ def job20 = "test_alter_flex_to_upsert_job"
+
+ sql """ DROP TABLE IF EXISTS ${tableName20} force;"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName20} (
+ `id` int NOT NULL,
+ `name` varchar(65533) NULL,
+ `score` int NULL,
+ `age` int NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "light_schema_change" = "true",
+ "enable_unique_key_skip_bitmap_column" = "true"
+ );
+ """
+
+ // insert initial data
+ sql """
+ INSERT INTO ${tableName20} VALUES
+ (1, 'alice', 100, 20),
+ (2, 'bob', 90, 21)
+ """
+
+ qt_select_initial20 "SELECT id, name, score, age FROM ${tableName20} ORDER BY id"
+
+ try {
+ // create routine load with flex mode
+ sql """
+ CREATE ROUTINE LOAD ${job20} ON ${tableName20}
+ PROPERTIES
+ (
+ "max_batch_interval" = "10",
+ "format" = "json",
+ "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaJsonTopic20}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ sql "PAUSE ROUTINE LOAD FOR ${job20}"
+
+ // alter to UPSERT mode
+ sql """
+ ALTER ROUTINE LOAD FOR ${job20}
+ PROPERTIES
+ (
+ "unique_key_update_mode" = "UPSERT"
+ );
+ """
+
+ // verify the property was changed
+ def res = sql "SHOW ROUTINE LOAD FOR ${job20}"
+ def jobProperties = res[0][11].toString()
+ logger.info("Altered routine load job properties:
${jobProperties}")
+ assertTrue(jobProperties.contains("UPSERT"))
+
+ sql "RESUME ROUTINE LOAD FOR ${job20}"
+
+ // send JSON data - with UPSERT mode, missing columns should be NULL
+ def data20 = [
+ '{"id": 1, "score": 200}',
+ '{"id": 3, "name": "charlie", "score": 80, "age": 22}'
+ ]
+
+ data20.each { line ->
+ logger.info("Sending to Kafka: ${line}")
+ def record = new ProducerRecord<>(kafkaJsonTopic20, null, line)
+ producer.send(record).get()
+ }
+ producer.flush()
+
+ // With skip_delete_bitmap=true, count = initial + kafka_messages = 2 + 2 = 4,
+ // so expectedMinRows = 3 (waits for count > 3)
+ RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job20, tableName20, 3)
+
+ // with UPSERT, id=1 should have NULL for name and age (full row replacement)
+ qt_select_after_alter_upsert "SELECT id, name, score, age FROM ${tableName20} ORDER BY id"
+ } catch (Exception e) {
+ logger.error("Error during test: " + e.getMessage())
+ throw e
+ } finally {
+ sql "STOP ROUTINE LOAD FOR ${job20}"
+ }
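+
+ // Illustrative contrast with flexible mode (added commentary; the .out file is
+ // authoritative). Under UPSERT the whole row is replaced:
+ //   id=1 -> (1, NULL, 200, NULL)      name/age not preserved
+ //   id=2 -> (2, 'bob', 90, 21)        untouched
+ //   id=3 -> (3, 'charlie', 80, 22)    inserted as given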
+
+ // Test 21: ALTER from flex mode to UPDATE_FIXED_COLUMNS mode (success case)
+ def kafkaJsonTopic21 = "test_routine_load_alter_flex_to_fixed"
+ def tableName21 = "test_routine_load_alter_flex_to_fixed"
+ def job21 = "test_alter_flex_to_fixed_job"
+
+ sql """ DROP TABLE IF EXISTS ${tableName21} force;"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName21} (
+ `id` int NOT NULL,
+ `name` varchar(65533) NULL,
+ `score` int NULL,
+ `age` int NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "light_schema_change" = "true",
+ "enable_unique_key_skip_bitmap_column" = "true"
+ );
+ """
+
+ // insert initial data
+ sql """
+ INSERT INTO ${tableName21} VALUES
+ (1, 'alice', 100, 20),
+ (2, 'bob', 90, 21)
+ """
+
+ qt_select_initial21 "SELECT id, name, score, age FROM ${tableName21} ORDER BY id"
+
+ try {
+ // create routine load with flex mode
+ sql """
+ CREATE ROUTINE LOAD ${job21} ON ${tableName21}
+ PROPERTIES
+ (
+ "max_batch_interval" = "10",
+ "format" = "json",
+ "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaJsonTopic21}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ sql "PAUSE ROUTINE LOAD FOR ${job21}"
+
+ // alter to UPDATE_FIXED_COLUMNS mode - need to add COLUMNS clause via ALTER
+ // Note: This changes from flexible to fixed partial update
+ sql """
+ ALTER ROUTINE LOAD FOR ${job21}
+ PROPERTIES
+ (
+ "unique_key_update_mode" = "UPDATE_FIXED_COLUMNS"
+ );
+ """
+
+ // verify the property was changed
+ def res = sql "SHOW ROUTINE LOAD FOR ${job21}"
+ def jobProperties = res[0][11].toString()
+ logger.info("Altered routine load job properties:
${jobProperties}")
+ assertTrue(jobProperties.contains("UPDATE_FIXED_COLUMNS"))
+
+ } catch (Exception e) {
+ logger.error("Error during test: " + e.getMessage())
+ throw e
+ } finally {
+ sql "STOP ROUTINE LOAD FOR ${job21}"
+ }
+ }
+}
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.groovy
index a6a97253e98..eab01074ff3 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.groovy
@@ -276,7 +276,7 @@ suite("test_routine_load_partial_update_new_key_behavior", "nonConcurrent") {
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""
- exception "partial_update_new_key_behavior can only be set when
partial_columns is true"
+ exception "partial_update_new_key_behavior can only be set when
partial update is enabled"
}
}
}