This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit b98d225183f42cfe1dd19f9ed1e88d1e2f817c6b
Author: slothever <18522955+w...@users.noreply.github.com>
AuthorDate: Tue Mar 26 17:05:46 2024 +0800

    [fix](insert)fix hive table sink type coercion and unify coercion (#32762)
    
    Issue Number: #31442
---
 .../doris/nereids/rules/analysis/BindSink.java     | 124 ++++++++++++---------
 .../trees/plans/logical/LogicalHiveTableSink.java  |   8 +-
 .../trees/plans/logical/LogicalOlapTableSink.java  |   8 +-
 .../trees/plans/logical/LogicalTableSink.java      |  17 ++-
 4 files changed, 85 insertions(+), 72 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
index cac10b75ec3..20f05729822 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
@@ -56,6 +56,8 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.logical.LogicalTableSink;
+import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
 import org.apache.doris.nereids.types.DataType;
 import org.apache.doris.nereids.types.StringType;
 import org.apache.doris.nereids.types.coercion.CharacterType;
@@ -186,6 +188,67 @@ public class BindSink implements AnalysisRuleFactory {
             throw new AnalysisException(e.getMessage(), e.getCause());
         }
 
+        Map<String, NamedExpression> columnToOutput = getColumnToOutput(ctx, table, isPartialUpdate,
+                boundSink, child);
+        LogicalProject<?> fullOutputProject = getOutputProjectByCoercion(table.getFullSchema(), child, columnToOutput);
+        return boundSink.withChildAndUpdateOutput(fullOutputProject);
+    }
+
+    private LogicalProject<?> getOutputProjectByCoercion(List<Column> tableSchema, LogicalPlan child,
+                                                         Map<String, NamedExpression> columnToOutput) {
+        List<NamedExpression> fullOutputExprs = ImmutableList.copyOf(columnToOutput.values());
+        if (child instanceof LogicalOneRowRelation) {
+            // remove default value slot in one row relation
+            child = ((LogicalOneRowRelation) child).withProjects(((LogicalOneRowRelation) child)
+                    .getProjects().stream()
+                    .filter(p -> !(p instanceof DefaultValueSlot))
+                    .collect(ImmutableList.toImmutableList()));
+        }
+        LogicalProject<?> fullOutputProject = new LogicalProject<>(fullOutputExprs, child);
+
+        // add cast project
+        List<NamedExpression> castExprs = Lists.newArrayList();
+        for (int i = 0; i < tableSchema.size(); ++i) {
+            Column col = tableSchema.get(i);
+            NamedExpression expr = columnToOutput.get(col.getName());
+            if (expr == null) {
+                // If `expr` is null, it means that the current load is a partial update
+                // and `col` should not be contained in the output of the sink node so
+                // we skip it.
+                continue;
+            }
+            DataType inputType = expr.getDataType();
+            DataType targetType = DataType.fromCatalogType(tableSchema.get(i).getType());
+            Expression castExpr = expr;
+            // TODO move string like type logic into TypeCoercionUtils#castIfNotSameType
+            if (isSourceAndTargetStringLikeType(inputType, targetType) && !inputType.equals(targetType)) {
+                int sourceLength = ((CharacterType) inputType).getLen();
+                int targetLength = ((CharacterType) targetType).getLen();
+                if (sourceLength == targetLength) {
+                    castExpr = TypeCoercionUtils.castIfNotSameType(castExpr, targetType);
+                } else if (sourceLength > targetLength && targetLength >= 0) {
+                    castExpr = new Substring(castExpr, Literal.of(1), Literal.of(targetLength));
+                } else if (targetType.isStringType()) {
+                    castExpr = new Cast(castExpr, StringType.INSTANCE);
+                }
+            } else {
+                castExpr = TypeCoercionUtils.castIfNotSameType(castExpr, targetType);
+            }
+            if (castExpr instanceof NamedExpression) {
+                castExprs.add(((NamedExpression) castExpr));
+            } else {
+                castExprs.add(new Alias(castExpr));
+            }
+        }
+        if (!castExprs.equals(fullOutputExprs)) {
+            fullOutputProject = new LogicalProject<Plan>(castExprs, fullOutputProject);
+        }
+        return fullOutputProject;
+    }
+
+    private static Map<String, NamedExpression> getColumnToOutput(
+            MatchingContext<? extends UnboundLogicalSink<Plan>> ctx,
+            TableIf table, boolean isPartialUpdate, LogicalTableSink<?> boundSink, LogicalPlan child) {
         // we need to insert all the columns of the target table
         // although some columns are not mentions.
         // so we add a projects to supply the default value.
@@ -225,11 +288,11 @@ public class BindSink implements AnalysisRuleFactory {
                     && !(columnToChildOutput.get(column) instanceof DefaultValueSlot)) {
                 columnToOutput.put(column.getName(), columnToChildOutput.get(column));
             } else {
-                if (table.hasSequenceCol()
+                if (table instanceof OlapTable && ((OlapTable) table).hasSequenceCol()
                         && column.getName().equals(Column.SEQUENCE_COL)
-                        && table.getSequenceMapCol() != null) {
+                        && ((OlapTable) table).getSequenceMapCol() != null) {
                     Optional<Column> seqCol = table.getFullSchema().stream()
-                            .filter(col -> col.getName().equals(table.getSequenceMapCol()))
+                            .filter(col -> col.getName().equals(((OlapTable) table).getSequenceMapCol()))
                             .findFirst();
                     if (!seqCol.isPresent()) {
                         throw new AnalysisException("sequence column is not contained in"
@@ -303,55 +366,7 @@ public class BindSink implements AnalysisRuleFactory {
                 }
             }
         }
-        List<NamedExpression> fullOutputExprs = ImmutableList.copyOf(columnToOutput.values());
-        if (child instanceof LogicalOneRowRelation) {
-            // remove default value slot in one row relation
-            child = ((LogicalOneRowRelation) child).withProjects(((LogicalOneRowRelation) child)
-                    .getProjects().stream()
-                    .filter(p -> !(p instanceof DefaultValueSlot))
-                    .collect(ImmutableList.toImmutableList()));
-        }
-        LogicalProject<?> fullOutputProject = new LogicalProject<>(fullOutputExprs, child);
-
-        // add cast project
-        List<NamedExpression> castExprs = Lists.newArrayList();
-        for (int i = 0; i < table.getFullSchema().size(); ++i) {
-            Column col = table.getFullSchema().get(i);
-            NamedExpression expr = columnToOutput.get(col.getName());
-            if (expr == null) {
-                // If `expr` is null, it means that the current load is a partial update
-                // and `col` should not be contained in the output of the sink node so
-                // we skip it.
-                continue;
-            }
-            DataType inputType = expr.getDataType();
-            DataType targetType = DataType.fromCatalogType(table.getFullSchema().get(i).getType());
-            Expression castExpr = expr;
-            // TODO move string like type logic into TypeCoercionUtils#castIfNotSameType
-            if (isSourceAndTargetStringLikeType(inputType, targetType) && !inputType.equals(targetType)) {
-                int sourceLength = ((CharacterType) inputType).getLen();
-                int targetLength = ((CharacterType) targetType).getLen();
-                if (sourceLength == targetLength) {
-                    castExpr = TypeCoercionUtils.castIfNotSameType(castExpr, targetType);
-                } else if (sourceLength > targetLength && targetLength >= 0) {
-                    castExpr = new Substring(castExpr, Literal.of(1), Literal.of(targetLength));
-                } else if (targetType.isStringType()) {
-                    castExpr = new Cast(castExpr, StringType.INSTANCE);
-                }
-            } else {
-                castExpr = TypeCoercionUtils.castIfNotSameType(castExpr, targetType);
-            }
-            if (castExpr instanceof NamedExpression) {
-                castExprs.add(((NamedExpression) castExpr));
-            } else {
-                castExprs.add(new Alias(castExpr));
-            }
-        }
-        if (!castExprs.equals(fullOutputExprs)) {
-            fullOutputProject = new LogicalProject<Plan>(castExprs, fullOutputProject);
-        }
-
-        return boundSink.withChildAndUpdateOutput(fullOutputProject);
+        return columnToOutput;
     }
 
     private Plan bindHiveTableSink(MatchingContext<UnboundHiveTableSink<Plan>> ctx) {
@@ -394,7 +409,10 @@ public class BindSink implements AnalysisRuleFactory {
         if (boundSink.getCols().size() != child.getOutput().size()) {
             throw new AnalysisException("insert into cols should be corresponding to the query output");
         }
-        return boundSink;
+        Map<String, NamedExpression> columnToOutput = getColumnToOutput(ctx, table, false,
+                boundSink, child);
+        LogicalProject<?> fullOutputProject = getOutputProjectByCoercion(table.getFullSchema(), child, columnToOutput);
+        return boundSink.withChildAndUpdateOutput(fullOutputProject);
     }
 
     private Pair<Database, OlapTable> bind(CascadesContext cascadesContext, UnboundTableSink<? extends Plan> sink) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHiveTableSink.java
index 7ae217f3fb4..360d227b0f0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHiveTableSink.java
@@ -47,7 +47,6 @@ public class LogicalHiveTableSink<CHILD_TYPE extends Plan> extends LogicalTableS
     // bound data sink
     private final HMSExternalDatabase database;
     private final HMSExternalTable targetTable;
-    private final List<Column> cols;
     private final Set<String> hivePartitionKeys;
     private final DMLCommandType dmlCommandType;
 
@@ -63,10 +62,9 @@ public class LogicalHiveTableSink<CHILD_TYPE extends Plan> extends LogicalTableS
                                 Optional<GroupExpression> groupExpression,
                                 Optional<LogicalProperties> logicalProperties,
                                 CHILD_TYPE child) {
-        super(PlanType.LOGICAL_HIVE_TABLE_SINK, outputExprs, groupExpression, logicalProperties, child);
+        super(PlanType.LOGICAL_HIVE_TABLE_SINK, outputExprs, groupExpression, logicalProperties, cols, child);
         this.database = Objects.requireNonNull(database, "database != null in LogicalHiveTableSink");
         this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in LogicalHiveTableSink");
-        this.cols = Utils.copyRequiredList(cols);
         this.dmlCommandType = dmlCommandType;
         this.hivePartitionKeys = hivePartitionKeys;
     }
@@ -99,10 +97,6 @@ public class LogicalHiveTableSink<CHILD_TYPE extends Plan> extends LogicalTableS
         return targetTable;
     }
 
-    public List<Column> getCols() {
-        return cols;
-    }
-
     public Set<String> getHivePartitionKeys() {
         return hivePartitionKeys;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java
index c5e3336dabb..397c2927d84 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java
@@ -46,7 +46,6 @@ public class LogicalOlapTableSink<CHILD_TYPE extends Plan> extends LogicalTableS
     // bound data sink
     private final Database database;
     private final OlapTable targetTable;
-    private final List<Column> cols;
     private final List<Long> partitionIds;
     private final boolean isPartialUpdate;
     private final DMLCommandType dmlCommandType;
@@ -65,10 +64,9 @@ public class LogicalOlapTableSink<CHILD_TYPE extends Plan> extends LogicalTableS
             List<Long> partitionIds, List<NamedExpression> outputExprs, boolean isPartialUpdate,
             DMLCommandType dmlCommandType, Optional<GroupExpression> groupExpression,
             Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) {
-        super(PlanType.LOGICAL_OLAP_TABLE_SINK, outputExprs, groupExpression, logicalProperties, child);
+        super(PlanType.LOGICAL_OLAP_TABLE_SINK, outputExprs, groupExpression, logicalProperties, cols, child);
         this.database = Objects.requireNonNull(database, "database != null in LogicalOlapTableSink");
         this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in LogicalOlapTableSink");
         this.isPartialUpdate = isPartialUpdate;
         this.dmlCommandType = dmlCommandType;
         this.partitionIds = Utils.copyRequiredList(partitionIds);
@@ -97,10 +95,6 @@ public class LogicalOlapTableSink<CHILD_TYPE extends Plan> extends LogicalTableS
         return targetTable;
     }
 
-    public List<Column> getCols() {
-        return cols;
-    }
-
     public List<Long> getPartitionIds() {
         return partitionIds;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTableSink.java
index 90133274f6e..7aca0c599b4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTableSink.java
@@ -17,11 +17,14 @@
 
 package org.apache.doris.nereids.trees.plans.logical;
 
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.TableIf;
 import org.apache.doris.nereids.memo.GroupExpression;
 import org.apache.doris.nereids.properties.LogicalProperties;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.util.Utils;
 
 import java.util.List;
 import java.util.Optional;
@@ -30,14 +33,18 @@ import java.util.Optional;
  * Logical table sink for all table type sink
  */
 public abstract class LogicalTableSink<CHILD_TYPE extends Plan> extends LogicalSink<CHILD_TYPE> {
-    public LogicalTableSink(PlanType type,
-            List<NamedExpression> outputExprs, CHILD_TYPE child) {
-        super(type, outputExprs, child);
-    }
+    protected final List<Column> cols;
 
     public LogicalTableSink(PlanType type, List<NamedExpression> outputExprs,
             Optional<GroupExpression> groupExpression,
-            Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) {
+            Optional<LogicalProperties> logicalProperties, List<Column> cols, CHILD_TYPE child) {
         super(type, outputExprs, groupExpression, logicalProperties, child);
+        this.cols = Utils.copyRequiredList(cols);
+    }
+
+    public abstract TableIf getTargetTable();
+
+    public List<Column> getCols() {
+        return cols;
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to