This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new ea341f56c1 [Fix](nereids) branch-2.0 Support partial update for insert into table #24594 (#24627)
ea341f56c1 is described below
commit ea341f56c165ce1720a6959d7424628d6f1706dd
Author: bobhan1 <[email protected]>
AuthorDate: Thu Sep 21 17:21:25 2023 +0800
[Fix](nereids) branch-2.0 Support partial update for insert into table #24594 (#24627)
---
.../glue/translator/PhysicalPlanTranslator.java | 43 ++++++++++++----------
.../doris/nereids/parser/LogicalPlanBuilder.java | 2 +-
.../doris/nereids/rules/analysis/BindSink.java | 20 +++++++++-
.../plans/commands/InsertIntoTableCommand.java | 8 +++-
.../insert_into_table/partial_update.groovy | 5 ++-
.../partial_update_complex.groovy | 3 +-
.../test_partial_update_native_insert_stmt.groovy | 3 +-
...artial_update_native_insert_stmt_complex.groovy | 2 +-
8 files changed, 56 insertions(+), 30 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index c0dc46bdba..47a72b851e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -361,29 +361,14 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
PlanFragment rootFragment = olapTableSink.child().accept(this,
context);
rootFragment.setOutputPartition(DataPartition.UNPARTITIONED);
- TupleDescriptor olapTuple = context.generateTupleDesc();
- List<Column> targetTableColumns =
olapTableSink.getTargetTable().getFullSchema();
- for (Column column : targetTableColumns) {
- SlotDescriptor slotDesc = context.addSlotDesc(olapTuple);
- slotDesc.setIsMaterialized(true);
- slotDesc.setType(column.getType());
- slotDesc.setColumn(column);
- slotDesc.setIsNullable(column.isAllowNull());
- }
- OlapTableSink sink = new OlapTableSink(
- olapTableSink.getTargetTable(),
- olapTuple,
- olapTableSink.getPartitionIds().isEmpty() ? null :
olapTableSink.getPartitionIds(),
- olapTableSink.isSingleReplicaLoad()
- );
- if (olapTableSink.isPartialUpdate() ||
(olapTableSink.isFromNativeInsertStmt()
- &&
ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate())) {
- OlapTable olapTable = olapTableSink.getTargetTable();
+ HashSet<String> partialUpdateCols = new HashSet<>();
+ boolean isPartialUpdate = olapTableSink.isPartialUpdate();
+ if (isPartialUpdate) {
+ OlapTable olapTable = (OlapTable) olapTableSink.getTargetTable();
if (!olapTable.getEnableUniqueKeyMergeOnWrite()) {
throw new AnalysisException("Partial update is only allowed in"
+ "unique table with merge-on-write enabled.");
}
- HashSet<String> partialUpdateCols = new HashSet<>();
for (Column col : olapTable.getFullSchema()) {
boolean exists = false;
for (Column insertCol : olapTableSink.getCols()) {
@@ -404,8 +389,26 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
&&
partialUpdateCols.contains(olapTable.getSequenceMapCol())) {
partialUpdateCols.add(Column.SEQUENCE_COL);
}
- sink.setPartialUpdateInputColumns(true, partialUpdateCols);
}
+ TupleDescriptor olapTuple = context.generateTupleDesc();
+ List<Column> targetTableColumns =
olapTableSink.getTargetTable().getFullSchema();
+ for (Column column : targetTableColumns) {
+ if (isPartialUpdate &&
!partialUpdateCols.contains(column.getName())) {
+ continue;
+ }
+ SlotDescriptor slotDesc = context.addSlotDesc(olapTuple);
+ slotDesc.setIsMaterialized(true);
+ slotDesc.setType(column.getType());
+ slotDesc.setColumn(column);
+ slotDesc.setIsNullable(column.isAllowNull());
+ }
+ OlapTableSink sink = new OlapTableSink(
+ olapTableSink.getTargetTable(),
+ olapTuple,
+ olapTableSink.getPartitionIds().isEmpty() ? null :
olapTableSink.getPartitionIds(),
+ olapTableSink.isSingleReplicaLoad()
+ );
+ sink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateCols);
rootFragment.setSink(sink);
return rootFragment;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index 47d542b736..a33647ff6d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -325,7 +325,7 @@ public class LogicalPlanBuilder extends
DorisParserBaseVisitor<Object> {
colNames,
ImmutableList.of(),
partitions,
- false,
+
ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate(),
true,
visitQuery(ctx.query()));
if (ctx.explain() != null) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
index c4a9ac6bd3..479490cb29 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
@@ -135,7 +135,14 @@ public class BindSink implements AnalysisRuleFactory {
.filter(col ->
col.getName().equals(table.getSequenceMapCol()))
.findFirst().get();
columnToOutput.put(column.getName(), columnToOutput.get(seqCol.getName()));
+ } else if (sink.isPartialUpdate()) {
+ // If the current load is a partial update, the values of unmentioned
+ // columns will be filled in SegmentWriter. And the output of sink node
+ // should not contain these unmentioned columns, so we just skip them.
+ continue;
} else if (column.getDefaultValue() ==
null) {
+ // Otherwise, the unmentioned columns should be filled with default values
+ // or null values
columnToOutput.put(column.getName(), new Alias(
new
NullLiteral(DataType.fromCatalogType(column.getType())),
column.getName()));
@@ -162,8 +169,17 @@ public class BindSink implements AnalysisRuleFactory {
// add cast project
List<NamedExpression> castExprs =
Lists.newArrayList();
for (int i = 0; i < table.getFullSchema().size();
++i) {
- Expression castExpr =
TypeCoercionUtils.castIfNotSameType(fullOutputExprs.get(i),
-
DataType.fromCatalogType(table.getFullSchema().get(i).getType()));
+ Column col = table.getFullSchema().get(i);
+ NamedExpression expr = (NamedExpression)
columnToOutput.get(col.getName());
+ if (expr == null) {
+ // If `expr` is null, it means that the current load is a partial update
+ // and `col` should not be contained in the output of the sink node so
+ // we skip it.
+ continue;
+ }
+ Expression castExpr =
TypeCoercionUtils.castIfNotSameType(
+ expr,
+
DataType.fromCatalogType(col.getType()));
if (castExpr instanceof NamedExpression) {
castExprs.add(((NamedExpression)
castExpr));
} else {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
index f35b7c9dce..c7895de4ce 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
@@ -118,11 +118,15 @@ public class InsertIntoTableCommand extends Command
implements ForwardWithSync,
physicalOlapTableSink.getDatabase(),
physicalOlapTableSink.getTargetTable(), label, planner);
isTxnBegin = true;
-
+ boolean isStrictMode =
(ctx.getSessionVariable().getEnableInsertStrict()
+ && physicalOlapTableSink.isPartialUpdate()
+ && physicalOlapTableSink.isFromNativeInsertStmt());
sink.init(ctx.queryId(), txn.getTxnId(),
physicalOlapTableSink.getDatabase().getId(),
ctx.getExecTimeout(),
- ctx.getSessionVariable().getSendBatchParallelism(), false,
false);
+ ctx.getSessionVariable().getSendBatchParallelism(),
+ false,
+ isStrictMode);
sink.complete(new Analyzer(Env.getCurrentEnv(), ctx));
TransactionState state =
Env.getCurrentGlobalTransactionMgr().getTransactionState(
diff --git
a/regression-test/suites/nereids_p0/insert_into_table/partial_update.groovy
b/regression-test/suites/nereids_p0/insert_into_table/partial_update.groovy
index 472acfc0c5..6068bd093b 100644
--- a/regression-test/suites/nereids_p0/insert_into_table/partial_update.groovy
+++ b/regression-test/suites/nereids_p0/insert_into_table/partial_update.groovy
@@ -22,7 +22,6 @@ suite("nereids_partial_update_native_insert_stmt", "p0") {
sql "set enable_fallback_to_original_planner=false;"
sql "sync;"
- // sql 'set enable_fallback_to_original_planner=false'
def tableName = "nereids_partial_update_native_insert_stmt"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
@@ -100,6 +99,7 @@ suite("nereids_partial_update_native_insert_stmt", "p0") {
sql """insert into ${tableName3} values(2, "doris2", 2000, 223, 1),(1,
"doris", 1000, 123, 1);"""
qt_3 """ select * from ${tableName3} order by id; """
sql "set enable_unique_key_partial_update=true;"
+ sql "set enable_insert_strict = false;"
sql "sync;"
// in partial update, the unmentioned columns should have default values or be nullable
// but field `name` is not nullable and doesn't have default value
@@ -221,7 +221,8 @@ suite("nereids_partial_update_native_insert_stmt", "p0") {
sql """ DROP TABLE IF EXISTS ${tableName7}; """
sql "set enable_unique_key_partial_update=false;"
- sql "set enable_insert_strict = false;"
+ sql "set enable_insert_strict = true;"
sql "set enable_fallback_to_original_planner=true;"
+ sql "set enable_nereids_dml=false;"
sql "sync;"
}
diff --git
a/regression-test/suites/nereids_p0/insert_into_table/partial_update_complex.groovy
b/regression-test/suites/nereids_p0/insert_into_table/partial_update_complex.groovy
index 66945fed05..b1d8be2bff 100644
---
a/regression-test/suites/nereids_p0/insert_into_table/partial_update_complex.groovy
+++
b/regression-test/suites/nereids_p0/insert_into_table/partial_update_complex.groovy
@@ -111,7 +111,8 @@ suite("nereids_partial_update_native_insert_stmt_complex",
"p0") {
sql "DROP TABLE IF EXISTS ${tbName3}"
sql "set enable_unique_key_partial_update=false;"
- sql "set enable_insert_strict = false;"
+ sql "set enable_insert_strict = true;"
sql "set enable_fallback_to_original_planner=true;"
+ sql "set enable_nereids_dml=false;"
sql "sync;"
}
diff --git
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_native_insert_stmt.groovy
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_native_insert_stmt.groovy
index 23a44c846e..a5c0360b9d 100644
---
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_native_insert_stmt.groovy
+++
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_native_insert_stmt.groovy
@@ -100,6 +100,7 @@ suite("test_partial_update_native_insert_stmt", "p0") {
sql """insert into ${tableName3} values(2, "doris2", 2000, 223, 1),(1,
"doris", 1000, 123, 1);"""
qt_3 """ select * from ${tableName3} order by id; """
sql "set enable_unique_key_partial_update=true;"
+ sql "set enable_insert_strict = false;"
sql "sync;"
// in partial update, the unmentioned columns should have default values or be nullable
// but field `name` is not nullable and doesn't have default value
@@ -221,7 +222,7 @@ suite("test_partial_update_native_insert_stmt", "p0") {
sql """ DROP TABLE IF EXISTS ${tableName7}; """
sql "set enable_unique_key_partial_update=false;"
- sql "set enable_insert_strict = false;"
+ sql "set enable_insert_strict = true;"
sql "set experimental_enable_nereids_planner=true;"
sql "sync;"
}
diff --git
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_native_insert_stmt_complex.groovy
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_native_insert_stmt_complex.groovy
index bdb3a12dc5..8070db512d 100644
---
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_native_insert_stmt_complex.groovy
+++
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_native_insert_stmt_complex.groovy
@@ -111,7 +111,7 @@ suite("test_partial_update_native_insert_stmt_complex",
"p0") {
sql "DROP TABLE IF EXISTS ${tbName3}"
sql "set enable_unique_key_partial_update=false;"
- sql "set enable_insert_strict = false;"
+ sql "set enable_insert_strict = true;"
sql "set experimental_enable_nereids_planner=true;"
sql "sync;"
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]