This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new eb6a040a035 [branch-2.1] Picks "[Fix](partial update) Fix wrongly update autoinc column in partial update #39996" (#40073)
eb6a040a035 is described below

commit eb6a040a03504fe943b8005a270493407e9749dd
Author: bobhan1 <[email protected]>
AuthorDate: Thu Aug 29 16:41:51 2024 +0800

    [branch-2.1] Picks "[Fix](partial update) Fix wrongly update autoinc column in partial update #39996" (#40073)
    
    ## Proposed changes
    
    pick https://github.com/apache/doris/pull/39996
---
 .../apache/doris/analysis/NativeInsertStmt.java    |  12 +-
 .../trees/plans/commands/insert/InsertUtils.java   |  18 ++-
 .../apache/doris/planner/StreamLoadPlanner.java    |  42 ++++++-
 .../partial_update/partial_update_autoinc1.csv     |   2 +
 .../partial_update/partial_update_autoinc2.csv     |   2 +
 .../partial_update/partial_update_autoinc3.csv     |   2 +
 .../partial_update/partial_update_autoinc4.csv     |   2 +
 .../test_partial_update_auto_inc.out               |  70 ++++++------
 .../test_delete_from_timeout.groovy                |   2 +-
 .../test_partial_update_auto_inc.groovy            | 123 +++++++++++++++------
 10 files changed, 186 insertions(+), 89 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index e9d9c152aa1..c3080fd14d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -1361,7 +1361,7 @@ public class NativeInsertStmt extends InsertStmt {
         if (hasEmptyTargetColumns) {
             return;
         }
-        boolean hasMissingColExceptAutoInc = false;
+        boolean hasMissingColExceptAutoIncKey = false;
         for (Column col : olapTable.getFullSchema()) {
             boolean exists = false;
             for (Column insertCol : targetColumns) {
@@ -1374,16 +1374,16 @@ public class NativeInsertStmt extends InsertStmt {
                     break;
                 }
             }
-            if (!exists && !col.isAutoInc()) {
-                if (col.isKey()) {
+            if (!exists) {
+                if (col.isKey() && !col.isAutoInc()) {
                     throw new UserException("Partial update should include all 
key columns, missing: " + col.getName());
                 }
-                if (col.isVisible()) {
-                    hasMissingColExceptAutoInc = true;
+                if (!(col.isKey() && col.isAutoInc()) && col.isVisible()) {
+                    hasMissingColExceptAutoIncKey = true;
                 }
             }
         }
-        if (!hasMissingColExceptAutoInc) {
+        if (!hasMissingColExceptAutoIncKey) {
             return;
         }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
index 6a283ca023a..48ea98ff9de 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
@@ -276,21 +276,19 @@ public class InsertUtils {
                         if (unboundLogicalSink.getColNames().isEmpty()) {
                             ((UnboundTableSink<? extends Plan>) 
unboundLogicalSink).setPartialUpdate(false);
                         } else {
-                            boolean hasMissingColExceptAutoInc = false;
+                            boolean hasMissingColExceptAutoIncKey = false;
                             for (Column col : olapTable.getFullSchema()) {
                                 Optional<String> insertCol = 
unboundLogicalSink.getColNames().stream()
                                         .filter(c -> 
c.equalsIgnoreCase(col.getName())).findFirst();
-                                if (!col.isAutoInc() && 
!insertCol.isPresent()) {
-                                    if (col.isKey()) {
-                                        throw new AnalysisException("Partial 
update should include all key columns,"
-                                                + " missing: " + 
col.getName());
-                                    }
-                                    if (col.isVisible()) {
-                                        hasMissingColExceptAutoInc = true;
-                                    }
+                                if (col.isKey() && !col.isAutoInc() && 
!insertCol.isPresent()) {
+                                    throw new AnalysisException("Partial 
update should include all key columns,"
+                                            + " missing: " + col.getName());
+                                }
+                                if (!(col.isAutoInc() && col.isKey()) && 
!insertCol.isPresent() && col.isVisible()) {
+                                    hasMissingColExceptAutoIncKey = true;
                                 }
                             }
-                            if (!hasMissingColExceptAutoInc) {
+                            if (!hasMissingColExceptAutoIncKey) {
                                 ((UnboundTableSink<? extends Plan>) 
unboundLogicalSink).setPartialUpdate(false);
                             }
                         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 3b012b173ab..d89f7b55794 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -148,6 +148,13 @@ public class StreamLoadPlanner {
         if (isPartialUpdate && !destTable.getEnableUniqueKeyMergeOnWrite()) {
             throw new UserException("Only unique key merge on write support 
partial update");
         }
+
+        // try to convert to upsert if only has missing auto-increment key 
column
+        boolean hasMissingColExceptAutoIncKey = false;
+        if (taskInfo.getColumnExprDescs().descs.isEmpty()) {
+            isPartialUpdate = false;
+        }
+
         HashSet<String> partialUpdateInputColumns = new HashSet<>();
         if (isPartialUpdate) {
             for (Column col : destTable.getFullSchema()) {
@@ -172,14 +179,23 @@ public class StreamLoadPlanner {
                         break;
                     }
                 }
-                if (col.isKey() && !existInExpr) {
-                    throw new UserException("Partial update should include all 
key columns, missing: " + col.getName());
+                if (!existInExpr) {
+                    if (col.isKey() && !col.isAutoInc()) {
+                        throw new UserException("Partial update should include 
all key columns, missing: "
+                                + col.getName());
+                    }
+                    if (!(col.isKey() && col.isAutoInc()) && col.isVisible()) {
+                        hasMissingColExceptAutoIncKey = true;
+                    }
                 }
             }
             if (taskInfo.getMergeType() == LoadTask.MergeType.DELETE) {
                 partialUpdateInputColumns.add(Column.DELETE_SIGN);
             }
         }
+        if (isPartialUpdate && !hasMissingColExceptAutoIncKey) {
+            isPartialUpdate = false;
+        }
         // here we should be full schema to fill the descriptor table
         for (Column col : destTable.getFullSchema()) {
             if (isPartialUpdate && 
!partialUpdateInputColumns.contains(col.getName())) {
@@ -247,7 +263,7 @@ public class StreamLoadPlanner {
         // The load id will pass to csv reader to find the stream load context 
from new load stream manager
         fileScanNode.setLoadInfo(loadId, taskInfo.getTxnId(), destTable, 
BrokerDesc.createForStreamLoad(),
                 fileGroup, fileStatus, taskInfo.isStrictMode(), 
taskInfo.getFileType(), taskInfo.getHiddenColumns(),
-                taskInfo.isPartialUpdate());
+                isPartialUpdate);
         scanNode = fileScanNode;
 
         scanNode.init(analyzer);
@@ -383,6 +399,13 @@ public class StreamLoadPlanner {
         if (isPartialUpdate && !destTable.getEnableUniqueKeyMergeOnWrite()) {
             throw new UserException("Only unique key merge on write support 
partial update");
         }
+
+        // try to convert to upsert if only has missing auto-increment key 
column
+        boolean hasMissingColExceptAutoIncKey = false;
+        if (taskInfo.getColumnExprDescs().descs.isEmpty()) {
+            isPartialUpdate = false;
+        }
+
         HashSet<String> partialUpdateInputColumns = new HashSet<>();
         if (isPartialUpdate) {
             for (Column col : destTable.getFullSchema()) {
@@ -407,14 +430,23 @@ public class StreamLoadPlanner {
                         break;
                     }
                 }
-                if (col.isKey() && !existInExpr) {
-                    throw new UserException("Partial update should include all 
key columns, missing: " + col.getName());
+                if (!existInExpr) {
+                    if (col.isKey() && !col.isAutoInc()) {
+                        throw new UserException("Partial update should include 
all key columns, missing: "
+                                + col.getName());
+                    }
+                    if (!(col.isKey() && col.isAutoInc()) && col.isVisible()) {
+                        hasMissingColExceptAutoIncKey = true;
+                    }
                 }
             }
             if (taskInfo.getMergeType() == LoadTask.MergeType.DELETE) {
                 partialUpdateInputColumns.add(Column.DELETE_SIGN);
             }
         }
+        if (isPartialUpdate && !hasMissingColExceptAutoIncKey) {
+            isPartialUpdate = false;
+        }
         // here we should be full schema to fill the descriptor table
         for (Column col : destTable.getFullSchema()) {
             if (isPartialUpdate && 
!partialUpdateInputColumns.contains(col.getName())) {
diff --git 
a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc1.csv
 
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc1.csv
new file mode 100644
index 00000000000..63b02b8a306
--- /dev/null
+++ 
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc1.csv
@@ -0,0 +1,2 @@
+doris3
+doris4
\ No newline at end of file
diff --git 
a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc2.csv
 
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc2.csv
new file mode 100644
index 00000000000..737c9a056f0
--- /dev/null
+++ 
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc2.csv
@@ -0,0 +1,2 @@
+102,doris8
+103,doris9
\ No newline at end of file
diff --git 
a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc3.csv
 
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc3.csv
new file mode 100644
index 00000000000..c06e9e00d07
--- /dev/null
+++ 
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc3.csv
@@ -0,0 +1,2 @@
+104,"doris10"
+105,"doris11"
\ No newline at end of file
diff --git 
a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc4.csv
 
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc4.csv
new file mode 100644
index 00000000000..3a227dba5f5
--- /dev/null
+++ 
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc4.csv
@@ -0,0 +1,2 @@
+2,888,888
+3,888,888
\ No newline at end of file
diff --git 
a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.out
 
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.out
index 380575499e2..d157f501a8b 100644
--- 
a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.out
+++ 
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.out
@@ -2,64 +2,72 @@
 -- !select_1 --
 doris1
 doris2
-
--- !select_2 --
-2
-
--- !select_3 --
-doris1
-doris2
 doris3
 doris4
 
--- !select_4 --
-4
-
--- !select_1 --
-doris1
-doris2
-
 -- !select_2 --
-2
+4
 
 -- !select_3 --
+"doris10"
+"doris11"
 doris1
 doris2
 doris3
 doris4
+doris5
+doris7
+doris8
+doris9
 
 -- !select_4 --
-4
+10
 
--- !select_1 --
-doris1
-doris2
+-- !select_5 --
+1      10      10      10
+2      20      20      20
+3      30      30      30
+4      40      40      40
 
--- !select_2 --
-2
+-- !select_6 --
+1      99      99      10
+2      888     888     20
+3      888     888     30
+4      40      40      40
 
--- !select_3 --
+-- !select_1 --
 doris1
 doris2
 doris3
 doris4
 
--- !select_4 --
-4
-
--- !select_1 --
-doris1
-doris2
-
 -- !select_2 --
-2
+4
 
 -- !select_3 --
+"doris10"
+"doris11"
 doris1
 doris2
 doris3
 doris4
+doris5
+doris7
+doris8
+doris9
 
 -- !select_4 --
-4
+10
+
+-- !select_5 --
+1      10      10      10
+2      20      20      20
+3      30      30      30
+4      40      40      40
+
+-- !select_6 --
+1      99      99      10
+2      888     888     20
+3      888     888     30
+4      40      40      40
 
diff --git 
a/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy 
b/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy
index 7d1efdc9782..8598d791e01 100644
--- a/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy
+++ b/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy
@@ -53,7 +53,7 @@ suite("test_delete_from_timeout","nonConcurrent") {
             
GetDebugPoint().disableDebugPointForAllBEs("PushHandler::_do_streaming_ingestion.try_lock_fail")
         }
 
-        sql """delete from ${tableName} where col1 = "false" and col2 = 
"-9999782574499444.2" and col3 = "-25"; """
+        sql """delete from ${tableName} where col1 = "false" and col3 = "-25"; 
"""
         t1.join()
         qt_sql "select * from ${tableName} order by col1, col2, col3;"
 
diff --git 
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.groovy
 
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.groovy
index d0d1ecf9542..ec46939b2f5 100644
--- 
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.groovy
+++ 
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.groovy
@@ -1,4 +1,3 @@
-
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -20,48 +19,100 @@ suite("test_partial_update_auto_inc") {
     String db = context.config.getDbNameByFile(context.file)
     sql "select 1;" // to create database
 
-    for (def use_mow : [false, true]) {
-        for (def use_nereids_planner : [false, true]) {
-            logger.info("current params: use_mow: ${use_mow}, 
use_nereids_planner: ${use_nereids_planner}")
-            connect(user = context.config.jdbcUser, password = 
context.config.jdbcPassword, url = context.config.jdbcUrl) {
-                sql "use ${db};"
+    for (def use_nereids_planner : [false, true]) {
+        logger.info("current params: use_nereids_planner: 
${use_nereids_planner}")
+        connect(user = context.config.jdbcUser, password = 
context.config.jdbcPassword, url = context.config.jdbcUrl) {
+            sql "use ${db};"
 
-                if (use_nereids_planner) {
-                    sql """ set enable_nereids_dml = true; """
-                    sql """ set enable_nereids_planner=true; """
-                    sql """ set enable_fallback_to_original_planner=false; """
-                } else {
-                    sql """ set enable_nereids_dml = false; """
-                    sql """ set enable_nereids_planner = false; """
-                }
+            if (use_nereids_planner) {
+                sql """ set enable_nereids_planner=true; """
+                sql """ set enable_fallback_to_original_planner=false; """
+            } else {
+                sql """ set enable_nereids_planner = false; """
+            }
 
-                // create table
-                sql """ DROP TABLE IF EXISTS 
test_primary_key_partial_update_auto_inc """
-                sql """ CREATE TABLE test_primary_key_partial_update_auto_inc (
-                            `id` BIGINT NOT NULL AUTO_INCREMENT,
-                            `name` varchar(65533) NOT NULL COMMENT "用户姓名" )
-                            UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 
1
-                            PROPERTIES("replication_num" = "1", 
"enable_unique_key_merge_on_write" = "${use_mow}"); """
+            sql """ DROP TABLE IF EXISTS 
test_primary_key_partial_update_auto_inc """
+            sql """ CREATE TABLE test_primary_key_partial_update_auto_inc (
+                        `id` BIGINT NOT NULL AUTO_INCREMENT,
+                        `name` varchar(65533) NOT NULL COMMENT "用户姓名" )
+                        UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+                        PROPERTIES("replication_num" = "1", 
"enable_unique_key_merge_on_write" = "true"); """
 
-                sql """ set enable_unique_key_partial_update=true; """
-                sql """ insert into 
test_primary_key_partial_update_auto_inc(name) values("doris1"); """
-                sql """ set enable_unique_key_partial_update=false; """
-                sql """ insert into 
test_primary_key_partial_update_auto_inc(name) values("doris2"); """
-                sql "sync"
+            sql """ set enable_unique_key_partial_update=true; """
+            sql "sync"
+            // insert stmt only misses auto-inc key column
+            sql """ insert into test_primary_key_partial_update_auto_inc(name) 
values("doris1"); """
+            sql """ set enable_unique_key_partial_update=false; """
+            sql "sync"
+            sql """ insert into test_primary_key_partial_update_auto_inc(name) 
values("doris2"); """
+            // stream load only misses auto-inc key column
+            streamLoad {
+                table "test_primary_key_partial_update_auto_inc"
+                set 'partial_columns', 'true'
+                set 'column_separator', ','
+                set 'columns', 'name'
+                file 'partial_update_autoinc1.csv'
+                time 10000
+            }
+            qt_select_1 """ select name from 
test_primary_key_partial_update_auto_inc order by name; """
+            qt_select_2 """ select count(distinct id) from 
test_primary_key_partial_update_auto_inc; """
 
-                qt_select_1 """ select name from 
test_primary_key_partial_update_auto_inc order by name; """
-                qt_select_2 """ select count(distinct id) from 
test_primary_key_partial_update_auto_inc; """
+            sql """ set enable_unique_key_partial_update=true; """
+            sql "sync"
+            // insert stmt withou column list
+            sql """ insert into test_primary_key_partial_update_auto_inc 
values(100,"doris5"); """
+            // insert stmt, column list include all visible columns
+            sql """ insert into 
test_primary_key_partial_update_auto_inc(id,name) values(102,"doris6"); """
+            sql """ set enable_unique_key_partial_update=false; """
+            sql "sync"
+            sql """ insert into test_primary_key_partial_update_auto_inc 
values(101, "doris7"); """
+            // stream load withou column list
+            streamLoad {
+                table "test_primary_key_partial_update_auto_inc"
+                set 'partial_columns', 'true'
+                set 'column_separator', ','
+                file 'partial_update_autoinc2.csv'
+                time 10000
+            }
+            // stream load, column list include all visible columns
+            streamLoad {
+                table "test_primary_key_partial_update_auto_inc"
+                set 'partial_columns', 'true'
+                set 'column_separator', ','
+                set 'columns', 'id,name'
+                file 'partial_update_autoinc3.csv'
+                time 10000
+            }
+            qt_select_3 """ select name from 
test_primary_key_partial_update_auto_inc order by name; """
+            qt_select_4 """ select count(distinct id) from 
test_primary_key_partial_update_auto_inc; """
+            sql """ DROP TABLE IF EXISTS 
test_primary_key_partial_update_auto_inc """
 
-                sql """ set enable_unique_key_partial_update=true; """
-                sql """ insert into test_primary_key_partial_update_auto_inc 
values(100,"doris3"); """
-                sql """ set enable_unique_key_partial_update=false; """
-                sql """ insert into test_primary_key_partial_update_auto_inc 
values(101, "doris4"); """
-                sql "sync"
-                qt_select_3 """ select name from 
test_primary_key_partial_update_auto_inc order by name; """
-                qt_select_4 """ select count(distinct id) from 
test_primary_key_partial_update_auto_inc; """
 
-                sql """ DROP TABLE IF EXISTS 
test_primary_key_partial_update_auto_inc """
+            sql """ DROP TABLE IF EXISTS 
test_primary_key_partial_update_auto_inc2 """
+            sql """ CREATE TABLE test_primary_key_partial_update_auto_inc2 (
+                        `id` BIGINT NOT NULL,
+                        `c1` int,
+                        `c2` int,
+                        `cid` BIGINT NOT NULL AUTO_INCREMENT)
+                        UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+                        PROPERTIES("replication_num" = "1", 
"enable_unique_key_merge_on_write" = "true"); """
+            sql "insert into test_primary_key_partial_update_auto_inc2 
values(1,10,10,10),(2,20,20,20),(3,30,30,30),(4,40,40,40);"
+            order_qt_select_5 "select * from 
test_primary_key_partial_update_auto_inc2"
+            sql """ set enable_unique_key_partial_update=true; """
+            sql "sync;"
+            // insert stmt only misses auto-inc value column, its value should 
not change when do partial update
+            sql "insert into 
test_primary_key_partial_update_auto_inc2(id,c1,c2) values(1,99,99),(2,99,99);"
+            // stream load only misses auto-inc value column, its value should 
not change when do partial update
+            streamLoad {
+                table "test_primary_key_partial_update_auto_inc2"
+                set 'partial_columns', 'true'
+                set 'column_separator', ','
+                set 'columns', 'id,c1,c2'
+                file 'partial_update_autoinc4.csv'
+                time 10000
             }
+            order_qt_select_6 "select * from 
test_primary_key_partial_update_auto_inc2"
+            sql """ DROP TABLE IF EXISTS 
test_primary_key_partial_update_auto_inc2 """
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to