This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 52c75e79197 [fix](parquet) Fix a performance regression caused by 
filling RL/DL when reading Parquet scalar columns. (#59833)
52c75e79197 is described below

commit 52c75e7919725e567b099dffb90fcd1487ca6259
Author: daidai <[email protected]>
AuthorDate: Fri Jan 16 14:49:13 2026 +0800

    [fix](parquet) Fix a performance regression caused by filling RL/DL when 
reading Parquet scalar columns. (#59833)
    
    ### What problem does this PR solve?
    Related PR: #58785
    
    Problem Summary:
    The performance regression was introduced in #58785. Before #58785,
    reading an ordinary scalar column did not require filling and saving
    the repetition and definition levels (RL/DL). #58785 merged the logic
    for reading a scalar column that is a child of a struct with the logic
    for reading an ordinary scalar column, so RL/DL were filled and saved
    in both cases in order to populate the struct's null map.
    This PR separates the two reading paths again.
---
 .../parquet/vparquet_column_chunk_reader.cpp       | 14 ++++-----
 .../format/parquet/vparquet_column_chunk_reader.h  | 13 +++++++++
 .../exec/format/parquet/vparquet_column_reader.cpp | 33 ++++------------------
 .../exec/format/parquet/vparquet_column_reader.h   |  5 ++++
 4 files changed, 31 insertions(+), 34 deletions(-)

diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index a474b76a518..e5c81691558 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -361,7 +361,7 @@ Status ColumnChunkReader<IN_COLLECTION, 
OFFSET_INDEX>::seek_to_nested_row(size_t
     } else {
         while (true) {
             RETURN_IF_ERROR(parse_page_header());
-            if (_page_reader->is_header_v2()) {
+            if (_page_reader->is_header_v2() || !IN_COLLECTION) {
                 if (_page_reader->start_row() <= left_row && left_row < 
_page_reader->end_row()) {
                     RETURN_IF_ERROR(load_page_data());
                     // this page contain this row.
@@ -447,11 +447,11 @@ Status ColumnChunkReader<IN_COLLECTION, 
OFFSET_INDEX>::load_page_nested_rows(
     *result_rows = 0;
     rep_levels.reserve(rep_levels.size() + _remaining_rep_nums);
     while (_remaining_rep_nums) {
-        level_t rep_level = _rep_level_decoder.get_next();
+        level_t rep_level = _rep_level_get_next();
         if (rep_level == 0) {               // rep_level 0 indicates start of 
new row
             if (*result_rows == max_rows) { // this page contain max_rows, 
page no end.
                 _current_row += max_rows;
-                _rep_level_decoder.rewind_one();
+                _rep_level_rewind_one();
                 return Status::OK();
             }
             (*result_rows)++;
@@ -462,8 +462,8 @@ Status ColumnChunkReader<IN_COLLECTION, 
OFFSET_INDEX>::load_page_nested_rows(
     _current_row += *result_rows;
 
     auto need_check_cross_page = [&]() -> bool {
-        return !OFFSET_INDEX && _remaining_rep_nums == 0 && 
!_page_reader->is_header_v2() &&
-               has_next_page();
+        return !OFFSET_INDEX && IN_COLLECTION && _remaining_rep_nums == 0 &&
+               !_page_reader->is_header_v2() && has_next_page();
     };
     *cross_page = need_check_cross_page();
     return Status::OK();
@@ -478,10 +478,10 @@ Status ColumnChunkReader<IN_COLLECTION, 
OFFSET_INDEX>::load_cross_page_nested_ro
 
     *cross_page = has_next_page();
     while (_remaining_rep_nums) {
-        level_t rep_level = _rep_level_decoder.get_next();
+        level_t rep_level = _rep_level_get_next();
         if (rep_level == 0) { // rep_level 0 indicates start of new row
             *cross_page = false;
-            _rep_level_decoder.rewind_one();
+            _rep_level_rewind_one();
             break;
         }
         _remaining_rep_nums--;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index 1270e5e37fc..9e77a3139f6 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -206,6 +206,19 @@ private:
     void _get_uncompressed_levels(const tparquet::DataPageHeaderV2& page_v2, 
Slice& page_data);
     Status _skip_nested_rows_in_page(size_t num_rows);
 
+    level_t _rep_level_get_next() {
+        if constexpr (IN_COLLECTION) {
+            return _rep_level_decoder.get_next();
+        }
+        return 0;
+    }
+
+    void _rep_level_rewind_one() {
+        if constexpr (IN_COLLECTION) {
+            _rep_level_decoder.rewind_one();
+        }
+    }
+
     ColumnChunkReaderState _state = NOT_INIT;
     FieldSchema* _field_schema = nullptr;
     const level_t _max_rep_level;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index dac6b76aece..cc6b03a5981 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -118,8 +118,8 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, 
FieldSchema* field,
         RETURN_IF_ERROR(create(file, &field->children[0], row_group, 
row_ranges, ctz, io_ctx,
                                element_reader, max_buf_size, col_offsets, 
true, column_ids,
                                filter_column_ids));
-        //        element_reader->set_nested_column();
         auto array_reader = ArrayColumnReader::create_unique(row_ranges, 
total_rows, ctz, io_ctx);
+        element_reader->set_column_in_nested();
         RETURN_IF_ERROR(array_reader->init(std::move(element_reader), field));
         array_reader->_filter_column_ids = filter_column_ids;
         reader.reset(array_reader.release());
@@ -133,7 +133,6 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, 
FieldSchema* field,
             RETURN_IF_ERROR(create(file, &field->children[0], row_group, 
row_ranges, ctz, io_ctx,
                                    key_reader, max_buf_size, col_offsets, 
true, column_ids,
                                    filter_column_ids));
-            //            key_reader->set_nested_column();
         } else {
             auto skip_reader = std::make_unique<SkipReadingReader>(row_ranges, 
total_rows, ctz,
                                                                    io_ctx, 
&field->children[0]);
@@ -146,7 +145,6 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, 
FieldSchema* field,
             RETURN_IF_ERROR(create(file, &field->children[1], row_group, 
row_ranges, ctz, io_ctx,
                                    value_reader, max_buf_size, col_offsets, 
true, column_ids,
                                    filter_column_ids));
-            //            value_reader->set_nested_column();
         } else {
             auto skip_reader = std::make_unique<SkipReadingReader>(row_ranges, 
total_rows, ctz,
                                                                    io_ctx, 
&field->children[0]);
@@ -154,6 +152,8 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, 
FieldSchema* field,
         }
 
         auto map_reader = MapColumnReader::create_unique(row_ranges, 
total_rows, ctz, io_ctx);
+        key_reader->set_column_in_nested();
+        value_reader->set_column_in_nested();
         RETURN_IF_ERROR(map_reader->init(std::move(key_reader), 
std::move(value_reader), field));
         map_reader->_filter_column_ids = filter_column_ids;
         reader.reset(map_reader.release());
@@ -168,7 +168,6 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, 
FieldSchema* field,
                 RETURN_IF_ERROR(create(file, &child, row_group, row_ranges, 
ctz, io_ctx,
                                        child_reader, max_buf_size, 
col_offsets, in_collection,
                                        column_ids, filter_column_ids));
-                //                child_reader->set_nested_column();
                 child_readers[child.name] = std::move(child_reader);
                 // Record the first non-SkippingReader
                 if (non_skip_reader_idx == -1) {
@@ -180,6 +179,7 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, 
FieldSchema* field,
                 skip_reader->_filter_column_ids = filter_column_ids;
                 child_readers[child.name] = std::move(skip_reader);
             }
+            child_readers[child.name]->set_column_in_nested();
         }
         // If all children are SkipReadingReader, force the first child to 
call create
         if (non_skip_reader_idx == -1) {
@@ -187,7 +187,7 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, 
FieldSchema* field,
             RETURN_IF_ERROR(create(file, &field->children[0], row_group, 
row_ranges, ctz, io_ctx,
                                    child_reader, max_buf_size, col_offsets, 
in_collection,
                                    column_ids, filter_column_ids));
-            //            child_reader->set_nested_column();
+            child_reader->set_column_in_nested();
             child_readers[field->children[0].name] = std::move(child_reader);
         }
         auto struct_reader = StructColumnReader::create_unique(row_ranges, 
total_rows, ctz, io_ctx);
@@ -201,8 +201,6 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, 
FieldSchema* field,
                                                                       : 
nullptr;
 
         const tparquet::ColumnChunk& chunk = row_group.columns[physical_index];
-
-        //                ScalarColumnReader::create_unique(row_ranges, 
total_rows, chunk, offset_index, ctz, io_ctx);
         if (in_collection) {
             if (offset_index == nullptr) {
                 auto scalar_reader = ScalarColumnReader<true, 
false>::create_unique(
@@ -354,9 +352,6 @@ Status ScalarColumnReader<IN_COLLECTION, 
OFFSET_INDEX>::_read_values(size_t num_
                     return Status::InternalError("Failed to decode definition 
level.");
                 }
 
-                for (int i = 0; i < loop_read; i++) {
-                    _def_levels.emplace_back(def_level);
-                }
                 bool is_null = def_level < _field_schema->definition_level;
                 if (!(prev_is_null ^ is_null)) {
                     null_map.emplace_back(0);
@@ -371,14 +366,11 @@ Status ScalarColumnReader<IN_COLLECTION, 
OFFSET_INDEX>::_read_values(size_t num_
                 prev_is_null = is_null;
                 has_read += loop_read;
             }
-        } else {
-            _def_levels.resize(_def_levels.size() + num_values, 0);
         }
     } else {
         if (_chunk_reader->max_def_level() > 0) {
             return Status::Corruption("Not nullable column has null values in 
parquet file");
         }
-        _def_levels.resize(_def_levels.size() + num_values, 0);
         data_column = doris_column->assume_mutable();
     }
     if (null_map.size() == 0) {
@@ -560,7 +552,7 @@ Status ScalarColumnReader<IN_COLLECTION, 
OFFSET_INDEX>::read_column_data(
     _rep_levels.clear();
     *read_rows = 0;
 
-    if constexpr (IN_COLLECTION) {
+    if (_in_nested) {
         RETURN_IF_ERROR(_read_nested_column(resolved_column, resolved_type, 
filter_map, batch_size,
                                             read_rows, eof, is_dict_filter));
         return _converter->convert(resolved_column, _field_schema->data_type, 
type, doris_column,
@@ -574,7 +566,6 @@ Status ScalarColumnReader<IN_COLLECTION, 
OFFSET_INDEX>::read_column_data(
     } else {
         right_row = _chunk_reader->page_end_row();
     }
-    auto before_filter_map_index = _filter_map_index;
 
     do {
         // generate the row ranges that should be read
@@ -641,18 +632,6 @@ Status ScalarColumnReader<IN_COLLECTION, 
OFFSET_INDEX>::read_column_data(
         }
     }
 
-    if (filter_map.has_filter()) {
-        size_t new_rep_sz = 0;
-        for (size_t idx = before_filter_map_index; idx < _filter_map_index; 
idx++) {
-            if (filter_map.filter_map_data()[idx]) {
-                _def_levels[new_rep_sz] = _def_levels[idx - 
before_filter_map_index];
-                new_rep_sz++;
-            }
-        }
-        _def_levels.resize(new_rep_sz);
-    }
-    _rep_levels.resize(_def_levels.size(), 0);
-
     return _converter->convert(resolved_column, _field_schema->data_type, 
type, doris_column,
                                is_dict_filter);
 }
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index 4a49473a69f..1359c391336 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -141,6 +141,7 @@ public:
     virtual void reset_filter_map_index() = 0;
 
     FieldSchema* get_field_schema() const { return _field_schema; }
+    void set_column_in_nested() { _in_nested = true; }
 
 protected:
     void _generate_read_ranges(RowRange page_row_range, RowRanges* 
result_ranges) const;
@@ -155,6 +156,10 @@ protected:
 
     size_t _filter_map_index = 0;
     std::set<uint64_t> _filter_column_ids;
+
+    // _in_nested: column in struct/map/array
+    // IN_COLLECTION : column in map/array
+    bool _in_nested = false;
 };
 
 template <bool IN_COLLECTION, bool OFFSET_INDEX>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to