This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch routineload_flexible_update
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/routineload_flexible_update by
this push:
new 3d2a33a425f fix: Disable jsonpaths with flexible partial update in
routine load
3d2a33a425f is described below
commit 3d2a33a425f2643b28688d8a4f66dfae21fafb3d
Author: Yongqiang YANG <[email protected]>
AuthorDate: Thu Jan 15 21:12:48 2026 -0800
fix: Disable jsonpaths with flexible partial update in routine load
Flexible partial update cannot work correctly with jsonpaths. jsonpaths maps
extracted values to columns by position, so the load cannot tell which
columns were omitted from a given row and should keep their existing values.
This change rejects jsonpaths whenever flexible partial update is enabled.
Changes:
- Add jsonpaths validation in CreateRoutineLoadInfo (CREATE ROUTINE LOAD path)
- Add jsonpaths validation in RoutineLoadJob (ALTER ROUTINE LOAD path)
- Convert the jsonpaths success tests (Test 4 and Test 16) into error tests
- Remove the now-obsolete expected outputs for those tests from the .out file
---
.../doris/load/routineload/RoutineLoadJob.java | 5 +
.../plans/commands/info/CreateRoutineLoadInfo.java | 5 +
.../test_routine_load_flexible_partial_update.out | 20 ----
...est_routine_load_flexible_partial_update.groovy | 113 ++++-----------------
4 files changed, 32 insertions(+), 111 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index c1aa2971d2e..585c3167154 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -2102,6 +2102,11 @@ public abstract class RoutineLoadJob
JsonFileFormatProperties.PROP_FUZZY_PARSE, "false"))) {
throw new DdlException("Flexible partial update does not support
fuzzy_parse");
}
+ // Cannot use jsonpaths
+ String jsonPaths = getJsonPaths();
+ if (jsonPaths != null && !jsonPaths.isEmpty()) {
+ throw new DdlException("Flexible partial update does not support
jsonpaths");
+ }
// Cannot specify COLUMNS mapping
if (columnDescs != null && !columnDescs.descs.isEmpty()) {
throw new DdlException("Flexible partial update does not support
COLUMNS specification");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
index 972649648cb..9394257013c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
@@ -463,6 +463,11 @@ public class CreateRoutineLoadInfo {
if
(Boolean.parseBoolean(jobProperties.getOrDefault(JsonFileFormatProperties.PROP_FUZZY_PARSE,
"false"))) {
throw new AnalysisException("Flexible partial update does not
support fuzzy_parse");
}
+ // Cannot use jsonpaths
+ String jsonPaths =
jobProperties.get(JsonFileFormatProperties.PROP_JSON_PATHS);
+ if (jsonPaths != null && !jsonPaths.isEmpty()) {
+ throw new AnalysisException("Flexible partial update does not
support jsonpaths");
+ }
// Cannot specify COLUMNS mapping
if (loadPropertyMap != null && loadPropertyMap.values().stream()
.anyMatch(p -> p instanceof LoadColumnClause)) {
diff --git
a/regression-test/data/load_p0/routine_load/test_routine_load_flexible_partial_update.out
b/regression-test/data/load_p0/routine_load/test_routine_load_flexible_partial_update.out
index fdcd03a5560..0d4f162d308 100644
---
a/regression-test/data/load_p0/routine_load/test_routine_load_flexible_partial_update.out
+++
b/regression-test/data/load_p0/routine_load/test_routine_load_flexible_partial_update.out
@@ -25,17 +25,6 @@
3 1000 2000 3000 4000
4 \N 9876 4444 1234
--- !select_initial4 --
-1 alice 100 20
-2 bob 90 21
-3 charlie 80 22
-
--- !select_after_flex_jsonpaths --
-1 alice_updated 150 20
-2 bob_updated 95 21
-3 charlie 80 22
-4 diana 70 \N
-
-- !select_initial7 --
1 alice 100 20
2 bob 90 21
@@ -69,15 +58,6 @@
3 charlie 80 22
4 diana \N \N
--- !select_initial16 --
-1 alice 100 20
-2 bob 90 21
-
--- !select_after_alter_flex_jsonpaths --
-1 alice_updated 150 20
-2 bob 90 21
-3 charlie 80 \N
-
-- !select_initial18 --
1 alice 100 20
2 bob 90 21
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy
index 7eef79bb98b..12758ab366e 100644
---
a/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy
@@ -232,18 +232,16 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
exception "Flexible partial update only supports JSON format"
}
- // Test 4: Success case - jsonpaths works with flexible partial update
- def kafkaJsonTopic4 =
"test_routine_load_flexible_partial_update_jsonpaths"
- def tableName4 = "test_routine_load_flex_update_jsonpaths"
- def job4 = "test_flex_partial_update_job_jsonpaths"
+ // Test 4: Error case - jsonpaths not supported with flexible partial
update
+ def tableName4 = "test_routine_load_flex_update_jsonpaths_error"
+ def job4 = "test_flex_partial_update_job_jsonpaths_error"
sql """ DROP TABLE IF EXISTS ${tableName4} force;"""
sql """
CREATE TABLE IF NOT EXISTS ${tableName4} (
`id` int NOT NULL,
`name` varchar(65533) NULL,
- `score` int NULL,
- `age` int NULL
+ `score` int NULL
) ENGINE=OLAP
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 3
@@ -255,18 +253,7 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
);
"""
- // insert initial data
- sql """
- INSERT INTO ${tableName4} VALUES
- (1, 'alice', 100, 20),
- (2, 'bob', 90, 21),
- (3, 'charlie', 80, 22)
- """
-
- qt_select_initial4 "SELECT id, name, score, age FROM ${tableName4}
ORDER BY id"
-
- try {
- // create routine load with jsonpaths and flexible partial update
+ test {
sql """
CREATE ROUTINE LOAD ${job4} ON ${tableName4}
PROPERTIES
@@ -279,34 +266,11 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
FROM KAFKA
(
"kafka_broker_list" = "${kafka_broker}",
- "kafka_topic" = "${kafkaJsonTopic4}",
+ "kafka_topic" = "test_topic",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""
-
- // send JSON data - jsonpaths extracts id, name, score (age is not
in jsonpaths so remains unchanged)
- def data4 = [
- '{"id": 1, "name": "alice_updated", "score": 150}',
- '{"id": 2, "name": "bob_updated", "score": 95}',
- '{"id": 4, "name": "diana", "score": 70}'
- ]
-
- data4.each { line ->
- logger.info("Sending to Kafka: ${line}")
- def record = new ProducerRecord<>(kafkaJsonTopic4, null, line)
- producer.send(record).get()
- }
- producer.flush()
-
- RoutineLoadTestUtils.waitForTaskFinish(runSql, job4, tableName4, 3)
-
- // verify flexible partial update with jsonpaths works
- qt_select_after_flex_jsonpaths "SELECT id, name, score, age FROM
${tableName4} ORDER BY id"
- } catch (Exception e) {
- logger.error("Error during test: " + e.getMessage())
- throw e
- } finally {
- sql "STOP ROUTINE LOAD FOR ${job4}"
+ exception "Flexible partial update does not support jsonpaths"
}
// Test 5: Error case - fuzzy_parse not supported
@@ -939,18 +903,17 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
sql "STOP ROUTINE LOAD FOR ${job15}"
}
- // Test 16: ALTER to flex mode succeeds with jsonpaths
- def kafkaJsonTopic16 = "test_routine_load_alter_flex_jsonpaths"
- def tableName16 = "test_routine_load_alter_flex_jsonpaths"
- def job16 = "test_alter_flex_jsonpaths_job"
+ // Test 16: ALTER to flex mode fails with jsonpaths
+ def kafkaJsonTopic16 = "test_routine_load_alter_flex_jsonpaths_error"
+ def tableName16 = "test_routine_load_alter_flex_jsonpaths_error"
+ def job16 = "test_alter_flex_jsonpaths_error_job"
sql """ DROP TABLE IF EXISTS ${tableName16} force;"""
sql """
CREATE TABLE IF NOT EXISTS ${tableName16} (
`id` int NOT NULL,
`name` varchar(65533) NULL,
- `score` int NULL,
- `age` int NULL
+ `score` int NULL
) ENGINE=OLAP
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 3
@@ -962,15 +925,6 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
);
"""
- // insert initial data
- sql """
- INSERT INTO ${tableName16} VALUES
- (1, 'alice', 100, 20),
- (2, 'bob', 90, 21)
- """
-
- qt_select_initial16 "SELECT id, name, score, age FROM ${tableName16}
ORDER BY id"
-
try {
// create routine load with jsonpaths (UPSERT mode)
sql """
@@ -991,40 +945,17 @@ suite("test_routine_load_flexible_partial_update",
"nonConcurrent") {
sql "PAUSE ROUTINE LOAD FOR ${job16}"
- // alter to UPDATE_FLEXIBLE_COLUMNS mode - should succeed
- sql """
- ALTER ROUTINE LOAD FOR ${job16}
- PROPERTIES
- (
- "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
- );
- """
-
- // verify the property was changed
- def res = sql "SHOW ROUTINE LOAD FOR ${job16}"
- def jobProperties = res[0][11].toString()
- logger.info("Altered routine load job properties:
${jobProperties}")
- assertTrue(jobProperties.contains("UPDATE_FLEXIBLE_COLUMNS"))
-
- sql "RESUME ROUTINE LOAD FOR ${job16}"
-
- // send JSON data
- def data16 = [
- '{"id": 1, "name": "alice_updated", "score": 150}',
- '{"id": 3, "name": "charlie", "score": 80}'
- ]
-
- data16.each { line ->
- logger.info("Sending to Kafka: ${line}")
- def record = new ProducerRecord<>(kafkaJsonTopic16, null, line)
- producer.send(record).get()
+ // alter to UPDATE_FLEXIBLE_COLUMNS mode - should fail because
jsonpaths is set
+ test {
+ sql """
+ ALTER ROUTINE LOAD FOR ${job16}
+ PROPERTIES
+ (
+ "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+ );
+ """
+ exception "Flexible partial update does not support jsonpaths"
}
- producer.flush()
-
- RoutineLoadTestUtils.waitForTaskFinish(runSql, job16, tableName16,
2)
-
- // verify flexible partial update with jsonpaths after ALTER
- qt_select_after_alter_flex_jsonpaths "SELECT id, name, score, age
FROM ${tableName16} ORDER BY id"
} catch (Exception e) {
logger.error("Error during test: " + e.getMessage())
throw e
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]