This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 9f166d16bd1 branch-3.1: [Opt](multi-catalog)Disable dict filter in
parquet/orc reader if have non-single conjuncts #44777 (#52559)
9f166d16bd1 is described below
commit 9f166d16bd1b75dc535e4ec679080a0ff008a569
Author: Qi Chen <[email protected]>
AuthorDate: Tue Jul 1 19:15:27 2025 +0800
branch-3.1: [Opt](multi-catalog)Disable dict filter in parquet/orc reader
if have non-single conjuncts #44777 (#52559)
Cherry-picked from #44777
---
be/src/vec/exec/format/orc/vorc_reader.cpp | 57 +++++--------
be/src/vec/exec/format/orc/vorc_reader.h | 1 +
.../exec/format/parquet/vparquet_group_reader.cpp | 98 +++++++++-------------
.../exec/format/parquet/vparquet_group_reader.h | 1 -
4 files changed, 64 insertions(+), 93 deletions(-)
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 14784da43a6..082db47f902 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -1228,18 +1228,22 @@ Status OrcReader::set_fill_columns(
}
}
- if (!_slot_id_to_filter_conjuncts) {
- return Status::OK();
- }
-
- // Add predicate_partition_columns in _slot_id_to_filter_conjuncts(single
slot conjuncts)
- // to _filter_conjuncts, others should be added from
not_single_slot_filter_conjuncts.
- for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
- auto& [value, slot_desc] = kv.second;
- auto iter = _slot_id_to_filter_conjuncts->find(slot_desc->id());
- if (iter != _slot_id_to_filter_conjuncts->end()) {
- for (auto& ctx : iter->second) {
- _filter_conjuncts.push_back(ctx);
+ if (!_not_single_slot_filter_conjuncts.empty()) {
+ _filter_conjuncts.insert(_filter_conjuncts.end(),
_not_single_slot_filter_conjuncts.begin(),
+ _not_single_slot_filter_conjuncts.end());
+ _disable_dict_filter = true;
+ }
+
+ if (_slot_id_to_filter_conjuncts &&
!_slot_id_to_filter_conjuncts->empty()) {
+ // Add predicate_partition_columns in
_slot_id_to_filter_conjuncts(single slot conjuncts)
+ // to _filter_conjuncts, others should be added from
not_single_slot_filter_conjuncts.
+ for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
+ auto& [value, slot_desc] = kv.second;
+ auto iter = _slot_id_to_filter_conjuncts->find(slot_desc->id());
+ if (iter != _slot_id_to_filter_conjuncts->end()) {
+ for (const auto& ctx : iter->second) {
+ _filter_conjuncts.push_back(ctx);
+ }
}
}
}
@@ -1996,16 +2000,8 @@ Status OrcReader::get_next_block_impl(Block* block,
size_t* read_rows, bool* eof
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter,
*_filter));
}
- if (!_not_single_slot_filter_conjuncts.empty()) {
- RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block,
&batch_vec));
- RETURN_IF_CATCH_EXCEPTION(
-
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
- _not_single_slot_filter_conjuncts, block,
columns_to_filter,
- column_to_keep)));
- } else {
- Block::erase_useless_column(block, column_to_keep);
- RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block,
&batch_vec));
- }
+ Block::erase_useless_column(block, column_to_keep);
+ RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
*read_rows = block->rows();
} else {
uint64_t rr;
@@ -2122,17 +2118,8 @@ Status OrcReader::get_next_block_impl(Block* block,
size_t* read_rows, bool* eof
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter,
result_filter));
}
- //_not_single_slot_filter_conjuncts check : missing column1 ==
missing column2 , missing column == exists column ...
- if (!_not_single_slot_filter_conjuncts.empty()) {
- RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block,
&batch_vec));
- RETURN_IF_CATCH_EXCEPTION(
-
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
- _not_single_slot_filter_conjuncts, block,
columns_to_filter,
- column_to_keep)));
- } else {
- Block::erase_useless_column(block, column_to_keep);
- RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block,
&batch_vec));
- }
+ Block::erase_useless_column(block, column_to_keep);
+ RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block,
&batch_vec));
} else {
if (_delete_rows_filter_ptr) {
_execute_filter_position_delete_rowids(*_delete_rows_filter_ptr);
@@ -2313,8 +2300,8 @@ Status OrcReader::fill_dict_filter_column_names(
int i = 0;
for (auto& predicate_col_name : predicate_col_names) {
int slot_id = predicate_col_slot_ids[i];
- if (_can_filter_by_dict(slot_id)) {
- _dict_filter_cols.emplace_back(std::make_pair(predicate_col_name,
slot_id));
+ if (!_disable_dict_filter && _can_filter_by_dict(slot_id)) {
+ _dict_filter_cols.emplace_back(predicate_col_name, slot_id);
column_names.emplace_back(_col_name_to_file_col_name[predicate_col_name]);
} else {
if (_slot_id_to_filter_conjuncts->find(slot_id) !=
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h
b/be/src/vec/exec/format/orc/vorc_reader.h
index 10d72844942..32a19a2012a 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -636,6 +636,7 @@ private:
VExprContextSPtrs _dict_filter_conjuncts;
VExprContextSPtrs _non_dict_filter_conjuncts;
VExprContextSPtrs _filter_conjuncts;
+ bool _disable_dict_filter = false;
// std::pair<col_name, slot_id>
std::vector<std::pair<std::string, int>> _dict_filter_cols;
std::shared_ptr<ObjectPool> _obj_pool;
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 5c5489d3f86..00beb59e7dc 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -109,11 +109,6 @@ Status RowGroupReader::init(
_tuple_descriptor = tuple_descriptor;
_row_descriptor = row_descriptor;
_col_name_to_slot_id = colname_to_slot_id;
- if (not_single_slot_filter_conjuncts != nullptr &&
!not_single_slot_filter_conjuncts->empty()) {
-
_not_single_slot_filter_conjuncts.insert(_not_single_slot_filter_conjuncts.end(),
-
not_single_slot_filter_conjuncts->begin(),
-
not_single_slot_filter_conjuncts->end());
- }
_slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts;
_merge_read_ranges(row_ranges);
if (_read_columns.empty()) {
@@ -140,45 +135,52 @@ Status RowGroupReader::init(
}
_column_readers[read_col] = std::move(reader);
}
- // Check if single slot can be filtered by dict.
- if (!_slot_id_to_filter_conjuncts) {
- return Status::OK();
+
+ bool disable_dict_filter = false;
+ if (not_single_slot_filter_conjuncts != nullptr &&
!not_single_slot_filter_conjuncts->empty()) {
+ disable_dict_filter = true;
+ _filter_conjuncts.insert(_filter_conjuncts.end(),
not_single_slot_filter_conjuncts->begin(),
+ not_single_slot_filter_conjuncts->end());
}
- const std::vector<string>& predicate_col_names =
_lazy_read_ctx.predicate_columns.first;
- const std::vector<int>& predicate_col_slot_ids =
_lazy_read_ctx.predicate_columns.second;
- for (size_t i = 0; i < predicate_col_names.size(); ++i) {
- const string& predicate_col_name = predicate_col_names[i];
- int slot_id = predicate_col_slot_ids[i];
- auto field =
const_cast<FieldSchema*>(schema.get_column(predicate_col_name));
- if (!_lazy_read_ctx.has_complex_type &&
- _can_filter_by_dict(slot_id,
-
_row_group_meta.columns[field->physical_column_index].meta_data)) {
- _dict_filter_cols.emplace_back(std::make_pair(predicate_col_name,
slot_id));
- } else {
- if (_slot_id_to_filter_conjuncts->find(slot_id) !=
- _slot_id_to_filter_conjuncts->end()) {
- for (auto& ctx : _slot_id_to_filter_conjuncts->at(slot_id)) {
- _filter_conjuncts.push_back(ctx);
+
+ // Check if single slot can be filtered by dict.
+ if (_slot_id_to_filter_conjuncts &&
!_slot_id_to_filter_conjuncts->empty()) {
+ const std::vector<string>& predicate_col_names =
_lazy_read_ctx.predicate_columns.first;
+ const std::vector<int>& predicate_col_slot_ids =
_lazy_read_ctx.predicate_columns.second;
+ for (size_t i = 0; i < predicate_col_names.size(); ++i) {
+ const string& predicate_col_name = predicate_col_names[i];
+ int slot_id = predicate_col_slot_ids[i];
+ auto field =
const_cast<FieldSchema*>(schema.get_column(predicate_col_name));
+ if (!disable_dict_filter && !_lazy_read_ctx.has_complex_type &&
+ _can_filter_by_dict(
+ slot_id,
_row_group_meta.columns[field->physical_column_index].meta_data)) {
+
_dict_filter_cols.emplace_back(std::make_pair(predicate_col_name, slot_id));
+ } else {
+ if (_slot_id_to_filter_conjuncts->find(slot_id) !=
+ _slot_id_to_filter_conjuncts->end()) {
+ for (auto& ctx :
_slot_id_to_filter_conjuncts->at(slot_id)) {
+ _filter_conjuncts.push_back(ctx);
+ }
}
}
}
- }
- // Add predicate_partition_columns in _slot_id_to_filter_conjuncts(single
slot conjuncts)
- // to _filter_conjuncts, others should be added from
not_single_slot_filter_conjuncts.
- for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
- auto& [value, slot_desc] = kv.second;
- auto iter = _slot_id_to_filter_conjuncts->find(slot_desc->id());
- if (iter != _slot_id_to_filter_conjuncts->end()) {
- for (auto& ctx : iter->second) {
- _filter_conjuncts.push_back(ctx);
+ // Add predicate_partition_columns in
_slot_id_to_filter_conjuncts(single slot conjuncts)
+ // to _filter_conjuncts, others should be added from
not_single_slot_filter_conjuncts.
+ for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
+ auto& [value, slot_desc] = kv.second;
+ auto iter = _slot_id_to_filter_conjuncts->find(slot_desc->id());
+ if (iter != _slot_id_to_filter_conjuncts->end()) {
+ for (auto& ctx : iter->second) {
+ _filter_conjuncts.push_back(ctx);
+ }
}
}
+ //For check missing column : missing column == xx, missing column is
null,missing column is not null.
+ _filter_conjuncts.insert(_filter_conjuncts.end(),
+
_lazy_read_ctx.missing_columns_conjuncts.begin(),
+
_lazy_read_ctx.missing_columns_conjuncts.end());
+ RETURN_IF_ERROR(_rewrite_dict_predicates());
}
- //For check missing column : missing column == xx, missing column is
null,missing column is not null.
- _filter_conjuncts.insert(_filter_conjuncts.end(),
- _lazy_read_ctx.missing_columns_conjuncts.begin(),
- _lazy_read_ctx.missing_columns_conjuncts.end());
- RETURN_IF_ERROR(_rewrite_dict_predicates());
return Status::OK();
}
@@ -351,17 +353,8 @@ Status RowGroupReader::next_batch(Block* block, size_t
batch_size, size_t* read_
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter,
result_filter));
- if (!_not_single_slot_filter_conjuncts.empty()) {
- _convert_dict_cols_to_string_cols(block);
- SCOPED_RAW_TIMER(&_predicate_filter_time);
- RETURN_IF_CATCH_EXCEPTION(
-
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
- _not_single_slot_filter_conjuncts, block,
columns_to_filter,
- column_to_keep)));
- } else {
- Block::erase_useless_column(block, column_to_keep);
- _convert_dict_cols_to_string_cols(block);
- }
+ Block::erase_useless_column(block, column_to_keep);
+ _convert_dict_cols_to_string_cols(block);
} else {
RETURN_IF_CATCH_EXCEPTION(
RETURN_IF_ERROR(_filter_block(block, column_to_keep,
columns_to_filter)));
@@ -594,15 +587,6 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t
batch_size, size_t* re
*batch_eof = pre_eof;
RETURN_IF_ERROR(_fill_partition_columns(block, column_size,
_lazy_read_ctx.partition_columns));
RETURN_IF_ERROR(_fill_missing_columns(block, column_size,
_lazy_read_ctx.missing_columns));
- if (!_not_single_slot_filter_conjuncts.empty()) {
- {
- SCOPED_RAW_TIMER(&_predicate_filter_time);
- RETURN_IF_CATCH_EXCEPTION(
-
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
- _not_single_slot_filter_conjuncts, block,
columns_to_filter,
- origin_column_num)));
- }
- }
return Status::OK();
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index f73e9ebe09e..8106241014b 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -220,7 +220,6 @@ private:
const TupleDescriptor* _tuple_descriptor = nullptr;
const RowDescriptor* _row_descriptor = nullptr;
const std::unordered_map<std::string, int>* _col_name_to_slot_id = nullptr;
- VExprContextSPtrs _not_single_slot_filter_conjuncts;
const std::unordered_map<int, VExprContextSPtrs>*
_slot_id_to_filter_conjuncts = nullptr;
VExprContextSPtrs _dict_filter_conjuncts;
VExprContextSPtrs _filter_conjuncts;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]