This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new dc882bcf4d6 branch-4.1: [fix](nereids) bind file column placeholders
for copy into select (#64591)
dc882bcf4d6 is described below
commit dc882bcf4d6d25455e3e4c343e3d9689c002a9b5
Author: hui lai <[email protected]>
AuthorDate: Thu Jun 18 16:48:47 2026 +0800
branch-4.1: [fix](nereids) bind file column placeholders for copy into
select (#64591)
Cherry-picked from https://github.com/apache/doris/pull/64395
---
.../trees/plans/commands/info/CopyFromDesc.java | 33 +++---
.../trees/plans/commands/info/CopyIntoInfo.java | 115 +++++++++++++++++++--
.../suites/load_p0/copy_into/test_copy_into.groovy | 65 ++++++++++++
3 files changed, 190 insertions(+), 23 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyFromDesc.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyFromDesc.java
index 4e283ead549..e0e975d35bd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyFromDesc.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyFromDesc.java
@@ -18,7 +18,6 @@
package org.apache.doris.nereids.trees.plans.commands.info;
import org.apache.doris.analysis.CopyFromParam;
-import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StageAndPattern;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
@@ -203,23 +202,33 @@ public class CopyFromDesc {
if (exprList == null) {
return false;
}
- List<SlotRef> slotRefs = Lists.newArrayList();
- // Expr.collectList(exprList, SlotRef.class, slotRefs);
+ boolean hasFileColumnPlaceholder = false;
Set<String> columnSet = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
- for (SlotRef slotRef : slotRefs) {
- String columnName = slotRef.getColumnName();
- if (columnName.startsWith(DOLLAR)) {
- if (fileColumns.size() > 0) {
+ List<Expression> fileColumnExpressions = exprList.stream().map(expr ->
(Expression) expr)
+ .collect(Collectors.toList());
+ fileFilterExpr.ifPresent(fileColumnExpressions::add);
+ for (Expression expr : fileColumnExpressions) {
+ for (UnboundSlot slot :
expr.<UnboundSlot>collectToList(UnboundSlot.class::isInstance)) {
+ String columnName = slot.getName();
+ if (columnName.startsWith(DOLLAR)) {
+ if (!fileColumns.isEmpty()) {
+ throw new AnalysisException("can not mix column name
and dollar sign");
+ }
+ hasFileColumnPlaceholder = true;
+ continue;
+ }
+ if (hasFileColumnPlaceholder) {
throw new AnalysisException("can not mix column name and
dollar sign");
}
- return false;
- }
- if (columnSet.add(columnName)) {
- fileColumns.add(columnName);
+ if (columnSet.add(columnName)) {
+ fileColumns.add(columnName);
+ }
}
}
+ if (hasFileColumnPlaceholder) {
+ return false;
+ }
if (addDeleteSign) {
- // exprList.add(new SlotRef(null, Column.DELETE_SIGN));
fileColumns.add(Column.DELETE_SIGN);
}
return true;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyIntoInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyIntoInfo.java
index f0fc2630500..e1ab3d111a8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyIntoInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyIntoInfo.java
@@ -53,6 +53,7 @@ import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.analyzer.Scope;
import org.apache.doris.nereids.analyzer.UnboundRelation;
+import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.glue.translator.ExpressionTranslator;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.nereids.jobs.executor.Analyzer;
@@ -62,6 +63,8 @@ import org.apache.doris.nereids.rules.analysis.BindRelation;
import org.apache.doris.nereids.rules.analysis.ExpressionAnalyzer;
import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext;
import org.apache.doris.nereids.trees.expressions.Cast;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
@@ -70,6 +73,8 @@ import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.OlapScan;
import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.types.DataType;
+import org.apache.doris.nereids.types.StringType;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
@@ -239,8 +244,10 @@ public class CopyIntoInfo {
}
PlanTranslatorContext context = new
PlanTranslatorContext(cascadesContext);
List<Slot> slots = boundRelation.getOutput();
- Scope scope = new Scope(slots);
- ExpressionAnalyzer analyzer = new ExpressionAnalyzer(null, scope,
cascadesContext, false, false);
+ CopyIntoFileSlots fileSlots = new CopyIntoFileSlots(slots,
copyFromDesc.getFileColumns(),
+ copyFromDesc.getColumnMappingList());
+ ExpressionAnalyzer analyzer = new ExpressionAnalyzer(null, new
Scope(fileSlots.getScopeSlots()),
+ cascadesContext, false, false);
Map<SlotReference, SlotRef> translateMap = Maps.newHashMap();
@@ -257,13 +264,14 @@ public class CopyIntoInfo {
if (copyFromDesc.getColumnMappingList() != null &&
!copyFromDesc.getColumnMappingList().isEmpty()) {
legacyColumnMappingList = new ArrayList<>();
for (Expression expression : copyFromDesc.getColumnMappingList()) {
- legacyColumnMappingList.add(translateToLegacyExpr(expression,
analyzer, context, cascadesContext));
+ legacyColumnMappingList.add(translateToLegacyExpr(expression,
analyzer, context, cascadesContext,
+ fileSlots));
}
}
Expr legacyFileFilterExpr = null;
if (copyFromDesc.getFileFilterExpr().isPresent()) {
legacyFileFilterExpr =
translateToLegacyExpr(copyFromDesc.getFileFilterExpr().get(),
- analyzer, context, cascadesContext);
+ analyzer, context, cascadesContext, fileSlots);
}
String compression = copyIntoProperties.getCompression();
@@ -301,30 +309,32 @@ public class CopyIntoInfo {
}
// translate copy from description to copy from param
- legacyCopyFromParam = toLegacyParam(copyFromDesc, analyzer, context,
cascadesContext);
+ legacyCopyFromParam = toLegacyParam(copyFromDesc, analyzer, context,
cascadesContext, fileSlots);
}
private CopyFromParam toLegacyParam(CopyFromDesc copyFromDesc,
ExpressionAnalyzer analyzer,
- PlanTranslatorContext context,
CascadesContext cascadesContext) {
+ PlanTranslatorContext context,
CascadesContext cascadesContext,
+ CopyIntoFileSlots fileSlots) {
StageAndPattern stageAndPattern = copyFromDesc.getStageAndPattern();
List<Expr> exprList = null;
if (copyFromDesc.getExprList() != null) {
exprList = new ArrayList<>();
for (Expression expression : copyFromDesc.getExprList()) {
- exprList.add(translateToLegacyExpr(expression, analyzer,
context, cascadesContext));
+ exprList.add(translateToLegacyExpr(expression, analyzer,
context, cascadesContext, fileSlots));
}
}
Expr fileFilterExpr = null;
if (copyFromDesc.getFileFilterExpr().isPresent()) {
fileFilterExpr =
translateToLegacyExpr(copyFromDesc.getFileFilterExpr().get(),
- analyzer, context, cascadesContext);
+ analyzer, context, cascadesContext, fileSlots);
}
List<String> fileColumns = copyFromDesc.getFileColumns();
List<Expr> columnMappingList = null;
if (copyFromDesc.getColumnMappingList() != null) {
columnMappingList = new ArrayList<>();
for (Expression expression : copyFromDesc.getColumnMappingList()) {
- columnMappingList.add(translateToLegacyExpr(expression,
analyzer, context, cascadesContext));
+ columnMappingList.add(translateToLegacyExpr(expression,
analyzer, context, cascadesContext,
+ fileSlots));
}
}
List<String> targetColumns = copyFromDesc.getTargetColumns();
@@ -333,7 +343,7 @@ public class CopyIntoInfo {
}
private Expr translateToLegacyExpr(Expression expr, ExpressionAnalyzer
analyzer, PlanTranslatorContext context,
- CascadesContext cascadesContext) {
+ CascadesContext cascadesContext,
CopyIntoFileSlots fileSlots) {
Expression expression;
try {
expression = analyzer.analyze(expr, new
ExpressionRewriteContext(cascadesContext));
@@ -342,11 +352,26 @@ public class CopyIntoInfo {
+ expr.toSql() + "', "
+ Utils.convertFirstChar(e.getMessage()));
}
- ExpressionToExpr translator = new ExpressionToExpr();
+ ExpressionToExpr translator = new ExpressionToExpr(fileSlots);
return expression.accept(translator, context);
}
private static class ExpressionToExpr extends ExpressionTranslator {
+ private final CopyIntoFileSlots fileSlots;
+
+ private ExpressionToExpr(CopyIntoFileSlots fileSlots) {
+ this.fileSlots = fileSlots;
+ }
+
+ @Override
+ public Expr visitSlotReference(SlotReference slotReference,
PlanTranslatorContext context) {
+ String fileSlotName =
fileSlots.getFileSlotName(slotReference.getExprId());
+ if (fileSlotName != null) {
+ return new SlotRef(null, fileSlotName);
+ }
+ return super.visitSlotReference(slotReference, context);
+ }
+
@Override
public Expr visitCast(Cast cast, PlanTranslatorContext context) {
// left child of cast is target type, right child of cast is
expression
@@ -355,6 +380,74 @@ public class CopyIntoInfo {
}
}
+ private static class CopyIntoFileSlots {
+ private final List<Slot> scopeSlots;
+ private final Map<ExprId, String> fileSlotNames = Maps.newHashMap();
+
+ private CopyIntoFileSlots(List<Slot> targetSlots, List<String>
fileColumns,
+ List<Expression> columnMappingList) {
+ scopeSlots = new ArrayList<>(targetSlots);
+ if (fileColumns == null) {
+ return;
+ }
+ Map<String, DataType> targetColumnTypes =
Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+ for (Slot slot : targetSlots) {
+ targetColumnTypes.put(slot.getName(), slot.getDataType());
+ }
+ Map<String, DataType> fileColumnTypes =
inferFileColumnTypes(targetColumnTypes, columnMappingList);
+ for (String fileColumn : fileColumns) {
+ if (!isFileColumnPlaceholder(fileColumn) ||
fileSlotNames.containsValue(fileColumn)) {
+ continue;
+ }
+ SlotReference slot = new SlotReference(fileColumn,
+ fileColumnTypes.getOrDefault(fileColumn,
StringType.INSTANCE), true);
+ scopeSlots.add(slot);
+ fileSlotNames.put(slot.getExprId(), fileColumn);
+ }
+ }
+
+ private List<Slot> getScopeSlots() {
+ return scopeSlots;
+ }
+
+ private String getFileSlotName(ExprId exprId) {
+ return fileSlotNames.get(exprId);
+ }
+
+ private static boolean isFileColumnPlaceholder(String columnName) {
+ return columnName != null && columnName.startsWith("$");
+ }
+
+ private static Map<String, DataType> inferFileColumnTypes(Map<String,
DataType> targetColumnTypes,
+ List<Expression> columnMappingList) {
+ Map<String, DataType> fileColumnTypes = Maps.newHashMap();
+ if (columnMappingList == null) {
+ return fileColumnTypes;
+ }
+ for (Expression expression : columnMappingList) {
+ if (!(expression instanceof EqualTo)) {
+ continue;
+ }
+ EqualTo columnMapping = (EqualTo) expression;
+ if (!(columnMapping.left() instanceof UnboundSlot)) {
+ continue;
+ }
+ DataType targetType = targetColumnTypes.get(((UnboundSlot)
columnMapping.left()).getName());
+ if (targetType == null) {
+ continue;
+ }
+ for (UnboundSlot fileColumn : columnMapping.right()
+ .<UnboundSlot>collect(UnboundSlot.class::isInstance)) {
+ String fileColumnName = fileColumn.getName();
+ if (isFileColumnPlaceholder(fileColumnName)) {
+ fileColumnTypes.putIfAbsent(fileColumnName,
targetType);
+ }
+ }
+ }
+ return fileColumnTypes;
+ }
+ }
+
// after validateStagePB, fileFormat and copyOption is not null
private void validateStagePB(StagePB stagePB) throws AnalysisException {
stageType = stagePB.getType();
diff --git a/regression-test/suites/load_p0/copy_into/test_copy_into.groovy
b/regression-test/suites/load_p0/copy_into/test_copy_into.groovy
index 64e448731ca..6cfd078db7d 100644
--- a/regression-test/suites/load_p0/copy_into/test_copy_into.groovy
+++ b/regression-test/suites/load_p0/copy_into/test_copy_into.groovy
@@ -150,6 +150,71 @@ suite("test_copy_into", "p0") {
}
assertTrue(false, "should not come here")
}
+
+ def csvStageName = "test_copy_into_csv"
+ try_sql """drop stage if exists ${csvStageName}"""
+ sql """
+ create stage if not exists ${csvStageName}
+ properties ('endpoint' = '${getS3Endpoint()}' ,
+ 'region' = '${getS3Region()}' ,
+ 'bucket' = '${getS3BucketName()}' ,
+ 'prefix' = 'regression' ,
+ 'ak' = '${getS3AK()}' ,
+ 'sk' = '${getS3SK()}' ,
+ 'provider' = '${getS3Provider()}',
+ 'access_type' = 'aksk',
+ 'default.file.column_separator' = "|");
+ """
+
+ sql """ DROP TABLE IF EXISTS copy_into_select_placeholder; """
+ sql """
+ CREATE TABLE copy_into_select_placeholder (
+ p_partkey int NOT NULL DEFAULT "1",
+ p_name VARCHAR(55) NOT NULL DEFAULT "2",
+ p_mfgr VARCHAR(25) NOT NULL DEFAULT "3"
+ )ENGINE=OLAP
+ DUPLICATE KEY(`p_partkey`)
+ COMMENT "OLAP"
+ DISTRIBUTED BY HASH(`p_partkey`) BUCKETS 3;
+ """
+
+ result = sql """
+ copy into copy_into_select_placeholder
+ from (select \$1, \$2, \$3 from
@${csvStageName}('tpch/sf1/part.csv.split00.gz'))
+ properties ('file.type' = 'csv', 'file.column_separator' = '|',
+ 'file.compression' = 'gz', 'copy.async' = 'false');
+ """
+ logger.info("copy select placeholder result: " + result)
+ assertTrue(result.size() == 1)
+ assertTrue(result[0][1].equals("FINISHED"),
+ "Finish copy into, state=" + result[0][1] + ", expected
state=FINISHED")
+ def selectPlaceholderCount = sql """ SELECT COUNT(*) FROM
copy_into_select_placeholder; """
+ assertTrue((selectPlaceholderCount[0][0] as long) > 0)
+
+ sql """ DROP TABLE IF EXISTS copy_into_filter_placeholder; """
+ sql """
+ CREATE TABLE copy_into_filter_placeholder (
+ p_partkey int NOT NULL DEFAULT "1"
+ )ENGINE=OLAP
+ DUPLICATE KEY(`p_partkey`)
+ COMMENT "OLAP"
+ DISTRIBUTED BY HASH(`p_partkey`) BUCKETS 3;
+ """
+
+ result = sql """
+ copy into copy_into_filter_placeholder (p_partkey)
+ from (select 1 from
@${csvStageName}('tpch/sf1/part.csv.split00.gz') where \$1 is not null)
+ properties ('file.type' = 'csv', 'file.column_separator' = '|',
+ 'file.compression' = 'gz', 'copy.async' = 'false');
+ """
+ logger.info("copy filter placeholder result: " + result)
+ assertTrue(result.size() == 1)
+ assertTrue(result[0][1].equals("FINISHED"),
+ "Finish copy into, state=" + result[0][1] + ", expected
state=FINISHED")
+ def filterPlaceholderCount = sql """ SELECT COUNT(*) FROM
copy_into_filter_placeholder; """
+ assertTrue((filterPlaceholderCount[0][0] as long) > 0)
+
+ try_sql """drop stage if exists ${csvStageName}"""
try_sql """drop stage if exists ${externalStageName}"""
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]