This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ce71c3497f4 [fix](load) Fix src slot mapping issue when file columns
are not explicitly specified (#56041)
ce71c3497f4 is described below
commit ce71c3497f49ca074ffbaf4988835c9215ee6b35
Author: Xin Liao <[email protected]>
AuthorDate: Tue Sep 16 12:38:43 2025 +0800
[fix](load) Fix src slot mapping issue when file columns are not explicitly
specified (#56041)
When users specify only SET expressions (e.g., 'kd01=20230102') without
explicit file column mapping, the system incorrectly auto-generates file
columns that include columns meant for SET expressions only.
Root cause:
- when specifyFileFieldNames=false, the system auto-generates file
columns for all table columns
- However, columns with SET expressions (stored in userMappingColumns)
should be skipped during auto-generation since they don't exist in the
source file
- The missing skip logic caused these SET-only columns to be treated as
file columns
Issues caused:
1. BE reports 'failed to find default value expr for slot' when SET
   columns are missing from the source file
2. Incorrect slot mapping leading to data quality errors
---
.../nereids/load/NereidsLoadScanProvider.java | 17 +-
.../load_p0/broker_load/test_s3_load_with_set.out | Bin 0 -> 224 bytes
.../data/load_p0/stream_load/test_bitmap.csv | 1 +
.../stream_load/test_stream_load_bitmap.out | Bin 0 -> 115 bytes
.../stream_load/test_stream_load_with_set.out | Bin 0 -> 113 bytes
.../broker_load/test_s3_load_with_set.groovy | 218 +++++++++++++++++++++
.../stream_load/test_stream_load_bitmap.groovy | 54 +++++
.../stream_load/test_stream_load_with_set.groovy | 96 +++++++++
8 files changed, 385 insertions(+), 1 deletion(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadScanProvider.java
index 70f44da7d83..32d8262a0f3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadScanProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadScanProvider.java
@@ -62,6 +62,7 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -164,11 +165,20 @@ public class NereidsLoadScanProvider {
// (k1, k2, tmpk3 = k1 + k2, k3 = k1 + k2)
// so "tmpk3 = k1 + k2" is not needed anymore, we can skip it.
List<NereidsImportColumnDesc> copiedColumnExprs = new
ArrayList<>(columnDescs.size());
+ Set<String> constantMappingColumns = new HashSet<>();
for (NereidsImportColumnDesc importColumnDesc : columnDescs) {
String mappingColumnName = importColumnDesc.getColumnName();
- if (importColumnDesc.isColumn() ||
tbl.getColumn(mappingColumnName) != null) {
+ if (importColumnDesc.isColumn()) {
copiedColumnExprs.add(importColumnDesc);
+ } else if (tbl.getColumn(mappingColumnName) != null) {
+ copiedColumnExprs.add(importColumnDesc);
+ // Only track columns with constant expressions (e.g., "k1 =
'constant'")
+ // Non-constant expressions (e.g., "k1 = k1 + 1") still need
to read from file
+ if (importColumnDesc.getExpr().isConstant()) {
+ constantMappingColumns.add(mappingColumnName);
+ }
}
+ // Skip mapping columns that don't exist in table schema
}
// check whether the OlapTable has sequenceCol and skipBitmapCol
@@ -188,6 +198,11 @@ public class NereidsLoadScanProvider {
if (!specifyFileFieldNames) {
List<Column> columns = tbl.getBaseSchema(false);
for (Column column : columns) {
+ if (constantMappingColumns.contains(column.getName())) {
+ // Skip this column because user has already specified a
constant mapping expression for it
+ // in the COLUMNS parameter (e.g., "column_name =
'constant_value'")
+ continue;
+ }
NereidsImportColumnDesc columnDesc;
if (fileGroup.getFileFormatProperties().getFileFormatType() ==
TFileFormatType.FORMAT_JSON) {
columnDesc = new NereidsImportColumnDesc(column.getName());
diff --git a/regression-test/data/load_p0/broker_load/test_s3_load_with_set.out
b/regression-test/data/load_p0/broker_load/test_s3_load_with_set.out
new file mode 100644
index 00000000000..61ae94b779e
Binary files /dev/null and
b/regression-test/data/load_p0/broker_load/test_s3_load_with_set.out differ
diff --git a/regression-test/data/load_p0/stream_load/test_bitmap.csv
b/regression-test/data/load_p0/stream_load/test_bitmap.csv
new file mode 100644
index 00000000000..671731ed874
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_bitmap.csv
@@ -0,0 +1 @@
+b,2,AA==
diff --git
a/regression-test/data/load_p0/stream_load/test_stream_load_bitmap.out
b/regression-test/data/load_p0/stream_load/test_stream_load_bitmap.out
new file mode 100644
index 00000000000..5e7cb4ad9f6
Binary files /dev/null and
b/regression-test/data/load_p0/stream_load/test_stream_load_bitmap.out differ
diff --git
a/regression-test/data/load_p0/stream_load/test_stream_load_with_set.out
b/regression-test/data/load_p0/stream_load/test_stream_load_with_set.out
new file mode 100644
index 00000000000..70ffe11e1ed
Binary files /dev/null and
b/regression-test/data/load_p0/stream_load/test_stream_load_with_set.out differ
diff --git
a/regression-test/suites/load_p0/broker_load/test_s3_load_with_set.groovy
b/regression-test/suites/load_p0/broker_load/test_s3_load_with_set.groovy
new file mode 100644
index 00000000000..1987f548bb3
--- /dev/null
+++ b/regression-test/suites/load_p0/broker_load/test_s3_load_with_set.groovy
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_s3_load_with_set", "load_p0") {
+ def s3BucketName = getS3BucketName()
+ def s3Endpoint = getS3Endpoint()
+ def s3Region = getS3Region()
+
+ def table = "s3_load_with_set"
+
+ sql """ DROP TABLE IF EXISTS ${table} """
+
+ sql """
+ CREATE TABLE ${table}
+ (
+ k00 INT NOT NULL,
+ k01 DATE NOT NULL,
+ k02 BOOLEAN NULL,
+ k03 TINYINT NULL,
+ k04 SMALLINT NULL,
+ k05 INT NULL,
+ k06 BIGINT NULL,
+ k07 LARGEINT NULL,
+ k08 FLOAT NULL,
+ k09 DOUBLE NULL,
+ k10 DECIMAL(9,1) NULL,
+ k11 DECIMALV3(9,1) NULL,
+ k12 DATETIME NULL,
+ k13 DATEV2 NULL,
+ k14 DATETIMEV2 NULL,
+ k15 CHAR NULL,
+ k16 VARCHAR NULL,
+ k17 STRING NULL,
+ k18 JSON NULL,
+ kd01 DATE NOT NULL
+ )
+ DUPLICATE KEY(k00)
+ DISTRIBUTED BY HASH(k00) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+
+ def attributesList = [
+
+ ]
+
+ attributesList.add(new
LoadAttributes("s3://${s3BucketName}/regression/load/data/basic_data.csv",
+ "${table}", "LINES TERMINATED BY \"\n\"", "COLUMNS TERMINATED BY
\"|\"", "FORMAT AS \"CSV\"",
"(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18)",
+ "set(kd01=20240123)", "", "", "", ""))
+
+ attributesList.add(new
LoadAttributes("s3://${s3BucketName}/regression/load/data/basic_data.csv",
+ "${table}", "LINES TERMINATED BY \"\n\"", "COLUMNS TERMINATED BY
\"|\"", "FORMAT AS \"CSV\"", "",
+ "set(kd01=20240123)", "", "", "", ""))
+
+ attributesList.add(new
LoadAttributes("s3://${s3BucketName}/regression/load/data/basic_data.parq",
+ "${table}", "", "", "FORMAT AS \"parquet\"",
"(K00,K01,K02,K03,K04,K05,K06,K07,K08,K09,K10,K11,K12,K13,K14,K15,K16,K17,K18)",
+ "set(kd01=20240123)", "", "", "", ""))
+
+ attributesList.add(new
LoadAttributes("s3://${s3BucketName}/regression/load/data/basic_data.parq",
+ "${table}", "", "", "FORMAT AS \"parquet\"", "",
+ "set(kd01=20240123)", "", "", "", ""))
+
+ attributesList.add(new
LoadAttributes("s3://${s3BucketName}/regression/load/data/basic_data.orc",
+ "${table}", "", "", "FORMAT AS \"orc\"",
"(K00,K01,K02,K03,K04,K05,K06,K07,K08,K09,K10,K11,K12,K13,K14,K15,K16,K17,K18)",
+ "set(kd01=20240123)", "", "", "", ""))
+
+ attributesList.add(new
LoadAttributes("s3://${s3BucketName}/regression/load/data/basic_data.orc",
+ "${table}", "", "", "FORMAT AS \"orc\"", "",
+ "set(kd01=20240123)", "", "", "", ""))
+
+ def ak = getS3AK()
+ def sk = getS3SK()
+
+ def i = 0
+ for (LoadAttributes attributes : attributesList) {
+ def label = "test_s3_load_" +
UUID.randomUUID().toString().replace("-", "_") + "_" + i
+ attributes.label = label
+ def prop = attributes.getPropertiesStr()
+
+ def sql_str = """
+ LOAD LABEL $label (
+ $attributes.dataDesc.mergeType
+ DATA INFILE("$attributes.dataDesc.path")
+ INTO TABLE $attributes.dataDesc.tableName
+ $attributes.dataDesc.columnTermClause
+ $attributes.dataDesc.lineTermClause
+ $attributes.dataDesc.formatClause
+ $attributes.dataDesc.columns
+ $attributes.dataDesc.columnsFromPathClause
+ $attributes.dataDesc.columnMappingClause
+ $attributes.dataDesc.precedingFilterClause
+ $attributes.dataDesc.orderByClause
+ $attributes.dataDesc.whereExpr
+ )
+ WITH S3 (
+ "AWS_ACCESS_KEY" = "$ak",
+ "AWS_SECRET_KEY" = "$sk",
+ "AWS_ENDPOINT" = "${s3Endpoint}",
+ "AWS_REGION" = "${s3Region}",
+ "use_path_style" = "$attributes.usePathStyle"
+ )
+ ${prop}
+ """
+ logger.info("submit sql: ${sql_str}");
+ sql """${sql_str}"""
+ logger.info("Submit load with lable: $label, table:
$attributes.dataDesc.tableName, path: $attributes.dataDesc.path")
+
+ def max_try_milli_secs = 600000
+ while (max_try_milli_secs > 0) {
+ String[][] result = sql """ show load where
label="$attributes.label" order by createtime desc limit 1; """
+ if (result[0][2].equals("FINISHED")) {
+ if (attributes.isExceptFailed) {
+ assertTrue(false, "load should be failed but was success:
$result")
+ }
+ logger.info("Load FINISHED " + attributes.label + ": $result")
+ break
+ }
+ if (result[0][2].equals("CANCELLED")) {
+ if (attributes.isExceptFailed) {
+ logger.info("Load FINISHED " + attributes.label)
+ break
+ }
+ assertTrue(false, "load failed: $result")
+ break
+ }
+ Thread.sleep(1000)
+ max_try_milli_secs -= 1000
+ if (max_try_milli_secs <= 0) {
+ assertTrue(false, "load Timeout: $attributes.label")
+ }
+ }
+ qt_select """ select count(*) from $attributes.dataDesc.tableName """
+ ++i
+ }
+
+ qt_select """ select count(*) from ${table} """
+}
+
+class DataDesc {
+ public String mergeType = ""
+ public String path
+ public String tableName
+ public String lineTermClause
+ public String columnTermClause
+ public String formatClause
+ public String columns
+ public String columnsFromPathClause
+ public String precedingFilterClause
+ public String columnMappingClause
+ public String whereExpr
+ public String orderByClause
+}
+
+class LoadAttributes {
+ LoadAttributes(String path, String tableName, String lineTermClause,
String columnTermClause, String formatClause,
+ String columns, String columnsFromPathClause, String
precedingFilterClause, String columnMappingClause, String whereExpr, String
orderByClause, boolean isExceptFailed = false) {
+ this.dataDesc = new DataDesc()
+ this.dataDesc.path = path
+ this.dataDesc.tableName = tableName
+ this.dataDesc.lineTermClause = lineTermClause
+ this.dataDesc.columnTermClause = columnTermClause
+ this.dataDesc.formatClause = formatClause
+ this.dataDesc.columns = columns
+ this.dataDesc.columnsFromPathClause = columnsFromPathClause
+ this.dataDesc.precedingFilterClause = precedingFilterClause
+ this.dataDesc.columnMappingClause = columnMappingClause
+ this.dataDesc.whereExpr = whereExpr
+ this.dataDesc.orderByClause = orderByClause
+
+ this.isExceptFailed = isExceptFailed
+
+ properties = new HashMap<>()
+ }
+
+ LoadAttributes addProperties(String k, String v) {
+ properties.put(k, v)
+ return this
+ }
+
+ String getPropertiesStr() {
+ if (properties.isEmpty()) {
+ return ""
+ }
+ String prop = "PROPERTIES ("
+ properties.forEach (k, v) -> {
+ prop += "\"${k}\" = \"${v}\","
+ }
+ prop = prop.substring(0, prop.size() - 1)
+ prop += ")"
+ return prop
+ }
+
+ LoadAttributes withPathStyle() {
+ usePathStyle = "true"
+ return this
+ }
+
+ public DataDesc dataDesc
+ public Map<String, String> properties
+ public String label
+ public String usePathStyle = "false"
+ public boolean isExceptFailed
+}
\ No newline at end of file
diff --git
a/regression-test/suites/load_p0/stream_load/test_stream_load_bitmap.groovy
b/regression-test/suites/load_p0/stream_load/test_stream_load_bitmap.groovy
new file mode 100644
index 00000000000..0c22acc14c2
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load_bitmap.groovy
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_stream_load_bitmap", "p0") {
+ def tableName = "test_stream_load_bitmap"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE ${tableName} (
+ `cache_key` varchar(20) NOT NULL,
+ `result_cnt` int NULL,
+ `result` bitmap NOT NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`cache_key`)
+ DISTRIBUTED BY HASH(`cache_key`) BUCKETS 1
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ // test strict_mode success
+ streamLoad {
+ table "${tableName}"
+
+ file 'test_bitmap.csv'
+ set "column_separator", ","
+ set "columns",
"cache_key,result_cnt,result,result=bitmap_from_base64(result)"
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(1, json.NumberTotalRows)
+ }
+ time 10000 // limit inflight 10s
+ }
+
+ sql "sync"
+ qt_sql2 "select * from ${tableName}"
+}
\ No newline at end of file
diff --git
a/regression-test/suites/load_p0/stream_load/test_stream_load_with_set.groovy
b/regression-test/suites/load_p0/stream_load/test_stream_load_with_set.groovy
new file mode 100644
index 00000000000..0783d65c27d
--- /dev/null
+++
b/regression-test/suites/load_p0/stream_load/test_stream_load_with_set.groovy
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_stream_load_with_set", "load_p0") {
+ def tableName = "stream_load_with_set"
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+
+ sql """
+ CREATE TABLE ${tableName}
+ (
+ k00 INT NOT NULL,
+ k01 DATE NOT NULL,
+ k02 BOOLEAN NULL,
+ k03 TINYINT NULL,
+ k04 SMALLINT NULL,
+ k05 INT NULL,
+ k06 BIGINT NULL,
+ k07 LARGEINT NULL,
+ k08 FLOAT NULL,
+ k09 DOUBLE NULL,
+ k10 DECIMAL(9,1) NULL,
+ k11 DECIMALV3(9,1) NULL,
+ k12 DATETIME NULL,
+ k13 DATEV2 NULL,
+ k14 DATETIMEV2 NULL,
+ k15 CHAR NULL,
+ k16 VARCHAR NULL,
+ kd01 DATE NOT NULL,
+ k17 STRING NULL,
+ k18 JSON NULL
+ )
+ DUPLICATE KEY(k00)
+ DISTRIBUTED BY HASH(k00) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'columns',
"k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18,kd01=20240123"
+ file "basic_data.csv"
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(20, json.NumberTotalRows)
+ assertEquals(20, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ }
+ }
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'columns', "kd01=20240123"
+ file "basic_data.csv"
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(20, json.NumberTotalRows)
+ assertEquals(20, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ }
+ }
+ qt_select """ select count(*) from ${tableName} """
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]