This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 924314fe8bc branch-4.0: [fix](broker load)fix broker load fail when
<column from path> column already exists in file. #58579 (#58621)
924314fe8bc is described below
commit 924314fe8bc28dd30e54426a5f99d5d576d241c3
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Dec 2 21:06:10 2025 +0800
branch-4.0: [fix](broker load)fix broker load fail when <column from path>
column already exists in file. #58579 (#58621)
Cherry-picked from #58579
Co-authored-by: daidai <[email protected]>
---
be/src/vec/exec/scan/file_scanner.cpp | 16 +-
.../broker_load/test_load_data_from_path.groovy | 215 +++++++++++++++++++++
2 files changed, 230 insertions(+), 1 deletion(-)
diff --git a/be/src/vec/exec/scan/file_scanner.cpp
b/be/src/vec/exec/scan/file_scanner.cpp
index bd70a2fdab7..d6df5cfbb71 100644
--- a/be/src/vec/exec/scan/file_scanner.cpp
+++ b/be/src/vec/exec/scan/file_scanner.cpp
@@ -1397,7 +1397,21 @@ Status FileScanner::_set_fill_or_truncate_columns(bool
need_to_get_parsed_schema
std::unordered_map<std::string, DataTypePtr> name_to_col_type;
RETURN_IF_ERROR(_cur_reader->get_columns(&name_to_col_type,
&_missing_cols));
for (const auto& [col_name, col_type] : name_to_col_type) {
- _slot_lower_name_to_col_type.emplace(to_lower(col_name), col_type);
+ auto col_name_lower = to_lower(col_name);
+ if (_partition_col_descs.contains(col_name_lower)) {
+ /*
+ * `_slot_lower_name_to_col_type` is used by `_init_src_block` and
`_cast_to_input_block` during LOAD to
+ * generate columns of the corresponding type, which records the
columns existing in the file.
+ *
+ * When a column in `COLUMNS FROM PATH` exists in a file column,
the column type in the block will
+ * not match the slot type in `_output_tuple_desc`, causing an
error when
+ * Serde `deserialize_one_cell_from_json` fills the partition
values.
+ *
+ * So for partition column not need fill
_slot_lower_name_to_col_type.
+ */
+ continue;
+ }
+ _slot_lower_name_to_col_type.emplace(col_name_lower, col_type);
}
if (!_fill_partition_from_path &&
config::enable_iceberg_partition_column_fallback) {
diff --git
a/regression-test/suites/load_p0/broker_load/test_load_data_from_path.groovy
b/regression-test/suites/load_p0/broker_load/test_load_data_from_path.groovy
index e4c6a5d2eed..46fc29b2948 100644
--- a/regression-test/suites/load_p0/broker_load/test_load_data_from_path.groovy
+++ b/regression-test/suites/load_p0/broker_load/test_load_data_from_path.groovy
@@ -202,4 +202,219 @@ suite("test_load_columns_from_path", "load_p0") {
} finally {
sql """ DROP TABLE ${tableName} """
}
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+
+ sql """
+ CREATE TABLE ${tableName} (
+ k1 INT,
+ k2 INT,
+ k3 INT
+ )
+ DUPLICATE KEY(`k1`)
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ label = UUID.randomUUID().toString().replace("-", "2")
+ try {
+ sql """
+ LOAD LABEL ${label}
+ (
+ DATA
INFILE("s3://${s3BucketName}/load/k1=10/k2=20/test.parquet")
+ INTO TABLE ${tableName}
+ FORMAT AS "parquet"
+ (k1, k2, k3)
+ )
+ WITH S3
+ (
+ "s3.access_key" = "${ak}",
+ "s3.secret_key" = "${sk}",
+ "s3.endpoint" = "${s3Endpoint}",
+ "s3.region" = "${s3Region}"
+ )
+ """
+
+ // Wait for load job to finish
+ def maxRetry = 60
+ def result = ""
+ for (int i = 0; i < maxRetry; i++) {
+ result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
+ if (result[0].State == "FINISHED" || result[0]. State ==
"CANCELLED") {
+ break
+ }
+ sleep(1000)
+ }
+
+ // Check load job state
+ assertEquals("FINISHED", result[0].State)
+
+ // Verify the loaded data
+ def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
+ assertTrue(rowCount[0][0] > 0, "No data was loaded")
+
+ def pathData = sql "SELECT * FROM ${tableName} LIMIT 1"
+ logger.info("path data 1: " + pathData)
+ // 1 2 3
+ assertEquals(1, pathData[0][0])
+ assertEquals(2, pathData[0][1])
+ assertEquals(3, pathData[0][2])
+
+ } finally {
+ sql """ TRUNCATE TABLE ${tableName} """
+ }
+
+ label = UUID.randomUUID().toString().replace("-", "2")
+
+
+ try {
+ sql """
+ LOAD LABEL ${label}
+ (
+ DATA INFILE("s3://${s3BucketName}/load/k1=10/k2=20/test.orc")
+ INTO TABLE ${tableName}
+ FORMAT AS "orc"
+ (k1, k3)
+ COLUMNS FROM PATH AS (k2)
+ )
+ WITH S3
+ (
+ "s3.access_key" = "${ak}",
+ "s3.secret_key" = "${sk}",
+ "s3.endpoint" = "${s3Endpoint}",
+ "s3.region" = "${s3Region}"
+ )
+ """
+
+ // Wait for load job to finish
+ def maxRetry = 60
+ def result = ""
+ for (int i = 0; i < maxRetry; i++) {
+ result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
+ if (result[0].State == "FINISHED" || result[0]. State ==
"CANCELLED") {
+ break
+ }
+ sleep(1000)
+ }
+
+ // Check load job state
+ assertEquals("FINISHED", result[0].State)
+
+ // Verify the loaded data
+ def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
+ assertTrue(rowCount[0][0] > 0, "No data was loaded")
+
+ def pathData = sql "SELECT * FROM ${tableName} LIMIT 1"
+ logger.info("path data 2: " + pathData)
+ // 1 20 3
+ assertEquals(1, pathData[0][0])
+ assertEquals(20, pathData[0][1])
+ assertEquals(3, pathData[0][2])
+
+ } finally {
+ sql """ TRUNCATE TABLE ${tableName} """
+ }
+
+ label = UUID.randomUUID().toString().replace("-", "2")
+ try {
+ sql """
+ LOAD LABEL ${label}
+ (
+ DATA
INFILE("s3://${s3BucketName}/load/k1=10/k3=30/test.parquet")
+ INTO TABLE ${tableName}
+ FORMAT AS "parquet"
+ (k2)
+ COLUMNS FROM PATH AS (k1,k3)
+ )
+ WITH S3
+ (
+ "s3.access_key" = "${ak}",
+ "s3.secret_key" = "${sk}",
+ "s3.endpoint" = "${s3Endpoint}",
+ "s3.region" = "${s3Region}"
+ )
+ """
+
+ // Wait for load job to finish
+ def maxRetry = 60
+ def result = ""
+ for (int i = 0; i < maxRetry; i++) {
+ result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
+ if (result[0].State == "FINISHED" || result[0]. State ==
"CANCELLED") {
+ break
+ }
+ sleep(1000)
+ }
+
+ // Check load job state
+ assertEquals("FINISHED", result[0].State)
+
+ // Verify the loaded data
+ def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
+ assertTrue(rowCount[0][0] > 0, "No data was loaded")
+
+ def pathData = sql "SELECT * FROM ${tableName} LIMIT 1"
+ logger.info("path data 2: " + pathData)
+ // 10 2 30
+ assertEquals(10, pathData[0][0])
+ assertEquals(2, pathData[0][1])
+ assertEquals(30, pathData[0][2])
+
+ } finally {
+ sql """ TRUNCATE TABLE ${tableName} """
+ }
+
+
+ label = UUID.randomUUID().toString().replace("-", "2")
+ try {
+ sql """
+ LOAD LABEL ${label}
+ (
+ DATA INFILE("s3://${s3BucketName}/load/k1=10/k3=30/test.orc")
+ INTO TABLE ${tableName}
+ FORMAT AS "orc"
+ (k1,k2)
+ COLUMNS FROM PATH AS (k3)
+ )
+ WITH S3
+ (
+ "s3.access_key" = "${ak}",
+ "s3.secret_key" = "${sk}",
+ "s3.endpoint" = "${s3Endpoint}",
+ "s3.region" = "${s3Region}"
+ )
+ """
+
+ // Wait for load job to finish
+ def maxRetry = 60
+ def result = ""
+ for (int i = 0; i < maxRetry; i++) {
+ result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
+ if (result[0].State == "FINISHED" || result[0]. State ==
"CANCELLED") {
+ break
+ }
+ sleep(1000)
+ }
+
+ // Check load job state
+ assertEquals("FINISHED", result[0].State)
+
+ // Verify the loaded data
+ def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
+ assertTrue(rowCount[0][0] > 0, "No data was loaded")
+
+ def pathData = sql "SELECT * FROM ${tableName} LIMIT 1"
+ logger.info("path data 2: " + pathData)
+ assertEquals(1, pathData[0][0])
+ assertEquals(2, pathData[0][1])
+ assertEquals(30, pathData[0][2])
+
+
+ // [[1, 2, 30]]
+ } finally {
+ sql """ TRUNCATE TABLE ${tableName} """
+ }
+
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]