Copilot commented on code in PR #59990:
URL: https://github.com/apache/doris/pull/59990#discussion_r2700688161


##########
regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy:
##########
@@ -0,0 +1,1414 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.RoutineLoadTestUtils
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+
+suite("test_routine_load_flexible_partial_update", "nonConcurrent") {
+
+    if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) {
+        def runSql = { String q -> sql q }
+        def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
+        def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+
+        // Test 1: Basic flexible partial update
+        def kafkaJsonTopic1 = "test_routine_load_flexible_partial_update_basic"
+        def tableName1 = "test_routine_load_flex_update_basic"
+        def job1 = "test_flex_partial_update_job_basic"
+
+        sql """ DROP TABLE IF EXISTS ${tableName1} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName1} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL,
+                `age` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            COMMENT 'test flexible partial update'
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        // verify skip bitmap column is enabled
+        def show_res = sql "show create table ${tableName1}"
+        assertTrue(show_res.toString().contains('"enable_unique_key_skip_bitmap_column" = "true"'))
+
+        // insert initial data
+        sql """
+            INSERT INTO ${tableName1} VALUES
+            (1, 'alice', 100, 20),
+            (2, 'bob', 90, 21),
+            (3, 'charlie', 80, 22),
+            (4, 'david', 70, 23),
+            (5, 'eve', 60, 24)
+        """
+
+        qt_select_initial1 "SELECT id, name, score, age FROM ${tableName1} ORDER BY id"
+
+        try {
+            // create routine load with flexible partial update
+            sql """
+                CREATE ROUTINE LOAD ${job1} ON ${tableName1}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic1}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            // send JSON data with different columns per row
+            // Row 1: update only score for id=1
+            // Row 2: update only age for id=2
+            // Row 3: update both name and score for id=3
+            // Row 4: insert new row with only id and name
+            def data = [
+                '{"id": 1, "score": 150}',
+                '{"id": 2, "age": 30}',
+                '{"id": 3, "name": "chuck", "score": 95}',
+                '{"id": 6, "name": "frank"}'
+            ]
+
+            data.each { line ->
+                logger.info("Sending to Kafka: ${line}")
+                def record = new ProducerRecord<>(kafkaJsonTopic1, null, line)
+                producer.send(record).get()
+            }
+            producer.flush()
+
+            // wait for routine load task to finish
+            RoutineLoadTestUtils.waitForTaskFinish(runSql, job1, tableName1, 4)
+
+            // verify flexible partial update results
+            qt_select_after_flex_update1 "SELECT id, name, score, age FROM ${tableName1} ORDER BY id"
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job1}"
+        }
+
+        // Test 2: Flexible partial update with default values
+        def kafkaJsonTopic2 = "test_routine_load_flexible_partial_update_default"
+        def tableName2 = "test_routine_load_flex_update_default"
+        def job2 = "test_flex_partial_update_job_default"
+
+        sql """ DROP TABLE IF EXISTS ${tableName2} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName2} (
+                `id` int NOT NULL,
+                `v1` bigint NULL,
+                `v2` bigint NULL DEFAULT "9876",
+                `v3` bigint NOT NULL,
+                `v4` bigint NOT NULL DEFAULT "1234"
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            COMMENT 'test flexible partial update with default values'
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        // insert initial data
+        sql """
+            INSERT INTO ${tableName2} VALUES
+            (1, 10, 20, 30, 40),
+            (2, 100, 200, 300, 400),
+            (3, 1000, 2000, 3000, 4000)
+        """
+
+        qt_select_initial2 "SELECT id, v1, v2, v3, v4 FROM ${tableName2} ORDER BY id"
+
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${job2} ON ${tableName2}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic2}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            // send JSON data with different columns per row
+            def data2 = [
+                '{"id": 1, "v1": 11}',
+                '{"id": 2, "v2": 222, "v3": 333}',
+                '{"id": 4, "v3": 4444}'
+            ]
+
+            data2.each { line ->
+                logger.info("Sending to Kafka: ${line}")
+                def record = new ProducerRecord<>(kafkaJsonTopic2, null, line)
+                producer.send(record).get()
+            }
+            producer.flush()
+
+            RoutineLoadTestUtils.waitForTaskFinish(runSql, job2, tableName2, 3)
+
+            qt_select_after_flex_update2 "SELECT id, v1, v2, v3, v4 FROM ${tableName2} ORDER BY id"
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job2}"
+        }
+
+        // Test 3: Error case - CSV format not supported
+        def kafkaCsvTopic3 = "test_routine_load_flexible_partial_update_csv_error"
+        def tableName3 = "test_routine_load_flex_update_csv_error"
+        def job3 = "test_flex_partial_update_job_csv_error"
+
+        sql """ DROP TABLE IF EXISTS ${tableName3} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName3} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        test {
+            sql """
+                CREATE ROUTINE LOAD ${job3} ON ${tableName3}
+                COLUMNS TERMINATED BY ","
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaCsvTopic3}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            exception "Flexible partial update only supports JSON format"
+        }
+
+        // Test 4: Success case - jsonpaths works with flexible partial update
+        def kafkaJsonTopic4 = "test_routine_load_flexible_partial_update_jsonpaths"
+        def tableName4 = "test_routine_load_flex_update_jsonpaths"
+        def job4 = "test_flex_partial_update_job_jsonpaths"
+
+        sql """ DROP TABLE IF EXISTS ${tableName4} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName4} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL,
+                `age` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        // insert initial data
+        sql """
+            INSERT INTO ${tableName4} VALUES
+            (1, 'alice', 100, 20),
+            (2, 'bob', 90, 21),
+            (3, 'charlie', 80, 22)
+        """
+
+        qt_select_initial4 "SELECT id, name, score, age FROM ${tableName4} ORDER BY id"
+
+        try {
+            // create routine load with jsonpaths and flexible partial update
+            sql """
+                CREATE ROUTINE LOAD ${job4} ON ${tableName4}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "jsonpaths" = '["\\$.id", "\\$.name", "\\$.score"]',

Review Comment:
   Inconsistent string escaping for the jsonpaths property: line 276 uses single quotes with backslash escaping ('["\\$.id", ...]'), while line 982 uses double quotes with triple-backslash escaping ("[\\\"\\$.id\\\", ...]"). Both tests should use the same escaping pattern for consistency and readability; consider standardizing on one approach throughout the test file.
   ```suggestion
                    "jsonpaths" = "[\\\"\\$.id\\\", \\\"\\$.name\\\", \\\"\\$.score\\\"]",
   ```
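   For reference, a standalone Groovy sketch (hypothetical, not part of the test file) that prints the SQL-level property text each escaping style resolves to after Groovy unescaping; whether both parse to the same jsonpaths is then up to the SQL layer's own unquoting, which is not modeled here:
   ```groovy
   // SQL text produced by the single-quoted style ('["\\$.id", ...]' inside
   // the test's triple-quoted Groovy string):
   def singleQuotedStyle = '"jsonpaths" = \'["\\$.id", "\\$.name", "\\$.score"]\''
   // SQL text produced by the double-quoted style ("[\\\"\\$.id\\\", ...]"):
   def doubleQuotedStyle = '"jsonpaths" = "[\\"\\$.id\\", \\"\\$.name\\", \\"\\$.score\\"]"'
   println singleQuotedStyle
   println doubleQuotedStyle
   ```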



##########
fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java:
##########
@@ -368,17 +369,19 @@ public long beginTransaction(List<Long> tableIdList, String label, TUniqueId req
             checkRunningTxnExceedLimit(tableIdList);
 
             tid = idGenerator.getNextTransactionId();
-            TransactionState transactionState = new TransactionState(dbId, tableIdList,
+            transactionState = new TransactionState(dbId, tableIdList,
                     tid, label, requestId, sourceType, coordinator, listenerId, timeoutSecond * 1000);
             transactionState.setPrepareTime(System.currentTimeMillis());
-            unprotectUpsertTransactionState(transactionState, false);
+            unprotectUpdateInMemoryState(transactionState, false);
 
             if (MetricRepo.isInit) {
                 MetricRepo.COUNTER_TXN_BEGIN.increase(1L);
             }
         } finally {
             writeUnlock();
         }
+        // Persist edit log outside lock to reduce lock contention
+        persistTransactionState(transactionState);

Review Comment:
   The persistTransactionState method is called outside the write lock with a transactionState variable that may still be null if an exception is thrown inside the try block before line 372. If any of the checks before line 372 throws (such as checkRunningTxnExceedLimit at line 369), transactionState remains null, and calling persistTransactionState(transactionState) at line 384 would result in a NullPointerException. Consider adding a null check before calling persistTransactionState, or wrapping the call in a condition that ensures transactionState is not null.
   ```suggestion
           if (transactionState != null) {
               persistTransactionState(transactionState);
           }
   ```
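   As an illustration, a minimal self-contained Groovy sketch (hypothetical names, not the Doris implementation) of the pattern the suggestion proposes: update in-memory state under the write lock, persist outside it, and guard against a state that was never assigned:
   ```groovy
   import java.util.concurrent.locks.ReentrantReadWriteLock

   def lock = new ReentrantReadWriteLock()
   def editLog = []   // stand-in for the persisted edit log

   def beginTxn = { boolean limitExceeded ->
       def txnState = null
       lock.writeLock().lock()
       try {
           if (limitExceeded) {
               // stand-in for checkRunningTxnExceedLimit() throwing before
               // the transaction state is constructed
               throw new IllegalStateException('too many running txns')
           }
           txnState = [tid: System.nanoTime(), label: 'demo']
           // the in-memory update (cf. unprotectUpdateInMemoryState) goes here
       } finally {
           lock.writeLock().unlock()
       }
       // Persist outside the lock to reduce contention; the null guard
       // covers any path on which txnState was never assigned.
       if (txnState != null) {
           editLog << txnState   // stand-in for persistTransactionState()
       }
       return txnState?.tid
   }

   assert beginTxn(false) != null
   ```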



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

