This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new fc7953bd0d4 [Opt](orc-reader) Optimize orc string dict filter in
not_single_conjunct case. (#26386) (#26696)
fc7953bd0d4 is described below
commit fc7953bd0d4cc476976d550417dcde311417b3d4
Author: Qi Chen <[email protected]>
AuthorDate: Fri Nov 10 12:08:37 2023 +0800
[Opt](orc-reader) Optimize orc string dict filter in not_single_conjunct
case. (#26386) (#26696)
Optimize orc/parquet string dict filter in not_single_conjunct case. We can
optimize this processing to filter block firstly by dict code, then filter by
not_single_conjunct. Because dict code is int, it will filter faster than
string.
For example:
```
select count(l_receiptdate) from lineitem_date_as_string where l_shipmode
in ('MAIL', 'SHIP') and l_commitdate < l_receiptdate and l_receiptdate >=
'1994-01-01' and l_receiptdate < '1995-01-01';
```
`l_receiptdate` and `l_shipmode` will using string dict filtering, and
`l_commitdate < l_receiptdate` is the an not_single_conjunct which contains
dict filter field. We can optimize this processing to filter block firstly by
dict code, then filter by not_single_conjunct. Because dict code is int, it
will filter faster than string.
Before:
mysql> select count(l_receiptdate) from lineitem_date_as_string where
l_shipmode in ('MAIL', 'SHIP') and l_commitdate < l_receiptdate and
l_receiptdate >= '1994-01-01' and l_receiptdate < '1995-01-01';
+----------------------+
| count(l_receiptdate) |
+----------------------+
| 49314694 |
+----------------------+
1 row in set (6.87 sec)
After:
mysql> select count(l_receiptdate) from lineitem_date_as_string where
l_shipmode in ('MAIL', 'SHIP') and l_commitdate < l_receiptdate and
l_receiptdate >= '1994-01-01' and l_receiptdate < '1995-01-01';
+----------------------+
| count(l_receiptdate) |
+----------------------+
| 49314694 |
+----------------------+
1 row in set (4.85 sec)
---
be/src/vec/exec/format/orc/vorc_reader.cpp | 46 +++++++++++-----------
be/src/vec/exec/format/orc/vorc_reader.h | 1 +
.../exec/format/parquet/vparquet_group_reader.cpp | 11 ++----
3 files changed, 29 insertions(+), 29 deletions(-)
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index b7399300b63..08baaf242eb 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -146,7 +146,8 @@ OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState*
state,
_ctz(ctz),
_is_hive(params.__isset.slot_name_to_schema_pos),
_io_ctx(io_ctx),
- _enable_lazy_mat(enable_lazy_mat) {
+ _enable_lazy_mat(enable_lazy_mat),
+ _is_dict_cols_converted(false) {
TimezoneUtils::find_cctz_time_zone(ctz, _time_zone);
VecDateTimeValue t;
t.from_unixtime(0, ctz);
@@ -165,7 +166,8 @@ OrcReader::OrcReader(const TFileScanRangeParams& params,
const TFileRangeDesc& r
_is_hive(params.__isset.slot_name_to_schema_pos),
_file_system(nullptr),
_io_ctx(io_ctx),
- _enable_lazy_mat(enable_lazy_mat) {
+ _enable_lazy_mat(enable_lazy_mat),
+ _is_dict_cols_converted(false) {
_init_system_properties();
_init_file_description();
}
@@ -1426,7 +1428,6 @@ Status OrcReader::get_next_block(Block* block, size_t*
read_rows, bool* eof) {
col_name, column_ptr, column_type,
_col_orc_type[orc_col_idx->second],
batch_vec[orc_col_idx->second], _batch->numElements));
}
- *read_rows = rr;
RETURN_IF_ERROR(_fill_partition_columns(block, _batch->numElements,
_lazy_read_ctx.partition_columns));
@@ -1434,22 +1435,24 @@ Status OrcReader::get_next_block(Block* block, size_t*
read_rows, bool* eof) {
_fill_missing_columns(block, _batch->numElements,
_lazy_read_ctx.missing_columns));
if (block->rows() == 0) {
+ static_cast<void>(_convert_dict_cols_to_string_cols(block,
nullptr));
*eof = true;
+ *read_rows = 0;
return Status::OK();
}
+ RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(block,
columns_to_filter, *_filter));
if (!_not_single_slot_filter_conjuncts.empty()) {
- std::vector<IColumn::Filter*> filters;
- filters.push_back(_filter.get());
+ static_cast<void>(_convert_dict_cols_to_string_cols(block,
&batch_vec));
RETURN_IF_CATCH_EXCEPTION(
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
- _not_single_slot_filter_conjuncts, &filters,
block, columns_to_filter,
+ _not_single_slot_filter_conjuncts, nullptr, block,
columns_to_filter,
column_to_keep)));
} else {
- RETURN_IF_CATCH_EXCEPTION(
- Block::filter_block_internal(block, columns_to_filter,
*_filter));
Block::erase_useless_column(block, column_to_keep);
+ static_cast<void>(_convert_dict_cols_to_string_cols(block,
&batch_vec));
}
+ *read_rows = block->rows();
} else {
uint64_t rr;
SCOPED_RAW_TIMER(&_statistics.column_read_time);
@@ -1486,6 +1489,7 @@ Status OrcReader::get_next_block(Block* block, size_t*
read_rows, bool* eof) {
block->replace_by_position(pos, std::move(dict_col_ptr));
}
}
+ _is_dict_cols_converted = true;
std::vector<orc::ColumnVectorBatch*> batch_vec;
_fill_batch_vec(batch_vec, _batch.get(), 0);
@@ -1502,7 +1506,6 @@ Status OrcReader::get_next_block(Block* block, size_t*
read_rows, bool* eof) {
col_name, column_ptr, column_type,
_col_orc_type[orc_col_idx->second],
batch_vec[orc_col_idx->second], _batch->numElements));
}
- *read_rows = rr;
RETURN_IF_ERROR(_fill_partition_columns(block, _batch->numElements,
_lazy_read_ctx.partition_columns));
@@ -1512,6 +1515,7 @@ Status OrcReader::get_next_block(Block* block, size_t*
read_rows, bool* eof) {
if (block->rows() == 0) {
_convert_dict_cols_to_string_cols(block, nullptr);
*eof = true;
+ *read_rows = 0;
return Status::OK();
}
@@ -1549,19 +1553,17 @@ Status OrcReader::get_next_block(Block* block, size_t*
read_rows, bool* eof) {
_convert_dict_cols_to_string_cols(block, &batch_vec);
return Status::OK();
}
+ RETURN_IF_CATCH_EXCEPTION(
+ Block::filter_block_internal(block, columns_to_filter,
result_filter));
if (!_not_single_slot_filter_conjuncts.empty()) {
- _convert_dict_cols_to_string_cols(block, &batch_vec);
- std::vector<IColumn::Filter*> merged_filters;
- merged_filters.push_back(&result_filter);
+ static_cast<void>(_convert_dict_cols_to_string_cols(block,
&batch_vec));
RETURN_IF_CATCH_EXCEPTION(
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
- _not_single_slot_filter_conjuncts,
&merged_filters, block,
+ _not_single_slot_filter_conjuncts, nullptr,
block,
columns_to_filter, column_to_keep)));
} else {
- RETURN_IF_CATCH_EXCEPTION(
- Block::filter_block_internal(block, columns_to_filter,
result_filter));
Block::erase_useless_column(block, column_to_keep);
- _convert_dict_cols_to_string_cols(block, &batch_vec);
+ static_cast<void>(_convert_dict_cols_to_string_cols(block,
&batch_vec));
}
} else {
if (_delete_rows_filter_ptr) {
@@ -1569,8 +1571,9 @@ Status OrcReader::get_next_block(Block* block, size_t*
read_rows, bool* eof) {
(*_delete_rows_filter_ptr)));
}
Block::erase_useless_column(block, column_to_keep);
- _convert_dict_cols_to_string_cols(block, &batch_vec);
+ static_cast<void>(_convert_dict_cols_to_string_cols(block,
&batch_vec));
}
+ *read_rows = block->rows();
}
return Status::OK();
}
@@ -1632,6 +1635,7 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data,
uint16_t* sel, uint16_t s
block->replace_by_position(pos, std::move(dict_col_ptr));
}
}
+ _is_dict_cols_converted = true;
std::vector<orc::ColumnVectorBatch*> batch_vec;
_fill_batch_vec(batch_vec, &data, 0);
std::vector<string> col_names;
@@ -1710,11 +1714,6 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data,
uint16_t* sel, uint16_t s
new_size += result_filter_data[i] ? 1 : 0;
}
data.numElements = new_size;
- if (data.numElements > 0) {
- _convert_dict_cols_to_string_cols(block, &batch_vec);
- } else {
- _convert_dict_cols_to_string_cols(block, nullptr);
- }
return Status::OK();
}
@@ -2035,6 +2034,9 @@ Status
OrcReader::_rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes, int
Status OrcReader::_convert_dict_cols_to_string_cols(
Block* block, const std::vector<orc::ColumnVectorBatch*>* batch_vec) {
+ if (!_is_dict_cols_converted) {
+ return Status::OK();
+ }
for (auto& dict_filter_cols : _dict_filter_cols) {
size_t pos = block->get_position_by_name(dict_filter_cols.first);
ColumnWithTypeAndName& column_with_type_and_name =
block->get_by_position(pos);
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h
b/be/src/vec/exec/format/orc/vorc_reader.h
index 0d9fb3df29d..4d1e6d74bb8 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -557,6 +557,7 @@ private:
std::vector<std::pair<std::string, int>> _dict_filter_cols;
std::shared_ptr<ObjectPool> _obj_pool;
std::unique_ptr<orc::StringDictFilter> _string_dict_filter;
+ bool _is_dict_cols_converted;
};
class ORCFileInputStream : public orc::InputStream {
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 661fcd24f64..a4f77e2c056 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -345,17 +345,16 @@ Status RowGroupReader::next_batch(Block* block, size_t
batch_size, size_t* read_
_convert_dict_cols_to_string_cols(block);
return Status::OK();
}
+
+ RETURN_IF_CATCH_EXCEPTION(
+ Block::filter_block_internal(block, columns_to_filter,
result_filter));
if (!_not_single_slot_filter_conjuncts.empty()) {
_convert_dict_cols_to_string_cols(block);
- std::vector<IColumn::Filter*> merged_filters;
- merged_filters.push_back(&result_filter);
RETURN_IF_CATCH_EXCEPTION(
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
- _not_single_slot_filter_conjuncts,
&merged_filters, block,
+ _not_single_slot_filter_conjuncts, nullptr,
block,
columns_to_filter, column_to_keep)));
} else {
- RETURN_IF_CATCH_EXCEPTION(
- Block::filter_block_internal(block, columns_to_filter,
result_filter));
Block::erase_useless_column(block, column_to_keep);
_convert_dict_cols_to_string_cols(block);
}
@@ -573,8 +572,6 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t
batch_size, size_t* re
RETURN_IF_ERROR(_fill_partition_columns(block, column_size,
_lazy_read_ctx.partition_columns));
RETURN_IF_ERROR(_fill_missing_columns(block, column_size,
_lazy_read_ctx.missing_columns));
if (!_not_single_slot_filter_conjuncts.empty()) {
- std::vector<IColumn::Filter*> filters;
- filters.push_back(&result_filter);
RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
_not_single_slot_filter_conjuncts, nullptr, block,
columns_to_filter,
origin_column_num)));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]