This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit b98d225183f42cfe1dd19f9ed1e88d1e2f817c6b Author: slothever <18522955+w...@users.noreply.github.com> AuthorDate: Tue Mar 26 17:05:46 2024 +0800 [fix](insert)fix hive table sink type coercion and unify coercion (#32762) Issue Number: #31442 --- .../doris/nereids/rules/analysis/BindSink.java | 124 ++++++++++++--------- .../trees/plans/logical/LogicalHiveTableSink.java | 8 +- .../trees/plans/logical/LogicalOlapTableSink.java | 8 +- .../trees/plans/logical/LogicalTableSink.java | 17 ++- 4 files changed, 85 insertions(+), 72 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java index cac10b75ec3..20f05729822 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java @@ -56,6 +56,8 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.trees.plans.logical.LogicalTableSink; +import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink; import org.apache.doris.nereids.types.DataType; import org.apache.doris.nereids.types.StringType; import org.apache.doris.nereids.types.coercion.CharacterType; @@ -186,6 +188,67 @@ public class BindSink implements AnalysisRuleFactory { throw new AnalysisException(e.getMessage(), e.getCause()); } + Map<String, NamedExpression> columnToOutput = getColumnToOutput(ctx, table, isPartialUpdate, + boundSink, child); + LogicalProject<?> fullOutputProject = getOutputProjectByCoercion(table.getFullSchema(), child, columnToOutput); + return boundSink.withChildAndUpdateOutput(fullOutputProject); + } + + private LogicalProject<?> getOutputProjectByCoercion(List<Column> tableSchema, LogicalPlan child, + Map<String, NamedExpression> columnToOutput) { + List<NamedExpression> fullOutputExprs = ImmutableList.copyOf(columnToOutput.values()); + if (child instanceof LogicalOneRowRelation) { + // remove default value slot in one row relation + child = ((LogicalOneRowRelation) child).withProjects(((LogicalOneRowRelation) child) + .getProjects().stream() + .filter(p -> !(p instanceof DefaultValueSlot)) + .collect(ImmutableList.toImmutableList())); + } + LogicalProject<?> fullOutputProject = new LogicalProject<>(fullOutputExprs, child); + + // add cast project + List<NamedExpression> castExprs = Lists.newArrayList(); + for (int i = 0; i < tableSchema.size(); ++i) { + Column col = tableSchema.get(i); + NamedExpression expr = columnToOutput.get(col.getName()); + if (expr == null) { + // If `expr` is null, it means that the current load is a partial update + // and `col` should not be contained in the output of the sink node so + // we skip it. + continue; + } + DataType inputType = expr.getDataType(); + DataType targetType = DataType.fromCatalogType(tableSchema.get(i).getType()); + Expression castExpr = expr; + // TODO move string like type logic into TypeCoercionUtils#castIfNotSameType + if (isSourceAndTargetStringLikeType(inputType, targetType) && !inputType.equals(targetType)) { + int sourceLength = ((CharacterType) inputType).getLen(); + int targetLength = ((CharacterType) targetType).getLen(); + if (sourceLength == targetLength) { + castExpr = TypeCoercionUtils.castIfNotSameType(castExpr, targetType); + } else if (sourceLength > targetLength && targetLength >= 0) { + castExpr = new Substring(castExpr, Literal.of(1), Literal.of(targetLength)); + } else if (targetType.isStringType()) { + castExpr = new Cast(castExpr, StringType.INSTANCE); + } + } else { + castExpr = TypeCoercionUtils.castIfNotSameType(castExpr, targetType); + } + if (castExpr instanceof NamedExpression) { + castExprs.add(((NamedExpression) castExpr)); + } else { + castExprs.add(new Alias(castExpr)); + } + } + if (!castExprs.equals(fullOutputExprs)) { + fullOutputProject = new LogicalProject<Plan>(castExprs, fullOutputProject); + } + return fullOutputProject; + } + + private static Map<String, NamedExpression> getColumnToOutput( + MatchingContext<? extends UnboundLogicalSink<Plan>> ctx, + TableIf table, boolean isPartialUpdate, LogicalTableSink<?> boundSink, LogicalPlan child) { // we need to insert all the columns of the target table // although some columns are not mentions. // so we add a projects to supply the default value. @@ -225,11 +288,11 @@ public class BindSink implements AnalysisRuleFactory { && !(columnToChildOutput.get(column) instanceof DefaultValueSlot)) { columnToOutput.put(column.getName(), columnToChildOutput.get(column)); } else { - if (table.hasSequenceCol() + if (table instanceof OlapTable && ((OlapTable) table).hasSequenceCol() && column.getName().equals(Column.SEQUENCE_COL) - && table.getSequenceMapCol() != null) { + && ((OlapTable) table).getSequenceMapCol() != null) { Optional<Column> seqCol = table.getFullSchema().stream() - .filter(col -> col.getName().equals(table.getSequenceMapCol())) + .filter(col -> col.getName().equals(((OlapTable) table).getSequenceMapCol())) .findFirst(); if (!seqCol.isPresent()) { throw new AnalysisException("sequence column is not contained in" @@ -303,55 +366,7 @@ public class BindSink implements AnalysisRuleFactory { } } } - List<NamedExpression> fullOutputExprs = ImmutableList.copyOf(columnToOutput.values()); - if (child instanceof LogicalOneRowRelation) { - // remove default value slot in one row relation - child = ((LogicalOneRowRelation) child).withProjects(((LogicalOneRowRelation) child) - .getProjects().stream() - .filter(p -> !(p instanceof DefaultValueSlot)) - .collect(ImmutableList.toImmutableList())); - } - LogicalProject<?> fullOutputProject = new LogicalProject<>(fullOutputExprs, child); - - // add cast project - List<NamedExpression> castExprs = Lists.newArrayList(); - for (int i = 0; i < table.getFullSchema().size(); ++i) { - Column col = table.getFullSchema().get(i); - NamedExpression expr = columnToOutput.get(col.getName()); - if (expr == null) { - // If `expr` is null, it means that the current load is a partial update - // and `col` should not be contained in the output of the sink node so - // we skip it. - continue; - } - DataType inputType = expr.getDataType(); - DataType targetType = DataType.fromCatalogType(table.getFullSchema().get(i).getType()); - Expression castExpr = expr; - // TODO move string like type logic into TypeCoercionUtils#castIfNotSameType - if (isSourceAndTargetStringLikeType(inputType, targetType) && !inputType.equals(targetType)) { - int sourceLength = ((CharacterType) inputType).getLen(); - int targetLength = ((CharacterType) targetType).getLen(); - if (sourceLength == targetLength) { - castExpr = TypeCoercionUtils.castIfNotSameType(castExpr, targetType); - } else if (sourceLength > targetLength && targetLength >= 0) { - castExpr = new Substring(castExpr, Literal.of(1), Literal.of(targetLength)); - } else if (targetType.isStringType()) { - castExpr = new Cast(castExpr, StringType.INSTANCE); - } - } else { - castExpr = TypeCoercionUtils.castIfNotSameType(castExpr, targetType); - } - if (castExpr instanceof NamedExpression) { - castExprs.add(((NamedExpression) castExpr)); - } else { - castExprs.add(new Alias(castExpr)); - } - } - if (!castExprs.equals(fullOutputExprs)) { - fullOutputProject = new LogicalProject<Plan>(castExprs, fullOutputProject); - } - - return boundSink.withChildAndUpdateOutput(fullOutputProject); + return columnToOutput; } private Plan bindHiveTableSink(MatchingContext<UnboundHiveTableSink<Plan>> ctx) { @@ -394,7 +409,10 @@ public class BindSink implements AnalysisRuleFactory { if (boundSink.getCols().size() != child.getOutput().size()) { throw new AnalysisException("insert into cols should be corresponding to the query output"); } - return boundSink; + Map<String, NamedExpression> columnToOutput = getColumnToOutput(ctx, table, false, + boundSink, child); + LogicalProject<?> fullOutputProject = getOutputProjectByCoercion(table.getFullSchema(), child, columnToOutput); + return boundSink.withChildAndUpdateOutput(fullOutputProject); } private Pair<Database, OlapTable> bind(CascadesContext cascadesContext, UnboundTableSink<? extends Plan> sink) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHiveTableSink.java index 7ae217f3fb4..360d227b0f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHiveTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHiveTableSink.java @@ -47,7 +47,6 @@ public class LogicalHiveTableSink<CHILD_TYPE extends Plan> extends LogicalTableS // bound data sink private final HMSExternalDatabase database; private final HMSExternalTable targetTable; - private final List<Column> cols; private final Set<String> hivePartitionKeys; private final DMLCommandType dmlCommandType; @@ -63,10 +62,9 @@ public class LogicalHiveTableSink<CHILD_TYPE extends Plan> extends LogicalTableS Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) { - super(PlanType.LOGICAL_HIVE_TABLE_SINK, outputExprs, groupExpression, logicalProperties, child); + super(PlanType.LOGICAL_HIVE_TABLE_SINK, outputExprs, groupExpression, logicalProperties, cols, child); this.database = Objects.requireNonNull(database, "database != null in LogicalHiveTableSink"); this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in LogicalHiveTableSink"); - this.cols = Utils.copyRequiredList(cols); this.dmlCommandType = dmlCommandType; this.hivePartitionKeys = hivePartitionKeys; } @@ -99,10 +97,6 @@ public class LogicalHiveTableSink<CHILD_TYPE extends Plan> extends LogicalTableS return targetTable; } - public List<Column> getCols() { - return cols; - } - public Set<String> getHivePartitionKeys() { return hivePartitionKeys; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java index c5e3336dabb..397c2927d84 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java @@ -46,7 +46,6 @@ public class LogicalOlapTableSink<CHILD_TYPE extends Plan> extends LogicalTableS // bound data sink private final Database database; private final OlapTable targetTable; - private final List<Column> cols; private final List<Long> partitionIds; private final boolean isPartialUpdate; private final DMLCommandType dmlCommandType; @@ -65,10 +64,9 @@ public class LogicalOlapTableSink<CHILD_TYPE extends Plan> extends LogicalTableS List<Long> partitionIds, List<NamedExpression> outputExprs, boolean isPartialUpdate, DMLCommandType dmlCommandType, Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) { - super(PlanType.LOGICAL_OLAP_TABLE_SINK, outputExprs, groupExpression, logicalProperties, child); + super(PlanType.LOGICAL_OLAP_TABLE_SINK, outputExprs, groupExpression, logicalProperties, cols, child); this.database = Objects.requireNonNull(database, "database != null in LogicalOlapTableSink"); this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in LogicalOlapTableSink"); - this.cols = Utils.copyRequiredList(cols); this.isPartialUpdate = isPartialUpdate; this.dmlCommandType = dmlCommandType; this.partitionIds = Utils.copyRequiredList(partitionIds); @@ -97,10 +95,6 @@ public class LogicalOlapTableSink<CHILD_TYPE extends Plan> extends LogicalTableS return targetTable; } - public List<Column> getCols() { - return cols; - } - public List<Long> getPartitionIds() { return partitionIds; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTableSink.java index 90133274f6e..7aca0c599b4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTableSink.java @@ -17,11 +17,14 @@ package org.apache.doris.nereids.trees.plans.logical; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.TableIf; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.util.Utils; import java.util.List; import java.util.Optional; @@ -30,14 +33,18 @@ import java.util.Optional; * Logical table sink for all table type sink */ public abstract class LogicalTableSink<CHILD_TYPE extends Plan> extends LogicalSink<CHILD_TYPE> { - public LogicalTableSink(PlanType type, - List<NamedExpression> outputExprs, CHILD_TYPE child) { - super(type, outputExprs, child); - } + protected final List<Column> cols; public LogicalTableSink(PlanType type, List<NamedExpression> outputExprs, Optional<GroupExpression> groupExpression, - Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) { + Optional<LogicalProperties> logicalProperties, List<Column> cols, CHILD_TYPE child) { super(type, outputExprs, groupExpression, logicalProperties, child); + this.cols = Utils.copyRequiredList(cols); + } + + public abstract TableIf getTargetTable(); + + public List<Column> getCols() { + return cols; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org