This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch routineload_flexible_update
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/routineload_flexible_update by this push:
     new 3d2a33a425f fix: Disable jsonpaths with flexible partial update in routine load
3d2a33a425f is described below

commit 3d2a33a425f2643b28688d8a4f66dfae21fafb3d
Author: Yongqiang YANG <[email protected]>
AuthorDate: Thu Jan 15 21:12:48 2026 -0800

    fix: Disable jsonpaths with flexible partial update in routine load
    
    Flexible partial update cannot work correctly with jsonpaths. jsonpaths
    extracts values positionally, so the load cannot tell which columns were
    absent from a given JSON row and should therefore keep their existing
    values. This change adds validation that rejects jsonpaths whenever
    flexible partial update is enabled.
    
    Changes:
    - Add jsonpaths validation in CreateRoutineLoadInfo (CREATE ROUTINE LOAD path)
    - Add jsonpaths validation in RoutineLoadJob (ALTER ROUTINE LOAD path)
    - Convert the jsonpaths success tests (Test 4 and Test 16) into error tests
    - Remove the now-obsolete expected outputs for those tests from the .out file
---
 .../doris/load/routineload/RoutineLoadJob.java     |   5 +
 .../plans/commands/info/CreateRoutineLoadInfo.java |   5 +
 .../test_routine_load_flexible_partial_update.out  |  20 ----
 ...est_routine_load_flexible_partial_update.groovy | 113 ++++-----------------
 4 files changed, 32 insertions(+), 111 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index c1aa2971d2e..585c3167154 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -2102,6 +2102,11 @@ public abstract class RoutineLoadJob
                 JsonFileFormatProperties.PROP_FUZZY_PARSE, "false"))) {
             throw new DdlException("Flexible partial update does not support 
fuzzy_parse");
         }
+        // Cannot use jsonpaths
+        String jsonPaths = getJsonPaths();
+        if (jsonPaths != null && !jsonPaths.isEmpty()) {
+            throw new DdlException("Flexible partial update does not support 
jsonpaths");
+        }
         // Cannot specify COLUMNS mapping
         if (columnDescs != null && !columnDescs.descs.isEmpty()) {
             throw new DdlException("Flexible partial update does not support 
COLUMNS specification");
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
index 972649648cb..9394257013c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
@@ -463,6 +463,11 @@ public class CreateRoutineLoadInfo {
         if 
(Boolean.parseBoolean(jobProperties.getOrDefault(JsonFileFormatProperties.PROP_FUZZY_PARSE,
 "false"))) {
             throw new AnalysisException("Flexible partial update does not 
support fuzzy_parse");
         }
+        // Cannot use jsonpaths
+        String jsonPaths = 
jobProperties.get(JsonFileFormatProperties.PROP_JSON_PATHS);
+        if (jsonPaths != null && !jsonPaths.isEmpty()) {
+            throw new AnalysisException("Flexible partial update does not 
support jsonpaths");
+        }
         // Cannot specify COLUMNS mapping
         if (loadPropertyMap != null && loadPropertyMap.values().stream()
                 .anyMatch(p -> p instanceof LoadColumnClause)) {
diff --git 
a/regression-test/data/load_p0/routine_load/test_routine_load_flexible_partial_update.out
 
b/regression-test/data/load_p0/routine_load/test_routine_load_flexible_partial_update.out
index fdcd03a5560..0d4f162d308 100644
--- 
a/regression-test/data/load_p0/routine_load/test_routine_load_flexible_partial_update.out
+++ 
b/regression-test/data/load_p0/routine_load/test_routine_load_flexible_partial_update.out
@@ -25,17 +25,6 @@
 3      1000    2000    3000    4000
 4      \N      9876    4444    1234
 
--- !select_initial4 --
-1      alice   100     20
-2      bob     90      21
-3      charlie 80      22
-
--- !select_after_flex_jsonpaths --
-1      alice_updated   150     20
-2      bob_updated     95      21
-3      charlie 80      22
-4      diana   70      \N
-
 -- !select_initial7 --
 1      alice   100     20
 2      bob     90      21
@@ -69,15 +58,6 @@
 3      charlie 80      22
 4      diana   \N      \N
 
--- !select_initial16 --
-1      alice   100     20
-2      bob     90      21
-
--- !select_after_alter_flex_jsonpaths --
-1      alice_updated   150     20
-2      bob     90      21
-3      charlie 80      \N
-
 -- !select_initial18 --
 1      alice   100     20
 2      bob     90      21
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy
 
b/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy
index 7eef79bb98b..12758ab366e 100644
--- 
a/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy
@@ -232,18 +232,16 @@ suite("test_routine_load_flexible_partial_update", 
"nonConcurrent") {
             exception "Flexible partial update only supports JSON format"
         }
 
-        // Test 4: Success case - jsonpaths works with flexible partial update
-        def kafkaJsonTopic4 = 
"test_routine_load_flexible_partial_update_jsonpaths"
-        def tableName4 = "test_routine_load_flex_update_jsonpaths"
-        def job4 = "test_flex_partial_update_job_jsonpaths"
+        // Test 4: Error case - jsonpaths not supported with flexible partial 
update
+        def tableName4 = "test_routine_load_flex_update_jsonpaths_error"
+        def job4 = "test_flex_partial_update_job_jsonpaths_error"
 
         sql """ DROP TABLE IF EXISTS ${tableName4} force;"""
         sql """
             CREATE TABLE IF NOT EXISTS ${tableName4} (
                 `id` int NOT NULL,
                 `name` varchar(65533) NULL,
-                `score` int NULL,
-                `age` int NULL
+                `score` int NULL
             ) ENGINE=OLAP
             UNIQUE KEY(`id`)
             DISTRIBUTED BY HASH(`id`) BUCKETS 3
@@ -255,18 +253,7 @@ suite("test_routine_load_flexible_partial_update", 
"nonConcurrent") {
             );
         """
 
-        // insert initial data
-        sql """
-            INSERT INTO ${tableName4} VALUES
-            (1, 'alice', 100, 20),
-            (2, 'bob', 90, 21),
-            (3, 'charlie', 80, 22)
-        """
-
-        qt_select_initial4 "SELECT id, name, score, age FROM ${tableName4} 
ORDER BY id"
-
-        try {
-            // create routine load with jsonpaths and flexible partial update
+        test {
             sql """
                 CREATE ROUTINE LOAD ${job4} ON ${tableName4}
                 PROPERTIES
@@ -279,34 +266,11 @@ suite("test_routine_load_flexible_partial_update", 
"nonConcurrent") {
                 FROM KAFKA
                 (
                     "kafka_broker_list" = "${kafka_broker}",
-                    "kafka_topic" = "${kafkaJsonTopic4}",
+                    "kafka_topic" = "test_topic",
                     "property.kafka_default_offsets" = "OFFSET_BEGINNING"
                 );
             """
-
-            // send JSON data - jsonpaths extracts id, name, score (age is not 
in jsonpaths so remains unchanged)
-            def data4 = [
-                '{"id": 1, "name": "alice_updated", "score": 150}',
-                '{"id": 2, "name": "bob_updated", "score": 95}',
-                '{"id": 4, "name": "diana", "score": 70}'
-            ]
-
-            data4.each { line ->
-                logger.info("Sending to Kafka: ${line}")
-                def record = new ProducerRecord<>(kafkaJsonTopic4, null, line)
-                producer.send(record).get()
-            }
-            producer.flush()
-
-            RoutineLoadTestUtils.waitForTaskFinish(runSql, job4, tableName4, 3)
-
-            // verify flexible partial update with jsonpaths works
-            qt_select_after_flex_jsonpaths "SELECT id, name, score, age FROM 
${tableName4} ORDER BY id"
-        } catch (Exception e) {
-            logger.error("Error during test: " + e.getMessage())
-            throw e
-        } finally {
-            sql "STOP ROUTINE LOAD FOR ${job4}"
+            exception "Flexible partial update does not support jsonpaths"
         }
 
         // Test 5: Error case - fuzzy_parse not supported
@@ -939,18 +903,17 @@ suite("test_routine_load_flexible_partial_update", 
"nonConcurrent") {
             sql "STOP ROUTINE LOAD FOR ${job15}"
         }
 
-        // Test 16: ALTER to flex mode succeeds with jsonpaths
-        def kafkaJsonTopic16 = "test_routine_load_alter_flex_jsonpaths"
-        def tableName16 = "test_routine_load_alter_flex_jsonpaths"
-        def job16 = "test_alter_flex_jsonpaths_job"
+        // Test 16: ALTER to flex mode fails with jsonpaths
+        def kafkaJsonTopic16 = "test_routine_load_alter_flex_jsonpaths_error"
+        def tableName16 = "test_routine_load_alter_flex_jsonpaths_error"
+        def job16 = "test_alter_flex_jsonpaths_error_job"
 
         sql """ DROP TABLE IF EXISTS ${tableName16} force;"""
         sql """
             CREATE TABLE IF NOT EXISTS ${tableName16} (
                 `id` int NOT NULL,
                 `name` varchar(65533) NULL,
-                `score` int NULL,
-                `age` int NULL
+                `score` int NULL
             ) ENGINE=OLAP
             UNIQUE KEY(`id`)
             DISTRIBUTED BY HASH(`id`) BUCKETS 3
@@ -962,15 +925,6 @@ suite("test_routine_load_flexible_partial_update", 
"nonConcurrent") {
             );
         """
 
-        // insert initial data
-        sql """
-            INSERT INTO ${tableName16} VALUES
-            (1, 'alice', 100, 20),
-            (2, 'bob', 90, 21)
-        """
-
-        qt_select_initial16 "SELECT id, name, score, age FROM ${tableName16} 
ORDER BY id"
-
         try {
             // create routine load with jsonpaths (UPSERT mode)
             sql """
@@ -991,40 +945,17 @@ suite("test_routine_load_flexible_partial_update", 
"nonConcurrent") {
 
             sql "PAUSE ROUTINE LOAD FOR ${job16}"
 
-            // alter to UPDATE_FLEXIBLE_COLUMNS mode - should succeed
-            sql """
-                ALTER ROUTINE LOAD FOR ${job16}
-                PROPERTIES
-                (
-                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
-                );
-            """
-
-            // verify the property was changed
-            def res = sql "SHOW ROUTINE LOAD FOR ${job16}"
-            def jobProperties = res[0][11].toString()
-            logger.info("Altered routine load job properties: 
${jobProperties}")
-            assertTrue(jobProperties.contains("UPDATE_FLEXIBLE_COLUMNS"))
-
-            sql "RESUME ROUTINE LOAD FOR ${job16}"
-
-            // send JSON data
-            def data16 = [
-                '{"id": 1, "name": "alice_updated", "score": 150}',
-                '{"id": 3, "name": "charlie", "score": 80}'
-            ]
-
-            data16.each { line ->
-                logger.info("Sending to Kafka: ${line}")
-                def record = new ProducerRecord<>(kafkaJsonTopic16, null, line)
-                producer.send(record).get()
+            // alter to UPDATE_FLEXIBLE_COLUMNS mode - should fail because 
jsonpaths is set
+            test {
+                sql """
+                    ALTER ROUTINE LOAD FOR ${job16}
+                    PROPERTIES
+                    (
+                        "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                    );
+                """
+                exception "Flexible partial update does not support jsonpaths"
             }
-            producer.flush()
-
-            RoutineLoadTestUtils.waitForTaskFinish(runSql, job16, tableName16, 
2)
-
-            // verify flexible partial update with jsonpaths after ALTER
-            qt_select_after_alter_flex_jsonpaths "SELECT id, name, score, age 
FROM ${tableName16} ORDER BY id"
         } catch (Exception e) {
             logger.error("Error during test: " + e.getMessage())
             throw e


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to