This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bebd3da649 [fix](broker load)fix broker load fail when <column from path> column already exists in file. (#58579)
7bebd3da649 is described below

commit 7bebd3da649814bb11edd1dc7a2ac56a18ccc281
Author: daidai <[email protected]>
AuthorDate: Tue Dec 2 15:47:32 2025 +0800

    [fix](broker load)fix broker load fail when <column from path> column already exists in file. (#58579)
    
    ### What problem does this PR solve?
    
    Problem Summary:
    Fix the failure that occurs when a broker load such as
    ```
    LOAD LABEL labelx
    (
        DATA INFILE("s3://bucket/load/k1=10/k3=30/test.parquet")
        INTO TABLE tableName
        FORMAT AS "parquet"
        (k2)
        COLUMNS FROM PATH AS (k1,k3)
    )
    WITH S3
    ( xxx)
    ```
    
    is run and the k1 or k3 column also exists inside test.parquet.

    The load then fails with:
    
    ```
    type:LOAD_RUN_FAIL; msg:errCode = 2, detailMessage = (127.0.0.1)[E-7412]
    assert cast err:[E-7412] Bad cast from 
type:doris::vectorized::ColumnVector<(doris::PrimitiveType)5> to 
doris::vectorized::ColumnStr<unsigned int>
    
            0#  doris::Exception::Exception(int, std::basic_string_view<char, 
std::char_traits<char> > const&, bool) at 
/mnt/disk1/changyuwei/ldb_toolchain/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/unique_ptr.h:193
            1#  doris::Exception::Exception(doris::Status const&) at 
/mnt/disk1/changyuwei/doris/be/src/common/exception.h:39
            2#  doris::vectorized::ColumnStr<unsigned int>& 
assert_cast<doris::vectorized::ColumnStr<unsigned int>&, (TypeCheckOnRelease)1, 
doris::vectorized::IColumn&>(doris::vectorized::IColumn&)::{lambda(auto:1&&)#1}::operator()<doris::vectorized::IColumn&>(doris::vectorized::IColumn&)
 const at /mnt/disk1/changyuwei/doris/be/src/vec/common/assert_cast.h:0
            3#  doris::vectorized::ColumnStr<unsigned int>& 
assert_cast<doris::vectorized::ColumnStr<unsigned int>&, (TypeCheckOnRelease)1, 
doris::vectorized::IColumn&>(doris::vectorized::IColumn&) at 
/mnt/disk1/changyuwei/doris/be/src/vec/common/assert_cast.h:72
            4#  
doris::vectorized::DataTypeStringSerDeBase<doris::vectorized::ColumnStr<unsigned
 int> >::deserialize_one_cell_from_json(doris::vectorized::IColumn&, 
doris::Slice&, doris::vectorized::DataTypeSerDe::FormatOptions const&) const at 
/mnt/disk1/changyuwei/doris/be/src/vec/data_types/serde/data_type_string_serde.cpp:108
            5#  
doris::vectorized::DataTypeNullableSerDe::deserialize_one_cell_from_json(doris::vectorized::IColumn&,
 doris::Slice&, doris::vectorized::DataTypeSerDe::FormatOptions const&) const 
at /mnt/disk1/changyuwei/doris/be/src/common/status.h:525
            6#  
doris::vectorized::DataTypeNullableSerDe::deserialize_column_from_fixed_json(doris::vectorized::IColumn&,
 doris::Slice&, unsigned long, unsigned long*, 
doris::vectorized::DataTypeSerDe::FormatOptions const&) const at 
/mnt/disk1/changyuwei/doris/be/src/common/status.h:525
            7#  
doris::vectorized::RowGroupReader::_fill_partition_columns(doris::vectorized::Block*,
 unsigned long, std::unordered_map<std::__cxx11::basic_string<char, 
std::char_traits<char>, std::allocator<char> >, 
std::tuple<std::__cxx11::basic_string<char, std::char_traits<char>, 
std::allocator<char> >, doris::SlotDescriptor const*>, 
std::hash<std::__cxx11::basic_string<char, std::char_traits<char>, 
std::allocator<char> > >, std::equal_to<std::__cxx11::basic_string<char, 
std::char_tra [...]
            8#  
doris::vectorized::RowGroupReader::next_batch(doris::vectorized::Block*, 
unsigned long, unsigned long*, bool*) at 
/mnt/disk1/changyuwei/doris/be/src/common/status.h:525
            9#  
doris::vectorized::ParquetReader::get_next_block(doris::vectorized::Block*, 
unsigned long*, bool*) at /mnt/disk1/changyuwei/doris/be/src/common/status.h:520
            10# 
doris::vectorized::FileScanner::_get_block_wrapped(doris::RuntimeState*, 
doris::vectorized::Block*, bool*) at 
/mnt/disk1/changyuwei/doris/be/src/common/status.h:525
            11# 
doris::vectorized::FileScanner::_get_block_impl(doris::RuntimeState*, 
doris::vectorized::Block*, bool*) at 
/mnt/disk1/changyuwei/doris/be/src/common/status.h:525
            12# doris::vectorized::Scanner::get_block(doris::RuntimeState*, 
doris::vectorized::Block*, bool*) at 
/mnt/disk1/changyuwei/doris/be/src/common/status.h:525
            13# 
doris::vectorized::Scanner::get_block_after_projects(doris::RuntimeState*, 
doris::vectorized::Block*, bool*) at 
/mnt/disk1/changyuwei/doris/be/src/vec/exec/scan/scanner.cpp:87
            14# 
doris::vectorized::ScannerScheduler::_scanner_scan(std::shared_ptr<doris::vectorized::ScannerContext>,
 std::shared_ptr<doris::vectorized::ScanTask>) at 
/mnt/disk1/changyuwei/doris/be/src/vec/exec/scan/scanner_scheduler.cpp:182
    ```
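    Frames 2-4 show `assert_cast`, a checked downcast, failing while the string SerDe fills the
    path value for k1/k3: the SerDe expects a `ColumnStr`, but the source-block column was created
    as a `ColumnVector<int>` from the file schema. A minimal standalone sketch of that
    checked-cast behaviour, with stand-in types only (not the Doris implementation):

    ```
    #include <stdexcept>
    #include <string>
    #include <typeinfo>

    struct IColumn { virtual ~IColumn() = default; };
    struct ColumnInt32  : IColumn {}; // stand-in for ColumnVector<int> built from the file schema
    struct ColumnString : IColumn {}; // stand-in for ColumnStr<unsigned int> used by the string SerDe

    // Checked downcast: throws instead of silently mis-casting; this is the kind of
    // failure reported above as "[E-7412] Bad cast".
    template <typename To>
    To& checked_cast(IColumn& col) {
        if (auto* p = dynamic_cast<To*>(&col)) {
            return *p;
        }
        throw std::runtime_error(std::string("Bad cast from ") + typeid(col).name());
    }

    int main() {
        ColumnInt32 file_typed_col;          // column created with the file column's type (INT)
        IColumn& col = file_typed_col;
        try {
            checked_cast<ColumnString>(col); // filling the path value expects a string column
        } catch (const std::exception&) {
            // the load job surfaces this as LOAD_RUN_FAIL / E-7412
        }
    }
    ```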
---
 be/src/vec/exec/scan/file_scanner.cpp              |  16 +-
 .../broker_load/test_load_data_from_path.groovy    | 215 +++++++++++++++++++++
 2 files changed, 230 insertions(+), 1 deletion(-)

diff --git a/be/src/vec/exec/scan/file_scanner.cpp b/be/src/vec/exec/scan/file_scanner.cpp
index c7d10c89dc0..9373bcfd056 100644
--- a/be/src/vec/exec/scan/file_scanner.cpp
+++ b/be/src/vec/exec/scan/file_scanner.cpp
@@ -1392,7 +1392,21 @@ Status FileScanner::_set_fill_or_truncate_columns(bool need_to_get_parsed_schema
     std::unordered_map<std::string, DataTypePtr> name_to_col_type;
     RETURN_IF_ERROR(_cur_reader->get_columns(&name_to_col_type, &_missing_cols));
     for (const auto& [col_name, col_type] : name_to_col_type) {
-        _slot_lower_name_to_col_type.emplace(to_lower(col_name), col_type);
+        auto col_name_lower = to_lower(col_name);
+        if (_partition_col_descs.contains(col_name_lower)) {
+            /*
+             * `_slot_lower_name_to_col_type` records the columns that exist in the file and is
+             * used by `_init_src_block` and `_cast_to_input_block` during LOAD to generate
+             * columns of the corresponding type.
+             *
+             * When a column in `COLUMNS FROM PATH` also exists as a file column, the column
+             * type in the block would not match the slot type in `_output_tuple_desc`, causing
+             * an error when the SerDe `deserialize_one_cell_from_json` fills the partition
+             * values.
+             *
+             * So partition columns must not be added to `_slot_lower_name_to_col_type`.
+             */
+            continue;
+        }
+        _slot_lower_name_to_col_type.emplace(col_name_lower, col_type);
     }
 
     if (!_fill_partition_from_path && config::enable_iceberg_partition_column_fallback) {
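
For context, a minimal standalone sketch of the skip logic this hunk adds, using hypothetical
stand-in types instead of the Doris API (the real code works with `DataTypePtr` and
`SlotDescriptor`): names declared in `COLUMNS FROM PATH` are left out of the lowercased-name
map, so the source-block column for a path column is later created from the destination slot
instead of the conflicting file type.

```
#include <algorithm>
#include <cctype>
#include <string>
#include <unordered_map>
#include <unordered_set>

// Hypothetical stand-in for the file schema's reported type (the real code uses DataTypePtr).
using TypeName = std::string;

static std::string to_lower_copy(std::string s) {
    std::transform(s.begin(), s.end(), s.begin(),
                   [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
    return s;
}

// Build the lowercased-name -> type map from the file schema, skipping any column that is
// also filled from the path (requires C++20 for unordered_set::contains).
std::unordered_map<std::string, TypeName> build_file_col_types(
        const std::unordered_map<std::string, TypeName>& file_schema,
        const std::unordered_set<std::string>& partition_cols_lower) {
    std::unordered_map<std::string, TypeName> result;
    for (const auto& [name, type] : file_schema) {
        auto lower = to_lower_copy(name);
        if (partition_cols_lower.contains(lower)) {
            continue; // partition (path) column: let the destination slot type win
        }
        result.emplace(std::move(lower), type);
    }
    return result;
}

int main() {
    // Mirrors the failing case: the file reports k1 and k2, while k1 and k3 come from the path.
    std::unordered_map<std::string, TypeName> file_schema = {{"k1", "INT"}, {"k2", "INT"}};
    std::unordered_set<std::string> path_cols = {"k1", "k3"};
    auto types = build_file_col_types(file_schema, path_cols);
    // types now contains only k2; k1 is handled as a path/partition column.
}
```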
diff --git a/regression-test/suites/load_p0/broker_load/test_load_data_from_path.groovy b/regression-test/suites/load_p0/broker_load/test_load_data_from_path.groovy
index e4c6a5d2eed..46fc29b2948 100644
--- a/regression-test/suites/load_p0/broker_load/test_load_data_from_path.groovy
+++ b/regression-test/suites/load_p0/broker_load/test_load_data_from_path.groovy
@@ -202,4 +202,219 @@ suite("test_load_columns_from_path", "load_p0") {
     } finally {
         sql """ DROP TABLE ${tableName} """
     }
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+    sql """
+        CREATE TABLE ${tableName} (
+            k1 INT,
+            k2 INT,
+            k3 INT
+        )
+        DUPLICATE KEY(`k1`)
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1"
+        );
+        """
+
+    label = UUID.randomUUID().toString().replace("-", "2")
+    try {
+        sql """
+            LOAD LABEL ${label}
+            (
+                DATA INFILE("s3://${s3BucketName}/load/k1=10/k2=20/test.parquet")
+                INTO TABLE ${tableName}
+                FORMAT AS "parquet"
+                (k1, k2, k3)
+            )
+            WITH S3
+            (
+                "s3.access_key" = "${ak}",
+                "s3.secret_key" = "${sk}",
+                "s3.endpoint" = "${s3Endpoint}",
+                "s3.region" = "${s3Region}"
+            )
+            """
+
+        // Wait for load job to finish
+        def maxRetry = 60
+        def result = ""
+        for (int i = 0; i < maxRetry; i++) {
+            result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
+            if (result[0].State == "FINISHED" || result[0].State == "CANCELLED") {
+                break
+            }
+            sleep(1000)
+        }
+
+        // Check load job state
+        assertEquals("FINISHED", result[0].State)
+
+        // Verify the loaded data
+        def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
+        assertTrue(rowCount[0][0] > 0, "No data was loaded")
+
+        def pathData = sql "SELECT * FROM ${tableName} LIMIT 1"
+        logger.info("path data 1: " + pathData)
+        // 1 2 3 
+        assertEquals(1, pathData[0][0])
+        assertEquals(2, pathData[0][1])
+        assertEquals(3, pathData[0][2])
+
+    } finally {
+        sql """ TRUNCATE TABLE ${tableName} """
+    }
+
+    label = UUID.randomUUID().toString().replace("-", "2")
+
+
+    try {
+        sql """
+            LOAD LABEL ${label}
+            (
+                DATA INFILE("s3://${s3BucketName}/load/k1=10/k2=20/test.orc")
+                INTO TABLE ${tableName}
+                FORMAT AS "orc"
+                (k1, k3)
+                COLUMNS FROM PATH AS (k2)
+            )
+            WITH S3
+            (
+                "s3.access_key" = "${ak}",
+                "s3.secret_key" = "${sk}",
+                "s3.endpoint" = "${s3Endpoint}",
+                "s3.region" = "${s3Region}"
+            )
+            """
+
+        // Wait for load job to finish
+        def maxRetry = 60
+        def result = ""
+        for (int i = 0; i < maxRetry; i++) {
+            result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
+            if (result[0].State == "FINISHED" || result[0].State == "CANCELLED") {
+                break
+            }
+            sleep(1000)
+        }
+
+        // Check load job state
+        assertEquals("FINISHED", result[0].State)
+
+        // Verify the loaded data
+        def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
+        assertTrue(rowCount[0][0] > 0, "No data was loaded")
+
+        def pathData = sql "SELECT * FROM ${tableName} LIMIT 1"
+        logger.info("path data 2: " + pathData)
+        // 1 20 3 
+        assertEquals(1, pathData[0][0])
+        assertEquals(20, pathData[0][1])
+        assertEquals(3, pathData[0][2])
+
+    } finally {
+        sql """ TRUNCATE TABLE ${tableName} """
+    }
+
+    label = UUID.randomUUID().toString().replace("-", "2")
+    try {
+        sql """
+            LOAD LABEL ${label}
+            (
+                DATA INFILE("s3://${s3BucketName}/load/k1=10/k3=30/test.parquet")
+                INTO TABLE ${tableName}
+                FORMAT AS "parquet"
+                (k2)
+                COLUMNS FROM PATH AS (k1,k3)
+            )
+            WITH S3
+            (
+                "s3.access_key" = "${ak}",
+                "s3.secret_key" = "${sk}",
+                "s3.endpoint" = "${s3Endpoint}",
+                "s3.region" = "${s3Region}"
+            )
+            """
+
+        // Wait for load job to finish
+        def maxRetry = 60
+        def result = ""
+        for (int i = 0; i < maxRetry; i++) {
+            result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
+            if (result[0].State == "FINISHED" || result[0].State == "CANCELLED") {
+                break
+            }
+            sleep(1000)
+        }
+
+        // Check load job state
+        assertEquals("FINISHED", result[0].State)
+
+        // Verify the loaded data
+        def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
+        assertTrue(rowCount[0][0] > 0, "No data was loaded")
+
+        def pathData = sql "SELECT * FROM ${tableName} LIMIT 1"
+        logger.info("path data 2: " + pathData)
+        // 10 2 30 
+        assertEquals(10, pathData[0][0])
+        assertEquals(2, pathData[0][1])
+        assertEquals(30, pathData[0][2])
+
+    } finally {
+        sql """ TRUNCATE TABLE ${tableName} """
+    }
+
+
+    label = UUID.randomUUID().toString().replace("-", "2")
+    try {
+        sql """
+            LOAD LABEL ${label}
+            (
+                DATA INFILE("s3://${s3BucketName}/load/k1=10/k3=30/test.orc")
+                INTO TABLE ${tableName}
+                FORMAT AS "orc"
+                (k1,k2)
+                COLUMNS FROM PATH AS (k3)
+            )
+            WITH S3
+            (
+                "s3.access_key" = "${ak}",
+                "s3.secret_key" = "${sk}",
+                "s3.endpoint" = "${s3Endpoint}",
+                "s3.region" = "${s3Region}"
+            )
+            """
+
+        // Wait for load job to finish
+        def maxRetry = 60
+        def result = ""
+        for (int i = 0; i < maxRetry; i++) {
+            result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
+            if (result[0].State == "FINISHED" || result[0].State == "CANCELLED") {
+                break
+            }
+            sleep(1000)
+        }
+
+        // Check load job state
+        assertEquals("FINISHED", result[0].State)
+
+        // Verify the loaded data
+        def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
+        assertTrue(rowCount[0][0] > 0, "No data was loaded")
+
+        def pathData = sql "SELECT * FROM ${tableName} LIMIT 1"
+        logger.info("path data 2: " + pathData)
+        // 1 2 30
+        assertEquals(1, pathData[0][0])
+        assertEquals(2, pathData[0][1])
+        assertEquals(30, pathData[0][2])
+
+    } finally {
+        sql """ TRUNCATE TABLE ${tableName} """
+    }
+
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
