This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new ea341f56c1 [Fix](nereids) branch-2.0 Support partial update for insert into table #24594 (#24627)
ea341f56c1 is described below

commit ea341f56c165ce1720a6959d7424628d6f1706dd
Author: bobhan1 <[email protected]>
AuthorDate: Thu Sep 21 17:21:25 2023 +0800

    [Fix](nereids) branch-2.0 Support partial update for insert into table #24594 (#24627)
---
 .../glue/translator/PhysicalPlanTranslator.java    | 43 ++++++++++++----------
 .../doris/nereids/parser/LogicalPlanBuilder.java   |  2 +-
 .../doris/nereids/rules/analysis/BindSink.java     | 20 +++++++++-
 .../plans/commands/InsertIntoTableCommand.java     |  8 +++-
 .../insert_into_table/partial_update.groovy        |  5 ++-
 .../partial_update_complex.groovy                  |  3 +-
 .../test_partial_update_native_insert_stmt.groovy  |  3 +-
 ...artial_update_native_insert_stmt_complex.groovy |  2 +-
 8 files changed, 56 insertions(+), 30 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index c0dc46bdba..47a72b851e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -361,29 +361,14 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         PlanFragment rootFragment = olapTableSink.child().accept(this, 
context);
         rootFragment.setOutputPartition(DataPartition.UNPARTITIONED);
 
