This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 52c75e79197 [fix](parquet) Fixed a performance regression caused by filling RL/DL when reading Parquet scalar columns. (#59833)
52c75e79197 is described below
commit 52c75e7919725e567b099dffb90fcd1487ca6259
Author: daidai <[email protected]>
AuthorDate: Fri Jan 16 14:49:13 2026 +0800
[fix](parquet) Fixed a performance regression caused by filling RL/DL when reading Parquet scalar columns. (#59833)
### What problem does this PR solve?
Related PR: #58785
Problem Summary:
The performance regression was introduced by #58785. Prior to #58785, reading plain scalar columns did not require filling and saving the repetition levels (RL) and definition levels (DL). #58785 merged the logic for reading scalar columns nested inside a struct with the logic for reading top-level scalar columns, filling and saving the RL/DL in both cases so that the struct's null map could be populated. This PR re-separates the two reading paths.
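For intuition, a minimal sketch of the two paths (the helper names here are hypothetical, not the Doris implementation): a top-level scalar column can fold its definition levels directly into its null map, whereas a column nested in a struct must also retain the levels so the parent reader can build the struct's null map.

```cpp
#include <cstdint>
#include <vector>

using level_t = int16_t;

// Flat path: fold decoded definition levels straight into the null map.
// No per-value RL/DL buffers are retained; this is the behavior the PR restores.
void read_flat_column(const std::vector<level_t>& def_levels, level_t max_def_level,
                      std::vector<uint8_t>& null_map) {
    for (level_t def : def_levels) {
        null_map.push_back(def < max_def_level ? 1 : 0);
    }
}

// Struct-member path: the parent needs the raw definition levels to decide
// which struct rows are null, so the child saves them in addition to its own
// null map. Applying this extra fill/save to flat columns was the overhead
// introduced by #58785.
void read_struct_member_column(const std::vector<level_t>& def_levels, level_t max_def_level,
                               std::vector<uint8_t>& null_map,
                               std::vector<level_t>& saved_def_levels) {
    for (level_t def : def_levels) {
        saved_def_levels.push_back(def);
        null_map.push_back(def < max_def_level ? 1 : 0);
    }
}
```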
---
.../parquet/vparquet_column_chunk_reader.cpp | 14 ++++-----
.../format/parquet/vparquet_column_chunk_reader.h | 13 +++++++++
.../exec/format/parquet/vparquet_column_reader.cpp | 33 ++++------------------
.../exec/format/parquet/vparquet_column_reader.h | 5 ++++
4 files changed, 31 insertions(+), 34 deletions(-)
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index a474b76a518..e5c81691558 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -361,7 +361,7 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::seek_to_nested_row(size_t
} else {
while (true) {
RETURN_IF_ERROR(parse_page_header());
- if (_page_reader->is_header_v2()) {
+ if (_page_reader->is_header_v2() || !IN_COLLECTION) {
if (_page_reader->start_row() <= left_row && left_row < _page_reader->end_row()) {
RETURN_IF_ERROR(load_page_data());
// this page contain this row.
@@ -447,11 +447,11 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::load_page_nested_rows(
*result_rows = 0;
rep_levels.reserve(rep_levels.size() + _remaining_rep_nums);
while (_remaining_rep_nums) {
- level_t rep_level = _rep_level_decoder.get_next();
+ level_t rep_level = _rep_level_get_next();
if (rep_level == 0) { // rep_level 0 indicates start of new row
if (*result_rows == max_rows) { // this page contain max_rows, page no end.
_current_row += max_rows;
- _rep_level_decoder.rewind_one();
+ _rep_level_rewind_one();
return Status::OK();
}
(*result_rows)++;
@@ -462,8 +462,8 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::load_page_nested_rows(
_current_row += *result_rows;
auto need_check_cross_page = [&]() -> bool {
- return !OFFSET_INDEX && _remaining_rep_nums == 0 && !_page_reader->is_header_v2() &&
- has_next_page();
+ return !OFFSET_INDEX && IN_COLLECTION && _remaining_rep_nums == 0 &&
+ !_page_reader->is_header_v2() && has_next_page();
};
*cross_page = need_check_cross_page();
return Status::OK();
@@ -478,10 +478,10 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::load_cross_page_nested_ro
*cross_page = has_next_page();
while (_remaining_rep_nums) {
- level_t rep_level = _rep_level_decoder.get_next();
+ level_t rep_level = _rep_level_get_next();
if (rep_level == 0) { // rep_level 0 indicates start of new row
*cross_page = false;
- _rep_level_decoder.rewind_one();
+ _rep_level_rewind_one();
break;
}
_remaining_rep_nums--;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index 1270e5e37fc..9e77a3139f6 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -206,6 +206,19 @@ private:
void _get_uncompressed_levels(const tparquet::DataPageHeaderV2& page_v2,
Slice& page_data);
Status _skip_nested_rows_in_page(size_t num_rows);
+ level_t _rep_level_get_next() {
+     if constexpr (IN_COLLECTION) {
+         return _rep_level_decoder.get_next();
+     }
+     return 0;
+ }
+
+ void _rep_level_rewind_one() {
+     if constexpr (IN_COLLECTION) {
+         _rep_level_decoder.rewind_one();
+     }
+ }
+
ColumnChunkReaderState _state = NOT_INIT;
FieldSchema* _field_schema = nullptr;
const level_t _max_rep_level;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index dac6b76aece..cc6b03a5981 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -118,8 +118,8 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz, io_ctx,
element_reader, max_buf_size, col_offsets, true, column_ids,
filter_column_ids));
- // element_reader->set_nested_column();
auto array_reader = ArrayColumnReader::create_unique(row_ranges, total_rows, ctz, io_ctx);
+ element_reader->set_column_in_nested();
RETURN_IF_ERROR(array_reader->init(std::move(element_reader), field));
array_reader->_filter_column_ids = filter_column_ids;
reader.reset(array_reader.release());
@@ -133,7 +133,6 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz, io_ctx,
key_reader, max_buf_size, col_offsets, true, column_ids,
filter_column_ids));
- // key_reader->set_nested_column();
} else {
auto skip_reader = std::make_unique<SkipReadingReader>(row_ranges, total_rows, ctz, io_ctx,
&field->children[0]);
@@ -146,7 +145,6 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
RETURN_IF_ERROR(create(file, &field->children[1], row_group, row_ranges, ctz, io_ctx,
value_reader, max_buf_size, col_offsets, true, column_ids,
filter_column_ids));
- // value_reader->set_nested_column();
} else {
auto skip_reader = std::make_unique<SkipReadingReader>(row_ranges, total_rows, ctz, io_ctx,
&field->children[0]);
@@ -154,6 +152,8 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
}
auto map_reader = MapColumnReader::create_unique(row_ranges, total_rows, ctz, io_ctx);
+ key_reader->set_column_in_nested();
+ value_reader->set_column_in_nested();
RETURN_IF_ERROR(map_reader->init(std::move(key_reader), std::move(value_reader), field));
map_reader->_filter_column_ids = filter_column_ids;
reader.reset(map_reader.release());
@@ -168,7 +168,6 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
RETURN_IF_ERROR(create(file, &child, row_group, row_ranges, ctz, io_ctx,
child_reader, max_buf_size, col_offsets, in_collection,
column_ids, filter_column_ids));
- // child_reader->set_nested_column();
child_readers[child.name] = std::move(child_reader);
// Record the first non-SkippingReader
if (non_skip_reader_idx == -1) {
@@ -180,6 +179,7 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
skip_reader->_filter_column_ids = filter_column_ids;
child_readers[child.name] = std::move(skip_reader);
}
+ child_readers[child.name]->set_column_in_nested();
}
// If all children are SkipReadingReader, force the first child to call create
if (non_skip_reader_idx == -1) {
@@ -187,7 +187,7 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz, io_ctx,
child_reader, max_buf_size, col_offsets, in_collection,
column_ids, filter_column_ids));
- // child_reader->set_nested_column();
+ child_reader->set_column_in_nested();
child_readers[field->children[0].name] = std::move(child_reader);
}
auto struct_reader = StructColumnReader::create_unique(row_ranges, total_rows, ctz, io_ctx);
@@ -201,8 +201,6 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
: nullptr;
const tparquet::ColumnChunk& chunk = row_group.columns[physical_index];
-
- // ScalarColumnReader::create_unique(row_ranges, total_rows, chunk, offset_index, ctz, io_ctx);
if (in_collection) {
if (offset_index == nullptr) {
auto scalar_reader = ScalarColumnReader<true, false>::create_unique(
@@ -354,9 +352,6 @@ Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::_read_values(size_t num_
return Status::InternalError("Failed to decode definition level.");
}
- for (int i = 0; i < loop_read; i++) {
- _def_levels.emplace_back(def_level);
- }
bool is_null = def_level < _field_schema->definition_level;
if (!(prev_is_null ^ is_null)) {
null_map.emplace_back(0);
@@ -371,14 +366,11 @@ Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::_read_values(size_t num_
prev_is_null = is_null;
has_read += loop_read;
}
- } else {
- _def_levels.resize(_def_levels.size() + num_values, 0);
}
} else {
if (_chunk_reader->max_def_level() > 0) {
return Status::Corruption("Not nullable column has null values in parquet file");
}
- _def_levels.resize(_def_levels.size() + num_values, 0);
data_column = doris_column->assume_mutable();
}
if (null_map.size() == 0) {
@@ -560,7 +552,7 @@ Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::read_column_data(
_rep_levels.clear();
*read_rows = 0;
- if constexpr (IN_COLLECTION) {
+ if (_in_nested) {
RETURN_IF_ERROR(_read_nested_column(resolved_column, resolved_type, filter_map, batch_size,
read_rows, eof, is_dict_filter));
return _converter->convert(resolved_column, _field_schema->data_type, type, doris_column,
is_dict_filter);
@@ -574,7 +566,6 @@ Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::read_column_data(
} else {
right_row = _chunk_reader->page_end_row();
}
- auto before_filter_map_index = _filter_map_index;
do {
// generate the row ranges that should be read
@@ -641,18 +632,6 @@ Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::read_column_data(
}
}
- if (filter_map.has_filter()) {
- size_t new_rep_sz = 0;
- for (size_t idx = before_filter_map_index; idx < _filter_map_index; idx++) {
- if (filter_map.filter_map_data()[idx]) {
- _def_levels[new_rep_sz] = _def_levels[idx -
before_filter_map_index];
- new_rep_sz++;
- }
- }
- _def_levels.resize(new_rep_sz);
- }
- _rep_levels.resize(_def_levels.size(), 0);
-
return _converter->convert(resolved_column, _field_schema->data_type, type, doris_column,
is_dict_filter);
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index 4a49473a69f..1359c391336 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -141,6 +141,7 @@ public:
virtual void reset_filter_map_index() = 0;
FieldSchema* get_field_schema() const { return _field_schema; }
+ void set_column_in_nested() { _in_nested = true; }
protected:
void _generate_read_ranges(RowRange page_row_range, RowRanges* result_ranges) const;
@@ -155,6 +156,10 @@ protected:
size_t _filter_map_index = 0;
std::set<uint64_t> _filter_column_ids;
+
+ // _in_nested: column in struct/map/array
+ // IN_COLLECTION : column in map/array
+ bool _in_nested = false;
};
template <bool IN_COLLECTION, bool OFFSET_INDEX>
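A note for readers skimming the diff: the new runtime flag `_in_nested` (set via `set_column_in_nested()`) is deliberately broader than the compile-time `IN_COLLECTION` template parameter, and the `if constexpr (IN_COLLECTION)` wrappers above let the compiler discard rep-level decoding entirely for flat instantiations. A hedged sketch of how the two flags relate, using hypothetical names not taken from the Doris code:

```cpp
// Sketch only: a struct member is "nested" but not "in a collection",
// because a struct contributes definition levels without repetition levels;
// map/array members need both.
enum class ParentKind { kTopLevel, kStruct, kArray, kMap };

struct ReaderFlags {
    bool in_nested;     // parent is struct/map/array -> keep DL for the parent
    bool in_collection; // parent is map/array -> RL must be decoded as well
};

constexpr ReaderFlags flags_for(ParentKind parent) {
    return {parent != ParentKind::kTopLevel,
            parent == ParentKind::kArray || parent == ParentKind::kMap};
}

static_assert(flags_for(ParentKind::kStruct).in_nested &&
              !flags_for(ParentKind::kStruct).in_collection);
static_assert(flags_for(ParentKind::kArray).in_nested &&
              flags_for(ParentKind::kArray).in_collection);
```

This is why `read_column_data` now branches on `if (_in_nested)` at runtime rather than `if constexpr (IN_COLLECTION)`: a struct member instantiated with `IN_COLLECTION == false` must still take the nested-read path.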
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]