This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new eb6a040a035 [branch-2.1] Picks "[Fix](partial update) Fix wrongly
update autoinc column in partial update #39996" (#40073)
eb6a040a035 is described below
commit eb6a040a03504fe943b8005a270493407e9749dd
Author: bobhan1 <[email protected]>
AuthorDate: Thu Aug 29 16:41:51 2024 +0800
[branch-2.1] Picks "[Fix](partial update) Fix wrongly update autoinc column
in partial update #39996" (#40073)
## Proposed changes
pick https://github.com/apache/doris/pull/39996
---
.../apache/doris/analysis/NativeInsertStmt.java | 12 +-
.../trees/plans/commands/insert/InsertUtils.java | 18 ++-
.../apache/doris/planner/StreamLoadPlanner.java | 42 ++++++-
.../partial_update/partial_update_autoinc1.csv | 2 +
.../partial_update/partial_update_autoinc2.csv | 2 +
.../partial_update/partial_update_autoinc3.csv | 2 +
.../partial_update/partial_update_autoinc4.csv | 2 +
.../test_partial_update_auto_inc.out | 70 ++++++------
.../test_delete_from_timeout.groovy | 2 +-
.../test_partial_update_auto_inc.groovy | 123 +++++++++++++++------
10 files changed, 186 insertions(+), 89 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index e9d9c152aa1..c3080fd14d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -1361,7 +1361,7 @@ public class NativeInsertStmt extends InsertStmt {
if (hasEmptyTargetColumns) {
return;
}
- boolean hasMissingColExceptAutoInc = false;
+ boolean hasMissingColExceptAutoIncKey = false;
for (Column col : olapTable.getFullSchema()) {
boolean exists = false;
for (Column insertCol : targetColumns) {
@@ -1374,16 +1374,16 @@ public class NativeInsertStmt extends InsertStmt {
break;
}
}
- if (!exists && !col.isAutoInc()) {
- if (col.isKey()) {
+ if (!exists) {
+ if (col.isKey() && !col.isAutoInc()) {
throw new UserException("Partial update should include all
key columns, missing: " + col.getName());
}
- if (col.isVisible()) {
- hasMissingColExceptAutoInc = true;
+ if (!(col.isKey() && col.isAutoInc()) && col.isVisible()) {
+ hasMissingColExceptAutoIncKey = true;
}
}
}
- if (!hasMissingColExceptAutoInc) {
+ if (!hasMissingColExceptAutoIncKey) {
return;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
index 6a283ca023a..48ea98ff9de 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
@@ -276,21 +276,19 @@ public class InsertUtils {
if (unboundLogicalSink.getColNames().isEmpty()) {
((UnboundTableSink<? extends Plan>)
unboundLogicalSink).setPartialUpdate(false);
} else {
- boolean hasMissingColExceptAutoInc = false;
+ boolean hasMissingColExceptAutoIncKey = false;
for (Column col : olapTable.getFullSchema()) {
Optional<String> insertCol =
unboundLogicalSink.getColNames().stream()
.filter(c ->
c.equalsIgnoreCase(col.getName())).findFirst();
- if (!col.isAutoInc() &&
!insertCol.isPresent()) {
- if (col.isKey()) {
- throw new AnalysisException("Partial
update should include all key columns,"
- + " missing: " +
col.getName());
- }
- if (col.isVisible()) {
- hasMissingColExceptAutoInc = true;
- }
+ if (col.isKey() && !col.isAutoInc() &&
!insertCol.isPresent()) {
+ throw new AnalysisException("Partial
update should include all key columns,"
+ + " missing: " + col.getName());
+ }
+ if (!(col.isAutoInc() && col.isKey()) &&
!insertCol.isPresent() && col.isVisible()) {
+ hasMissingColExceptAutoIncKey = true;
}
}
- if (!hasMissingColExceptAutoInc) {
+ if (!hasMissingColExceptAutoIncKey) {
((UnboundTableSink<? extends Plan>)
unboundLogicalSink).setPartialUpdate(false);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 3b012b173ab..d89f7b55794 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -148,6 +148,13 @@ public class StreamLoadPlanner {
if (isPartialUpdate && !destTable.getEnableUniqueKeyMergeOnWrite()) {
throw new UserException("Only unique key merge on write support
partial update");
}
+
+ // try to convert to upsert if only has missing auto-increment key
column
+ boolean hasMissingColExceptAutoIncKey = false;
+ if (taskInfo.getColumnExprDescs().descs.isEmpty()) {
+ isPartialUpdate = false;
+ }
+
HashSet<String> partialUpdateInputColumns = new HashSet<>();
if (isPartialUpdate) {
for (Column col : destTable.getFullSchema()) {
@@ -172,14 +179,23 @@ public class StreamLoadPlanner {
break;
}
}
- if (col.isKey() && !existInExpr) {
- throw new UserException("Partial update should include all
key columns, missing: " + col.getName());
+ if (!existInExpr) {
+ if (col.isKey() && !col.isAutoInc()) {
+ throw new UserException("Partial update should include
all key columns, missing: "
+ + col.getName());
+ }
+ if (!(col.isKey() && col.isAutoInc()) && col.isVisible()) {
+ hasMissingColExceptAutoIncKey = true;
+ }
}
}
if (taskInfo.getMergeType() == LoadTask.MergeType.DELETE) {
partialUpdateInputColumns.add(Column.DELETE_SIGN);
}
}
+ if (isPartialUpdate && !hasMissingColExceptAutoIncKey) {
+ isPartialUpdate = false;
+ }
// here we should be full schema to fill the descriptor table
for (Column col : destTable.getFullSchema()) {
if (isPartialUpdate &&
!partialUpdateInputColumns.contains(col.getName())) {
@@ -247,7 +263,7 @@ public class StreamLoadPlanner {
// The load id will pass to csv reader to find the stream load context
from new load stream manager
fileScanNode.setLoadInfo(loadId, taskInfo.getTxnId(), destTable,
BrokerDesc.createForStreamLoad(),
fileGroup, fileStatus, taskInfo.isStrictMode(),
taskInfo.getFileType(), taskInfo.getHiddenColumns(),
- taskInfo.isPartialUpdate());
+ isPartialUpdate);
scanNode = fileScanNode;
scanNode.init(analyzer);
@@ -383,6 +399,13 @@ public class StreamLoadPlanner {
if (isPartialUpdate && !destTable.getEnableUniqueKeyMergeOnWrite()) {
throw new UserException("Only unique key merge on write support
partial update");
}
+
+ // try to convert to upsert if only has missing auto-increment key
column
+ boolean hasMissingColExceptAutoIncKey = false;
+ if (taskInfo.getColumnExprDescs().descs.isEmpty()) {
+ isPartialUpdate = false;
+ }
+
HashSet<String> partialUpdateInputColumns = new HashSet<>();
if (isPartialUpdate) {
for (Column col : destTable.getFullSchema()) {
@@ -407,14 +430,23 @@ public class StreamLoadPlanner {
break;
}
}
- if (col.isKey() && !existInExpr) {
- throw new UserException("Partial update should include all
key columns, missing: " + col.getName());
+ if (!existInExpr) {
+ if (col.isKey() && !col.isAutoInc()) {
+ throw new UserException("Partial update should include
all key columns, missing: "
+ + col.getName());
+ }
+ if (!(col.isKey() && col.isAutoInc()) && col.isVisible()) {
+ hasMissingColExceptAutoIncKey = true;
+ }
}
}
if (taskInfo.getMergeType() == LoadTask.MergeType.DELETE) {
partialUpdateInputColumns.add(Column.DELETE_SIGN);
}
}
+ if (isPartialUpdate && !hasMissingColExceptAutoIncKey) {
+ isPartialUpdate = false;
+ }
// here we should be full schema to fill the descriptor table
for (Column col : destTable.getFullSchema()) {
if (isPartialUpdate &&
!partialUpdateInputColumns.contains(col.getName())) {
diff --git
a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc1.csv
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc1.csv
new file mode 100644
index 00000000000..63b02b8a306
--- /dev/null
+++
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc1.csv
@@ -0,0 +1,2 @@
+doris3
+doris4
\ No newline at end of file
diff --git
a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc2.csv
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc2.csv
new file mode 100644
index 00000000000..737c9a056f0
--- /dev/null
+++
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc2.csv
@@ -0,0 +1,2 @@
+102,doris8
+103,doris9
\ No newline at end of file
diff --git
a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc3.csv
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc3.csv
new file mode 100644
index 00000000000..c06e9e00d07
--- /dev/null
+++
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc3.csv
@@ -0,0 +1,2 @@
+104,"doris10"
+105,"doris11"
\ No newline at end of file
diff --git
a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc4.csv
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc4.csv
new file mode 100644
index 00000000000..3a227dba5f5
--- /dev/null
+++
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc4.csv
@@ -0,0 +1,2 @@
+2,888,888
+3,888,888
\ No newline at end of file
diff --git
a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.out
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.out
index 380575499e2..d157f501a8b 100644
---
a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.out
+++
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.out
@@ -2,64 +2,72 @@
-- !select_1 --
doris1
doris2
-
--- !select_2 --
-2
-
--- !select_3 --
-doris1
-doris2
doris3
doris4
--- !select_4 --
-4
-
--- !select_1 --
-doris1
-doris2
-
-- !select_2 --
-2
+4
-- !select_3 --
+"doris10"
+"doris11"
doris1
doris2
doris3
doris4
+doris5
+doris7
+doris8
+doris9
-- !select_4 --
-4
+10
--- !select_1 --
-doris1
-doris2
+-- !select_5 --
+1 10 10 10
+2 20 20 20
+3 30 30 30
+4 40 40 40
--- !select_2 --
-2
+-- !select_6 --
+1 99 99 10
+2 888 888 20
+3 888 888 30
+4 40 40 40
--- !select_3 --
+-- !select_1 --
doris1
doris2
doris3
doris4
--- !select_4 --
-4
-
--- !select_1 --
-doris1
-doris2
-
-- !select_2 --
-2
+4
-- !select_3 --
+"doris10"
+"doris11"
doris1
doris2
doris3
doris4
+doris5
+doris7
+doris8
+doris9
-- !select_4 --
-4
+10
+
+-- !select_5 --
+1 10 10 10
+2 20 20 20
+3 30 30 30
+4 40 40 40
+
+-- !select_6 --
+1 99 99 10
+2 888 888 20
+3 888 888 30
+4 40 40 40
diff --git
a/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy
b/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy
index 7d1efdc9782..8598d791e01 100644
--- a/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy
+++ b/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy
@@ -53,7 +53,7 @@ suite("test_delete_from_timeout","nonConcurrent") {
GetDebugPoint().disableDebugPointForAllBEs("PushHandler::_do_streaming_ingestion.try_lock_fail")
}
- sql """delete from ${tableName} where col1 = "false" and col2 =
"-9999782574499444.2" and col3 = "-25"; """
+ sql """delete from ${tableName} where col1 = "false" and col3 = "-25";
"""
t1.join()
qt_sql "select * from ${tableName} order by col1, col2, col3;"
diff --git
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.groovy
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.groovy
index d0d1ecf9542..ec46939b2f5 100644
---
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.groovy
+++
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.groovy
@@ -1,4 +1,3 @@
-
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -20,48 +19,100 @@ suite("test_partial_update_auto_inc") {
String db = context.config.getDbNameByFile(context.file)
sql "select 1;" // to create database
- for (def use_mow : [false, true]) {
- for (def use_nereids_planner : [false, true]) {
- logger.info("current params: use_mow: ${use_mow},
use_nereids_planner: ${use_nereids_planner}")
- connect(user = context.config.jdbcUser, password =
context.config.jdbcPassword, url = context.config.jdbcUrl) {
- sql "use ${db};"
+ for (def use_nereids_planner : [false, true]) {
+ logger.info("current params: use_nereids_planner:
${use_nereids_planner}")
+ connect(user = context.config.jdbcUser, password =
context.config.jdbcPassword, url = context.config.jdbcUrl) {
+ sql "use ${db};"
- if (use_nereids_planner) {
- sql """ set enable_nereids_dml = true; """
- sql """ set enable_nereids_planner=true; """
- sql """ set enable_fallback_to_original_planner=false; """
- } else {
- sql """ set enable_nereids_dml = false; """
- sql """ set enable_nereids_planner = false; """
- }
+ if (use_nereids_planner) {
+ sql """ set enable_nereids_planner=true; """
+ sql """ set enable_fallback_to_original_planner=false; """
+ } else {
+ sql """ set enable_nereids_planner = false; """
+ }
- // create table
- sql """ DROP TABLE IF EXISTS
test_primary_key_partial_update_auto_inc """
- sql """ CREATE TABLE test_primary_key_partial_update_auto_inc (
- `id` BIGINT NOT NULL AUTO_INCREMENT,
- `name` varchar(65533) NOT NULL COMMENT "用户姓名" )
- UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS
1
- PROPERTIES("replication_num" = "1",
"enable_unique_key_merge_on_write" = "${use_mow}"); """
+ sql """ DROP TABLE IF EXISTS
test_primary_key_partial_update_auto_inc """
+ sql """ CREATE TABLE test_primary_key_partial_update_auto_inc (
+ `id` BIGINT NOT NULL AUTO_INCREMENT,
+ `name` varchar(65533) NOT NULL COMMENT "用户姓名" )
+ UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES("replication_num" = "1",
"enable_unique_key_merge_on_write" = "true"); """
- sql """ set enable_unique_key_partial_update=true; """
- sql """ insert into
test_primary_key_partial_update_auto_inc(name) values("doris1"); """
- sql """ set enable_unique_key_partial_update=false; """
- sql """ insert into
test_primary_key_partial_update_auto_inc(name) values("doris2"); """
- sql "sync"
+ sql """ set enable_unique_key_partial_update=true; """
+ sql "sync"
+ // insert stmt only misses auto-inc key column
+ sql """ insert into test_primary_key_partial_update_auto_inc(name)
values("doris1"); """
+ sql """ set enable_unique_key_partial_update=false; """
+ sql "sync"
+ sql """ insert into test_primary_key_partial_update_auto_inc(name)
values("doris2"); """
+ // stream load only misses auto-inc key column
+ streamLoad {
+ table "test_primary_key_partial_update_auto_inc"
+ set 'partial_columns', 'true'
+ set 'column_separator', ','
+ set 'columns', 'name'
+ file 'partial_update_autoinc1.csv'
+ time 10000
+ }
+ qt_select_1 """ select name from
test_primary_key_partial_update_auto_inc order by name; """
+ qt_select_2 """ select count(distinct id) from
test_primary_key_partial_update_auto_inc; """
- qt_select_1 """ select name from
test_primary_key_partial_update_auto_inc order by name; """
- qt_select_2 """ select count(distinct id) from
test_primary_key_partial_update_auto_inc; """
+ sql """ set enable_unique_key_partial_update=true; """
+ sql "sync"
+ // insert stmt without column list
+ sql """ insert into test_primary_key_partial_update_auto_inc
values(100,"doris5"); """
+ // insert stmt, column list include all visible columns
+ sql """ insert into
test_primary_key_partial_update_auto_inc(id,name) values(102,"doris6"); """
+ sql """ set enable_unique_key_partial_update=false; """
+ sql "sync"
+ sql """ insert into test_primary_key_partial_update_auto_inc
values(101, "doris7"); """
+ // stream load without column list
+ streamLoad {
+ table "test_primary_key_partial_update_auto_inc"
+ set 'partial_columns', 'true'
+ set 'column_separator', ','
+ file 'partial_update_autoinc2.csv'
+ time 10000
+ }
+ // stream load, column list include all visible columns
+ streamLoad {
+ table "test_primary_key_partial_update_auto_inc"
+ set 'partial_columns', 'true'
+ set 'column_separator', ','
+ set 'columns', 'id,name'
+ file 'partial_update_autoinc3.csv'
+ time 10000
+ }
+ qt_select_3 """ select name from
test_primary_key_partial_update_auto_inc order by name; """
+ qt_select_4 """ select count(distinct id) from
test_primary_key_partial_update_auto_inc; """
+ sql """ DROP TABLE IF EXISTS
test_primary_key_partial_update_auto_inc """
- sql """ set enable_unique_key_partial_update=true; """
- sql """ insert into test_primary_key_partial_update_auto_inc
values(100,"doris3"); """
- sql """ set enable_unique_key_partial_update=false; """
- sql """ insert into test_primary_key_partial_update_auto_inc
values(101, "doris4"); """
- sql "sync"
- qt_select_3 """ select name from
test_primary_key_partial_update_auto_inc order by name; """
- qt_select_4 """ select count(distinct id) from
test_primary_key_partial_update_auto_inc; """
- sql """ DROP TABLE IF EXISTS
test_primary_key_partial_update_auto_inc """
+ sql """ DROP TABLE IF EXISTS
test_primary_key_partial_update_auto_inc2 """
+ sql """ CREATE TABLE test_primary_key_partial_update_auto_inc2 (
+ `id` BIGINT NOT NULL,
+ `c1` int,
+ `c2` int,
+ `cid` BIGINT NOT NULL AUTO_INCREMENT)
+ UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES("replication_num" = "1",
"enable_unique_key_merge_on_write" = "true"); """
+ sql "insert into test_primary_key_partial_update_auto_inc2
values(1,10,10,10),(2,20,20,20),(3,30,30,30),(4,40,40,40);"
+ order_qt_select_5 "select * from
test_primary_key_partial_update_auto_inc2"
+ sql """ set enable_unique_key_partial_update=true; """
+ sql "sync;"
+ // insert stmt only misses auto-inc value column, its value should
not change when do partial update
+ sql "insert into
test_primary_key_partial_update_auto_inc2(id,c1,c2) values(1,99,99),(2,99,99);"
+ // stream load only misses auto-inc value column, its value should
not change when do partial update
+ streamLoad {
+ table "test_primary_key_partial_update_auto_inc2"
+ set 'partial_columns', 'true'
+ set 'column_separator', ','
+ set 'columns', 'id,c1,c2'
+ file 'partial_update_autoinc4.csv'
+ time 10000
}
+ order_qt_select_6 "select * from
test_primary_key_partial_update_auto_inc2"
+ sql """ DROP TABLE IF EXISTS
test_primary_key_partial_update_auto_inc2 """
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]