-        TupleDescriptor olapTuple = context.generateTupleDesc();
-        List<Column> targetTableColumns = 
olapTableSink.getTargetTable().getFullSchema();
-        for (Column column : targetTableColumns) {
-            SlotDescriptor slotDesc = context.addSlotDesc(olapTuple);
-            slotDesc.setIsMaterialized(true);
-            slotDesc.setType(column.getType());
-            slotDesc.setColumn(column);
-            slotDesc.setIsNullable(column.isAllowNull());
-        }
-        OlapTableSink sink = new OlapTableSink(
-                olapTableSink.getTargetTable(),
-                olapTuple,
-                olapTableSink.getPartitionIds().isEmpty() ? null : 
olapTableSink.getPartitionIds(),
-                olapTableSink.isSingleReplicaLoad()
-        );
-        if (olapTableSink.isPartialUpdate() || 
(olapTableSink.isFromNativeInsertStmt()
-                && 
ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate())) {
-            OlapTable olapTable = olapTableSink.getTargetTable();
+        HashSet<String> partialUpdateCols = new HashSet<>();
+        boolean isPartialUpdate = olapTableSink.isPartialUpdate();
+        if (isPartialUpdate) {
+            OlapTable olapTable = (OlapTable) olapTableSink.getTargetTable();
             if (!olapTable.getEnableUniqueKeyMergeOnWrite()) {
                 throw new AnalysisException("Partial update is only allowed in"
                         + "unique table with merge-on-write enabled.");
             }
-            HashSet<String> partialUpdateCols = new HashSet<>();
             for (Column col : olapTable.getFullSchema()) {
                 boolean exists = false;
                 for (Column insertCol : olapTableSink.getCols()) {
@@ -404,8 +389,26 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                         && 
partialUpdateCols.contains(olapTable.getSequenceMapCol())) {
                 partialUpdateCols.add(Column.SEQUENCE_COL);
             }
-            sink.setPartialUpdateInputColumns(true, partialUpdateCols);
         }
+        TupleDescriptor olapTuple = context.generateTupleDesc();
+        List<Column> targetTableColumns = 
olapTableSink.getTargetTable().getFullSchema();
+        for (Column column : targetTableColumns) {
+            if (isPartialUpdate && 
!partialUpdateCols.contains(column.getName())) {
+                continue;
+            }
+            SlotDescriptor slotDesc = context.addSlotDesc(olapTuple);
+            slotDesc.setIsMaterialized(true);
+            slotDesc.setType(column.getType());
+            slotDesc.setColumn(column);
+            slotDesc.setIsNullable(column.isAllowNull());
+        }
+        OlapTableSink sink = new OlapTableSink(
+                olapTableSink.getTargetTable(),
+                olapTuple,
+                olapTableSink.getPartitionIds().isEmpty() ? null : 
olapTableSink.getPartitionIds(),
+                olapTableSink.isSingleReplicaLoad()
+        );
+        sink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateCols);
         rootFragment.setSink(sink);
 
         return rootFragment;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index 47d542b736..a33647ff6d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -325,7 +325,7 @@ public class LogicalPlanBuilder extends 
DorisParserBaseVisitor<Object> {
                 colNames,
                 ImmutableList.of(),
                 partitions,
-                false,
+                
ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate(),
                 true,
                 visitQuery(ctx.query()));
         if (ctx.explain() != null) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
index c4a9ac6bd3..479490cb29 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
@@ -135,7 +135,14 @@ public class BindSink implements AnalysisRuleFactory {
                                                     .filter(col -> 
col.getName().equals(table.getSequenceMapCol()))
                                                     .findFirst().get();
                                             
columnToOutput.put(column.getName(), columnToOutput.get(seqCol.getName()));
+                                        } else if (sink.isPartialUpdate()) {
+                                            // If the current load is a 
partial update, the values of unmentioned
+                                            // columns will be filled in 
SegmentWriter. And the output of sink node
+                                            // should not contain these 
unmentioned columns, so we just skip them.
+                                            continue;
                                         } else if (column.getDefaultValue() == 
null) {
+                                            // Otherwise, the unmentioned 
columns should be filled with default values
+                                            // or null values
                                             
columnToOutput.put(column.getName(), new Alias(
                                                     new 
NullLiteral(DataType.fromCatalogType(column.getType())),
                                                     column.getName()));
@@ -162,8 +169,17 @@ public class BindSink implements AnalysisRuleFactory {
                             // add cast project
                             List<NamedExpression> castExprs = 
Lists.newArrayList();
                             for (int i = 0; i < table.getFullSchema().size(); 
++i) {
-                                Expression castExpr = 
TypeCoercionUtils.castIfNotSameType(fullOutputExprs.get(i),
-                                        
DataType.fromCatalogType(table.getFullSchema().get(i).getType()));
+                                Column col = table.getFullSchema().get(i);
+                                NamedExpression expr = (NamedExpression) 
columnToOutput.get(col.getName());
+                                if (expr == null) {
+                                    // If `expr` is null, it means that the 
current load is a partial update
+                                    // and `col` should not be contained in 
the output of the sink node so
+                                    // we skip it.
+                                    continue;
+                                }
+                                Expression castExpr = 
TypeCoercionUtils.castIfNotSameType(
+                                        expr,
+                                        
DataType.fromCatalogType(col.getType()));
                                 if (castExpr instanceof NamedExpression) {
                                     castExprs.add(((NamedExpression) 
castExpr));
                                 } else {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
index f35b7c9dce..c7895de4ce 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
@@ -118,11 +118,15 @@ public class InsertIntoTableCommand extends Command 
implements ForwardWithSync,
                 physicalOlapTableSink.getDatabase(),
                 physicalOlapTableSink.getTargetTable(), label, planner);
         isTxnBegin = true;
-
+        boolean isStrictMode = 
(ctx.getSessionVariable().getEnableInsertStrict()
+                && physicalOlapTableSink.isPartialUpdate()
+                && physicalOlapTableSink.isFromNativeInsertStmt());
         sink.init(ctx.queryId(), txn.getTxnId(),
                 physicalOlapTableSink.getDatabase().getId(),
                 ctx.getExecTimeout(),
-                ctx.getSessionVariable().getSendBatchParallelism(), false, 
false);
+                ctx.getSessionVariable().getSendBatchParallelism(),
+                false,
+                isStrictMode);
 
         sink.complete(new Analyzer(Env.getCurrentEnv(), ctx));
         TransactionState state = 
Env.getCurrentGlobalTransactionMgr().getTransactionState(
diff --git 
a/regression-test/suites/nereids_p0/insert_into_table/partial_update.groovy 
b/regression-test/suites/nereids_p0/insert_into_table/partial_update.groovy
index 472acfc0c5..6068bd093b 100644
--- a/regression-test/suites/nereids_p0/insert_into_table/partial_update.groovy
+++ b/regression-test/suites/nereids_p0/insert_into_table/partial_update.groovy
@@ -22,7 +22,6 @@ suite("nereids_partial_update_native_insert_stmt", "p0") {
     sql "set enable_fallback_to_original_planner=false;"
     sql "sync;"
 
-    // sql 'set enable_fallback_to_original_planner=false'
     def tableName = "nereids_partial_update_native_insert_stmt"
     sql """ DROP TABLE IF EXISTS ${tableName} """
     sql """
@@ -100,6 +99,7 @@ suite("nereids_partial_update_native_insert_stmt", "p0") {
     sql """insert into ${tableName3} values(2, "doris2", 2000, 223, 1),(1, 
"doris", 1000, 123, 1);"""
     qt_3 """ select * from ${tableName3} order by id; """
     sql "set enable_unique_key_partial_update=true;"
+    sql "set enable_insert_strict = false;"
     sql "sync;"
     // in partial update, the unmentioned columns should have default values 
or be nullable
     // but field `name` is not nullable and doesn't have default value
@@ -221,7 +221,8 @@ suite("nereids_partial_update_native_insert_stmt", "p0") {
     sql """ DROP TABLE IF EXISTS ${tableName7}; """
 
     sql "set enable_unique_key_partial_update=false;"
-    sql "set enable_insert_strict = false;"
+    sql "set enable_insert_strict = true;"
     sql "set enable_fallback_to_original_planner=true;"
+    sql "set enable_nereids_dml=false;"
     sql "sync;"
 }
diff --git 
a/regression-test/suites/nereids_p0/insert_into_table/partial_update_complex.groovy
 
b/regression-test/suites/nereids_p0/insert_into_table/partial_update_complex.groovy
index 66945fed05..b1d8be2bff 100644
--- 
a/regression-test/suites/nereids_p0/insert_into_table/partial_update_complex.groovy
+++ 
b/regression-test/suites/nereids_p0/insert_into_table/partial_update_complex.groovy
@@ -111,7 +111,8 @@ suite("nereids_partial_update_native_insert_stmt_complex", 
"p0") {
     sql "DROP TABLE IF EXISTS ${tbName3}"
 
     sql "set enable_unique_key_partial_update=false;"
-    sql "set enable_insert_strict = false;"
+    sql "set enable_insert_strict = true;"
     sql "set enable_fallback_to_original_planner=true;"
+    sql "set enable_nereids_dml=false;"
     sql "sync;"
 }
diff --git 
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_native_insert_stmt.groovy
 
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_native_insert_stmt.groovy
index 23a44c846e..a5c0360b9d 100644
--- 
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_native_insert_stmt.groovy
+++ 
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_native_insert_stmt.groovy
@@ -100,6 +100,7 @@ suite("test_partial_update_native_insert_stmt", "p0") {
     sql """insert into ${tableName3} values(2, "doris2", 2000, 223, 1),(1, 
"doris", 1000, 123, 1);"""
     qt_3 """ select * from ${tableName3} order by id; """
     sql "set enable_unique_key_partial_update=true;"
+    sql "set enable_insert_strict = false;"
     sql "sync;"
     // in partial update, the unmentioned columns should have default values 
or be nullable
     // but field `name` is not nullable and doesn't have default value
@@ -221,7 +222,7 @@ suite("test_partial_update_native_insert_stmt", "p0") {
     sql """ DROP TABLE IF EXISTS ${tableName7}; """
 
     sql "set enable_unique_key_partial_update=false;"
-    sql "set enable_insert_strict = false;"
+    sql "set enable_insert_strict = true;"
     sql "set experimental_enable_nereids_planner=true;"
     sql "sync;"
 }
diff --git 
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_native_insert_stmt_complex.groovy
 
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_native_insert_stmt_complex.groovy
index bdb3a12dc5..8070db512d 100644
--- 
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_native_insert_stmt_complex.groovy
+++ 
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_native_insert_stmt_complex.groovy
@@ -111,7 +111,7 @@ suite("test_partial_update_native_insert_stmt_complex", 
"p0") {
     sql "DROP TABLE IF EXISTS ${tbName3}"
 
     sql "set enable_unique_key_partial_update=false;"
-    sql "set enable_insert_strict = false;"
+    sql "set enable_insert_strict = true;"
     sql "set experimental_enable_nereids_planner=true;"
     sql "sync;"
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to