This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 028a1f79eb9 Backport 63102 branch 4.1 (#63740)
028a1f79eb9 is described below

commit 028a1f79eb985bd1f29617a0683dfc9da49bb004
Author: Gabriel <[email protected]>
AuthorDate: Fri May 29 14:42:09 2026 +0800

    Backport 63102 branch 4.1 (#63740)
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    #63102
    Related PR: #xxx
    
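    Problem Summary:
    Backport of #63102 to branch-4.1. As the diff below shows, RowGroupReader
    now resolves block positions through a new _get_block_column_pos() helper,
    which returns an InternalError when the name-to-index map is unset, when
    the column is missing from it, or when the mapped position is out of range
    for the block. It also checks that a column reader exists before using it.
    In addition, the reader keeps its own copies of the read-column list and
    the LazyReadContext instead of references, declares _column_readers after
    _read_ranges so that the readers are destroyed first, and null-checks
    _obj_pool in its destructor.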
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
            - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
        - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/format/parquet/vparquet_group_reader.cpp | 124 ++++++++++++++----------
 be/src/format/parquet/vparquet_group_reader.h   |  11 ++-
 2 files changed, 78 insertions(+), 57 deletions(-)

diff --git a/be/src/format/parquet/vparquet_group_reader.cpp 
b/be/src/format/parquet/vparquet_group_reader.cpp
index 48375a764f4..d7f822f77d0 100644
--- a/be/src/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/format/parquet/vparquet_group_reader.cpp
@@ -162,8 +162,9 @@ RowGroupReader::RowGroupReader(io::FileReaderSPtr 
file_reader,
           _filter_column_ids(filter_column_ids) {}
 
 RowGroupReader::~RowGroupReader() {
-    _column_readers.clear();
-    _obj_pool->clear();
+    if (_obj_pool != nullptr) {
+        _obj_pool->clear();
+    }
 }
 
 Status RowGroupReader::init(
@@ -493,31 +494,31 @@ Status RowGroupReader::_read_column_data(Block* block,
     size_t batch_read_rows = 0;
     bool has_eof = false;
     for (auto& read_col_name : table_columns) {
-        auto& column_with_type_and_name =
-                
block->safe_get_by_position((*_col_name_to_block_idx)[read_col_name]);
+        uint32_t block_pos = 0;
+        RETURN_IF_ERROR(_get_block_column_pos(*block, read_col_name, 
&block_pos));
+        auto reader_iter = _column_readers.find(read_col_name);
+        if (reader_iter == _column_readers.end() || reader_iter->second == 
nullptr) {
+            return Status::InternalError("Column reader for '{}' not found in 
parquet row group",
+                                         read_col_name);
+        }
+
+        auto& column_with_type_and_name = 
block->safe_get_by_position(block_pos);
         auto& column_ptr = column_with_type_and_name.column;
         auto& column_type = column_with_type_and_name.type;
         bool is_dict_filter = false;
         for (auto& _dict_filter_col : _dict_filter_cols) {
             if (_dict_filter_col.first == read_col_name) {
                 MutableColumnPtr dict_column = ColumnInt32::create();
-                if (!_col_name_to_block_idx->contains(read_col_name)) {
-                    return Status::InternalError(
-                            "Wrong read column '{}' in parquet file, block: 
{}", read_col_name,
-                            block->dump_structure());
-                }
                 if (column_type->is_nullable()) {
-                    
block->get_by_position((*_col_name_to_block_idx)[read_col_name]).type =
+                    block->get_by_position(block_pos).type =
                             
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
                     block->replace_by_position(
-                            (*_col_name_to_block_idx)[read_col_name],
+                            block_pos,
                             ColumnNullable::create(std::move(dict_column),
                                                    
ColumnUInt8::create(dict_column->size(), 0)));
                 } else {
-                    
block->get_by_position((*_col_name_to_block_idx)[read_col_name]).type =
-                            std::make_shared<DataTypeInt32>();
-                    
block->replace_by_position((*_col_name_to_block_idx)[read_col_name],
-                                               std::move(dict_column));
+                    block->get_by_position(block_pos).type = 
std::make_shared<DataTypeInt32>();
+                    block->replace_by_position(block_pos, 
std::move(dict_column));
                 }
                 is_dict_filter = true;
                 break;
@@ -528,10 +529,10 @@ Status RowGroupReader::_read_column_data(Block* block,
         bool col_eof = false;
         // Should reset _filter_map_index to 0 when reading next column.
         //        select_vector.reset();
-        _column_readers[read_col_name]->reset_filter_map_index();
+        reader_iter->second->reset_filter_map_index();
         while (!col_eof && col_read_rows < batch_size) {
             size_t loop_rows = 0;
-            RETURN_IF_ERROR(_column_readers[read_col_name]->read_column_data(
+            RETURN_IF_ERROR(reader_iter->second->read_column_data(
                     column_ptr, column_type, 
_table_info_node_ptr->get_children_node(read_col_name),
                     filter_map, batch_size - col_read_rows, &loop_rows, 
&col_eof, is_dict_filter));
             VLOG_DEBUG << "[RowGroupReader] column '" << read_col_name
@@ -657,19 +658,19 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t 
batch_size, size_t* re
                 SCOPED_RAW_TIMER(&_predicate_filter_time);
                 for (const auto& col : _lazy_read_ctx.predicate_columns.first) 
{
                     // clean block to read predicate columns
-                    block->get_by_position((*_col_name_to_block_idx)[col])
-                            .column->assume_mutable()
-                            ->clear();
+                    uint32_t block_pos = 0;
+                    RETURN_IF_ERROR(_get_block_column_pos(*block, col, 
&block_pos));
+                    
block->get_by_position(block_pos).column->assume_mutable()->clear();
                 }
                 for (const auto& col : 
_lazy_read_ctx.predicate_partition_columns) {
-                    
block->get_by_position((*_col_name_to_block_idx)[col.first])
-                            .column->assume_mutable()
-                            ->clear();
+                    uint32_t block_pos = 0;
+                    RETURN_IF_ERROR(_get_block_column_pos(*block, col.first, 
&block_pos));
+                    
block->get_by_position(block_pos).column->assume_mutable()->clear();
                 }
                 for (const auto& col : 
_lazy_read_ctx.predicate_missing_columns) {
-                    
block->get_by_position((*_col_name_to_block_idx)[col.first])
-                            .column->assume_mutable()
-                            ->clear();
+                    uint32_t block_pos = 0;
+                    RETURN_IF_ERROR(_get_block_column_pos(*block, col.first, 
&block_pos));
+                    
block->get_by_position(block_pos).column->assume_mutable()->clear();
                 }
                 if (_row_id_column_iterator_pair.first != nullptr) {
                     block->get_by_position(_row_id_column_iterator_pair.second)
@@ -828,7 +829,9 @@ Status RowGroupReader::_fill_partition_columns(
                 partition_columns) {
     DataTypeSerDe::FormatOptions _text_formatOptions;
     for (const auto& kv : partition_columns) {
-        auto doris_column = 
block->get_by_position((*_col_name_to_block_idx)[kv.first]).column;
+        uint32_t block_pos = 0;
+        RETURN_IF_ERROR(_get_block_column_pos(*block, kv.first, &block_pos));
+        auto doris_column = block->get_by_position(block_pos).column;
         // obtained from block*, it is a mutable object.
         auto* col_ptr = const_cast<IColumn*>(doris_column.get());
         const auto& [value, slot_desc] = kv.second;
@@ -857,14 +860,11 @@ Status RowGroupReader::_fill_missing_columns(
         Block* block, size_t rows,
         const std::unordered_map<std::string, VExprContextSPtr>& 
missing_columns) {
     for (const auto& kv : missing_columns) {
-        if (!_col_name_to_block_idx->contains(kv.first)) {
-            return Status::InternalError("Missing column: {} not found in 
block {}", kv.first,
-                                         block->dump_structure());
-        }
+        uint32_t block_pos = 0;
+        RETURN_IF_ERROR(_get_block_column_pos(*block, kv.first, &block_pos));
         if (kv.second == nullptr) {
             // no default column, fill with null
-            auto mutable_column = 
block->get_by_position((*_col_name_to_block_idx)[kv.first])
-                                          .column->assume_mutable();
+            auto mutable_column = 
block->get_by_position(block_pos).column->assume_mutable();
             auto* nullable_column = 
assert_cast<ColumnNullable*>(mutable_column.get());
             nullable_column->insert_many_defaults(rows);
         } else {
@@ -881,18 +881,39 @@ Status RowGroupReader::_fill_missing_columns(
                 mutable_column->resize(rows);
                 // result_column_ptr maybe a ColumnConst, convert it to a 
normal column
                 result_column_ptr = 
result_column_ptr->convert_to_full_column_if_const();
-                auto origin_column_type =
-                        
block->get_by_position((*_col_name_to_block_idx)[kv.first]).type;
+                auto origin_column_type = 
block->get_by_position(block_pos).type;
                 bool is_nullable = origin_column_type->is_nullable();
-                block->replace_by_position(
-                        (*_col_name_to_block_idx)[kv.first],
-                        is_nullable ? make_nullable(result_column_ptr) : 
result_column_ptr);
+                block->replace_by_position(block_pos, is_nullable ? 
make_nullable(result_column_ptr)
+                                                                  : 
result_column_ptr);
             }
         }
     }
     return Status::OK();
 }
 
+Status RowGroupReader::_get_block_column_pos(const Block& block, const 
std::string& column_name,
+                                             uint32_t* position) const {
+    if (_col_name_to_block_idx == nullptr) {
+        return Status::InternalError(
+                "Column name to block index map is not set when reading 
parquet column '{}', "
+                "block: "
+                "{}",
+                column_name, block.dump_structure());
+    }
+    auto iter = _col_name_to_block_idx->find(column_name);
+    if (iter == _col_name_to_block_idx->end()) {
+        return Status::InternalError("Column '{}' not found in block index 
map, block: {}",
+                                     column_name, block.dump_structure());
+    }
+    if (iter->second >= block.columns()) {
+        return Status::InternalError(
+                "Column '{}' maps to invalid block position {}, block columns: 
{}, block: {}",
+                column_name, iter->second, block.columns(), 
block.dump_structure());
+    }
+    *position = iter->second;
+    return Status::OK();
+}
+
 Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, 
bool* batch_eof,
                                          bool* modify_row_ids) {
     *modify_row_ids = false;
@@ -1336,13 +1357,14 @@ Status 
RowGroupReader::_rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes,
 
 Status RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) {
     for (auto& dict_filter_cols : _dict_filter_cols) {
-        if (!_col_name_to_block_idx->contains(dict_filter_cols.first)) {
-            throw Exception(ErrorCode::INTERNAL_ERROR,
-                            "Wrong read column '{}' in parquet file, block: 
{}",
-                            dict_filter_cols.first, block->dump_structure());
+        uint32_t block_pos = 0;
+        RETURN_IF_ERROR(_get_block_column_pos(*block, dict_filter_cols.first, 
&block_pos));
+        auto reader_iter = _column_readers.find(dict_filter_cols.first);
+        if (reader_iter == _column_readers.end() || reader_iter->second == 
nullptr) {
+            return Status::InternalError("Column reader for '{}' not found in 
parquet row group",
+                                         dict_filter_cols.first);
         }
-        ColumnWithTypeAndName& column_with_type_and_name =
-                
block->get_by_position((*_col_name_to_block_idx)[dict_filter_cols.first]);
+        ColumnWithTypeAndName& column_with_type_and_name = 
block->get_by_position(block_pos);
         const ColumnPtr& column = column_with_type_and_name.column;
         if (const auto* nullable_column = 
check_and_get_column<ColumnNullable>(*column)) {
             const ColumnPtr& nested_column = 
nullable_column->get_nested_column_ptr();
@@ -1350,24 +1372,20 @@ Status 
RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) {
             DCHECK(dict_column);
 
             auto string_column = DORIS_TRY(
-                    
_column_readers[dict_filter_cols.first]->convert_dict_column_to_string_column(
-                            dict_column));
+                    
reader_iter->second->convert_dict_column_to_string_column(dict_column));
 
             column_with_type_and_name.type =
                     
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
             block->replace_by_position(
-                    (*_col_name_to_block_idx)[dict_filter_cols.first],
-                    ColumnNullable::create(std::move(string_column),
-                                           
nullable_column->get_null_map_column_ptr()));
+                    block_pos, ColumnNullable::create(std::move(string_column),
+                                                      
nullable_column->get_null_map_column_ptr()));
         } else {
             const auto* dict_column = assert_cast<const 
ColumnInt32*>(column.get());
             auto string_column = DORIS_TRY(
-                    
_column_readers[dict_filter_cols.first]->convert_dict_column_to_string_column(
-                            dict_column));
+                    
reader_iter->second->convert_dict_column_to_string_column(dict_column));
 
             column_with_type_and_name.type = 
std::make_shared<DataTypeString>();
-            
block->replace_by_position((*_col_name_to_block_idx)[dict_filter_cols.first],
-                                       std::move(string_column));
+            block->replace_by_position(block_pos, std::move(string_column));
         }
     }
     return Status::OK();
diff --git a/be/src/format/parquet/vparquet_group_reader.h 
b/be/src/format/parquet/vparquet_group_reader.h
index 657876b36e6..208d3995b90 100644
--- a/be/src/format/parquet/vparquet_group_reader.h
+++ b/be/src/format/parquet/vparquet_group_reader.h
@@ -233,6 +233,8 @@ private:
     Status _fill_missing_columns(
             Block* block, size_t rows,
             const std::unordered_map<std::string, VExprContextSPtr>& 
missing_columns);
+    Status _get_block_column_pos(const Block& block, const std::string& 
column_name,
+                                 uint32_t* position) const;
     Status _build_pos_delete_filter(size_t read_rows);
     Status _filter_block(Block* block, int column_to_keep,
                          const std::vector<uint32_t>& columns_to_filter);
@@ -250,9 +252,7 @@ private:
     Status _append_iceberg_rowid_column(Block* block, size_t read_rows, bool 
is_current_row_ids);
 
     io::FileReaderSPtr _file_reader;
-    std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>>
-            _column_readers; // table_column_name
-    const std::vector<std::string>& _read_table_columns;
+    std::vector<std::string> _read_table_columns;
 
     const int32_t _row_group_id;
     const tparquet::RowGroup& _row_group_meta;
@@ -263,8 +263,11 @@ private:
     std::shared_ptr<RowLineageColumns> _row_lineage_columns;
     // merge the row ranges generated from page index and position delete.
     RowRanges _read_ranges;
+    // ParquetColumnReader keeps a reference to _read_ranges, so readers must 
be destroyed first.
+    std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>>
+            _column_readers; // table_column_name
 
-    const LazyReadContext& _lazy_read_ctx;
+    LazyReadContext _lazy_read_ctx;
     int64_t _lazy_read_filtered_rows = 0;
     int64_t _predicate_filter_time = 0;
     int64_t _dict_filter_rewrite_time = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to