This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 028a1f79eb9 Backport 63102 branch 4.1 (#63740)
028a1f79eb9 is described below
commit 028a1f79eb985bd1f29617a0683dfc9da49bb004
Author: Gabriel <[email protected]>
AuthorDate: Fri May 29 14:42:09 2026 +0800
Backport 63102 branch 4.1 (#63740)
### What problem does this PR solve?
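Issue Number: none specified
Related PR: #63102 (this commit backports it to branch-4.1)
Problem Summary:
Backport of #63102 to branch-4.1. Hardens column lookups in the Parquet
RowGroupReader (be/src/format/parquet/vparquet_group_reader.{cpp,h}):
- Replace unchecked `(*_col_name_to_block_idx)[name]` lookups with a new
  helper, `_get_block_column_pos()`. It returns an InternalError status when
  the name-to-index map is not set, the column name is not in the map, or the
  mapped position is not smaller than `block.columns()`.
- Replace `_column_readers[name]` with `find()` plus a null check. A missing
  reader now returns an InternalError status instead of being dereferenced.
- `_convert_dict_cols_to_string_cols()` now reports an unknown column by
  returning an error status instead of throwing an exception.
- Store `_read_table_columns` and `_lazy_read_ctx` by value instead of by
  reference.
- Declare `_column_readers` after `_read_ranges`, because ParquetColumnReader
  keeps a reference to `_read_ranges` and must be destroyed first.
- Simplify the destructor: it no longer clears `_column_readers` explicitly,
  and it null-checks `_obj_pool` before calling `clear()`.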
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/format/parquet/vparquet_group_reader.cpp | 124 ++++++++++++++----------
be/src/format/parquet/vparquet_group_reader.h | 11 ++-
2 files changed, 78 insertions(+), 57 deletions(-)
diff --git a/be/src/format/parquet/vparquet_group_reader.cpp
b/be/src/format/parquet/vparquet_group_reader.cpp
index 48375a764f4..d7f822f77d0 100644
--- a/be/src/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/format/parquet/vparquet_group_reader.cpp
@@ -162,8 +162,9 @@ RowGroupReader::RowGroupReader(io::FileReaderSPtr
file_reader,
_filter_column_ids(filter_column_ids) {}
RowGroupReader::~RowGroupReader() {
- _column_readers.clear();
- _obj_pool->clear();
+ if (_obj_pool != nullptr) {
+ _obj_pool->clear();
+ }
}
Status RowGroupReader::init(
@@ -493,31 +494,31 @@ Status RowGroupReader::_read_column_data(Block* block,
size_t batch_read_rows = 0;
bool has_eof = false;
for (auto& read_col_name : table_columns) {
- auto& column_with_type_and_name =
-
block->safe_get_by_position((*_col_name_to_block_idx)[read_col_name]);
+ uint32_t block_pos = 0;
+ RETURN_IF_ERROR(_get_block_column_pos(*block, read_col_name,
&block_pos));
+ auto reader_iter = _column_readers.find(read_col_name);
+ if (reader_iter == _column_readers.end() || reader_iter->second ==
nullptr) {
+ return Status::InternalError("Column reader for '{}' not found in
parquet row group",
+ read_col_name);
+ }
+
+ auto& column_with_type_and_name =
block->safe_get_by_position(block_pos);
auto& column_ptr = column_with_type_and_name.column;
auto& column_type = column_with_type_and_name.type;
bool is_dict_filter = false;
for (auto& _dict_filter_col : _dict_filter_cols) {
if (_dict_filter_col.first == read_col_name) {
MutableColumnPtr dict_column = ColumnInt32::create();
- if (!_col_name_to_block_idx->contains(read_col_name)) {
- return Status::InternalError(
- "Wrong read column '{}' in parquet file, block:
{}", read_col_name,
- block->dump_structure());
- }
if (column_type->is_nullable()) {
-
block->get_by_position((*_col_name_to_block_idx)[read_col_name]).type =
+ block->get_by_position(block_pos).type =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
block->replace_by_position(
- (*_col_name_to_block_idx)[read_col_name],
+ block_pos,
ColumnNullable::create(std::move(dict_column),
ColumnUInt8::create(dict_column->size(), 0)));
} else {
-
block->get_by_position((*_col_name_to_block_idx)[read_col_name]).type =
- std::make_shared<DataTypeInt32>();
-
block->replace_by_position((*_col_name_to_block_idx)[read_col_name],
- std::move(dict_column));
+ block->get_by_position(block_pos).type =
std::make_shared<DataTypeInt32>();
+ block->replace_by_position(block_pos,
std::move(dict_column));
}
is_dict_filter = true;
break;
@@ -528,10 +529,10 @@ Status RowGroupReader::_read_column_data(Block* block,
bool col_eof = false;
// Should reset _filter_map_index to 0 when reading next column.
// select_vector.reset();
- _column_readers[read_col_name]->reset_filter_map_index();
+ reader_iter->second->reset_filter_map_index();
while (!col_eof && col_read_rows < batch_size) {
size_t loop_rows = 0;
- RETURN_IF_ERROR(_column_readers[read_col_name]->read_column_data(
+ RETURN_IF_ERROR(reader_iter->second->read_column_data(
column_ptr, column_type,
_table_info_node_ptr->get_children_node(read_col_name),
filter_map, batch_size - col_read_rows, &loop_rows,
&col_eof, is_dict_filter));
VLOG_DEBUG << "[RowGroupReader] column '" << read_col_name
@@ -657,19 +658,19 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t
batch_size, size_t* re
SCOPED_RAW_TIMER(&_predicate_filter_time);
for (const auto& col : _lazy_read_ctx.predicate_columns.first)
{
// clean block to read predicate columns
- block->get_by_position((*_col_name_to_block_idx)[col])
- .column->assume_mutable()
- ->clear();
+ uint32_t block_pos = 0;
+ RETURN_IF_ERROR(_get_block_column_pos(*block, col,
&block_pos));
+
block->get_by_position(block_pos).column->assume_mutable()->clear();
}
for (const auto& col :
_lazy_read_ctx.predicate_partition_columns) {
-
block->get_by_position((*_col_name_to_block_idx)[col.first])
- .column->assume_mutable()
- ->clear();
+ uint32_t block_pos = 0;
+ RETURN_IF_ERROR(_get_block_column_pos(*block, col.first,
&block_pos));
+
block->get_by_position(block_pos).column->assume_mutable()->clear();
}
for (const auto& col :
_lazy_read_ctx.predicate_missing_columns) {
-
block->get_by_position((*_col_name_to_block_idx)[col.first])
- .column->assume_mutable()
- ->clear();
+ uint32_t block_pos = 0;
+ RETURN_IF_ERROR(_get_block_column_pos(*block, col.first,
&block_pos));
+
block->get_by_position(block_pos).column->assume_mutable()->clear();
}
if (_row_id_column_iterator_pair.first != nullptr) {
block->get_by_position(_row_id_column_iterator_pair.second)
@@ -828,7 +829,9 @@ Status RowGroupReader::_fill_partition_columns(
partition_columns) {
DataTypeSerDe::FormatOptions _text_formatOptions;
for (const auto& kv : partition_columns) {
- auto doris_column =
block->get_by_position((*_col_name_to_block_idx)[kv.first]).column;
+ uint32_t block_pos = 0;
+ RETURN_IF_ERROR(_get_block_column_pos(*block, kv.first, &block_pos));
+ auto doris_column = block->get_by_position(block_pos).column;
// obtained from block*, it is a mutable object.
auto* col_ptr = const_cast<IColumn*>(doris_column.get());
const auto& [value, slot_desc] = kv.second;
@@ -857,14 +860,11 @@ Status RowGroupReader::_fill_missing_columns(
Block* block, size_t rows,
const std::unordered_map<std::string, VExprContextSPtr>&
missing_columns) {
for (const auto& kv : missing_columns) {
- if (!_col_name_to_block_idx->contains(kv.first)) {
- return Status::InternalError("Missing column: {} not found in
block {}", kv.first,
- block->dump_structure());
- }
+ uint32_t block_pos = 0;
+ RETURN_IF_ERROR(_get_block_column_pos(*block, kv.first, &block_pos));
if (kv.second == nullptr) {
// no default column, fill with null
- auto mutable_column =
block->get_by_position((*_col_name_to_block_idx)[kv.first])
- .column->assume_mutable();
+ auto mutable_column =
block->get_by_position(block_pos).column->assume_mutable();
auto* nullable_column =
assert_cast<ColumnNullable*>(mutable_column.get());
nullable_column->insert_many_defaults(rows);
} else {
@@ -881,18 +881,39 @@ Status RowGroupReader::_fill_missing_columns(
mutable_column->resize(rows);
// result_column_ptr maybe a ColumnConst, convert it to a
normal column
result_column_ptr =
result_column_ptr->convert_to_full_column_if_const();
- auto origin_column_type =
-
block->get_by_position((*_col_name_to_block_idx)[kv.first]).type;
+ auto origin_column_type =
block->get_by_position(block_pos).type;
bool is_nullable = origin_column_type->is_nullable();
- block->replace_by_position(
- (*_col_name_to_block_idx)[kv.first],
- is_nullable ? make_nullable(result_column_ptr) :
result_column_ptr);
+ block->replace_by_position(block_pos, is_nullable ?
make_nullable(result_column_ptr)
+ :
result_column_ptr);
}
}
}
return Status::OK();
}
+Status RowGroupReader::_get_block_column_pos(const Block& block, const
std::string& column_name,
+ uint32_t* position) const {
+ if (_col_name_to_block_idx == nullptr) {
+ return Status::InternalError(
+ "Column name to block index map is not set when reading
parquet column '{}', "
+ "block: "
+ "{}",
+ column_name, block.dump_structure());
+ }
+ auto iter = _col_name_to_block_idx->find(column_name);
+ if (iter == _col_name_to_block_idx->end()) {
+ return Status::InternalError("Column '{}' not found in block index
map, block: {}",
+ column_name, block.dump_structure());
+ }
+ if (iter->second >= block.columns()) {
+ return Status::InternalError(
+ "Column '{}' maps to invalid block position {}, block columns:
{}, block: {}",
+ column_name, iter->second, block.columns(),
block.dump_structure());
+ }
+ *position = iter->second;
+ return Status::OK();
+}
+
Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows,
bool* batch_eof,
bool* modify_row_ids) {
*modify_row_ids = false;
@@ -1336,13 +1357,14 @@ Status
RowGroupReader::_rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes,
Status RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) {
for (auto& dict_filter_cols : _dict_filter_cols) {
- if (!_col_name_to_block_idx->contains(dict_filter_cols.first)) {
- throw Exception(ErrorCode::INTERNAL_ERROR,
- "Wrong read column '{}' in parquet file, block:
{}",
- dict_filter_cols.first, block->dump_structure());
+ uint32_t block_pos = 0;
+ RETURN_IF_ERROR(_get_block_column_pos(*block, dict_filter_cols.first,
&block_pos));
+ auto reader_iter = _column_readers.find(dict_filter_cols.first);
+ if (reader_iter == _column_readers.end() || reader_iter->second ==
nullptr) {
+ return Status::InternalError("Column reader for '{}' not found in
parquet row group",
+ dict_filter_cols.first);
}
- ColumnWithTypeAndName& column_with_type_and_name =
-
block->get_by_position((*_col_name_to_block_idx)[dict_filter_cols.first]);
+ ColumnWithTypeAndName& column_with_type_and_name =
block->get_by_position(block_pos);
const ColumnPtr& column = column_with_type_and_name.column;
if (const auto* nullable_column =
check_and_get_column<ColumnNullable>(*column)) {
const ColumnPtr& nested_column =
nullable_column->get_nested_column_ptr();
@@ -1350,24 +1372,20 @@ Status
RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) {
DCHECK(dict_column);
auto string_column = DORIS_TRY(
-
_column_readers[dict_filter_cols.first]->convert_dict_column_to_string_column(
- dict_column));
+
reader_iter->second->convert_dict_column_to_string_column(dict_column));
column_with_type_and_name.type =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
block->replace_by_position(
- (*_col_name_to_block_idx)[dict_filter_cols.first],
- ColumnNullable::create(std::move(string_column),
-
nullable_column->get_null_map_column_ptr()));
+ block_pos, ColumnNullable::create(std::move(string_column),
+
nullable_column->get_null_map_column_ptr()));
} else {
const auto* dict_column = assert_cast<const
ColumnInt32*>(column.get());
auto string_column = DORIS_TRY(
-
_column_readers[dict_filter_cols.first]->convert_dict_column_to_string_column(
- dict_column));
+
reader_iter->second->convert_dict_column_to_string_column(dict_column));
column_with_type_and_name.type =
std::make_shared<DataTypeString>();
-
block->replace_by_position((*_col_name_to_block_idx)[dict_filter_cols.first],
- std::move(string_column));
+ block->replace_by_position(block_pos, std::move(string_column));
}
}
return Status::OK();
diff --git a/be/src/format/parquet/vparquet_group_reader.h
b/be/src/format/parquet/vparquet_group_reader.h
index 657876b36e6..208d3995b90 100644
--- a/be/src/format/parquet/vparquet_group_reader.h
+++ b/be/src/format/parquet/vparquet_group_reader.h
@@ -233,6 +233,8 @@ private:
Status _fill_missing_columns(
Block* block, size_t rows,
const std::unordered_map<std::string, VExprContextSPtr>&
missing_columns);
+ Status _get_block_column_pos(const Block& block, const std::string&
column_name,
+ uint32_t* position) const;
Status _build_pos_delete_filter(size_t read_rows);
Status _filter_block(Block* block, int column_to_keep,
const std::vector<uint32_t>& columns_to_filter);
@@ -250,9 +252,7 @@ private:
Status _append_iceberg_rowid_column(Block* block, size_t read_rows, bool
is_current_row_ids);
io::FileReaderSPtr _file_reader;
- std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>>
- _column_readers; // table_column_name
- const std::vector<std::string>& _read_table_columns;
+ std::vector<std::string> _read_table_columns;
const int32_t _row_group_id;
const tparquet::RowGroup& _row_group_meta;
@@ -263,8 +263,11 @@ private:
std::shared_ptr<RowLineageColumns> _row_lineage_columns;
// merge the row ranges generated from page index and position delete.
RowRanges _read_ranges;
+ // ParquetColumnReader keeps a reference to _read_ranges, so readers must
be destroyed first.
+ std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>>
+ _column_readers; // table_column_name
- const LazyReadContext& _lazy_read_ctx;
+ LazyReadContext _lazy_read_ctx;
int64_t _lazy_read_filtered_rows = 0;
int64_t _predicate_filter_time = 0;
int64_t _dict_filter_rewrite_time = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]