This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 4b261d43b6b [Enhancement](multi-catalog) Add PredicateFilterTime,
DictFilterRewriteTime, LazyReadFilteredRows profile metrics in parquet orc
profiles. (#52815)
4b261d43b6b is described below
commit 4b261d43b6b9f2154d3ea3b605b3e8e44c545ea2
Author: Qi Chen <[email protected]>
AuthorDate: Wed Jul 16 09:07:38 2025 +0800
[Enhancement](multi-catalog) Add PredicateFilterTime,
DictFilterRewriteTime, LazyReadFilteredRows profile metrics in parquet orc
profiles. (#52815)
### What problem does this PR solve?
Problem Summary:
### Release note
Cherry-pick #51248
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous tests can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add the documentation PR link here, e.g.:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merges this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/vec/exec/format/orc/vorc_reader.cpp | 131 +++++++++--------
be/src/vec/exec/format/orc/vorc_reader.h | 7 +-
.../exec/format/parquet/vparquet_group_reader.cpp | 159 +++++++++++----------
.../exec/format/parquet/vparquet_group_reader.h | 2 +
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 4 +
be/src/vec/exec/format/parquet/vparquet_reader.h | 2 +
6 files changed, 169 insertions(+), 136 deletions(-)
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index fefe3aef502..b11c94a3de2 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -199,7 +199,9 @@ void OrcReader::_collect_profile_before_close() {
COUNTER_UPDATE(_orc_profile.set_fill_column_time,
_statistics.set_fill_column_time);
COUNTER_UPDATE(_orc_profile.decode_value_time,
_statistics.decode_value_time);
COUNTER_UPDATE(_orc_profile.decode_null_map_time,
_statistics.decode_null_map_time);
- COUNTER_UPDATE(_orc_profile.filter_block_time,
_statistics.filter_block_time);
+ COUNTER_UPDATE(_orc_profile.predicate_filter_time,
_statistics.predicate_filter_time);
+ COUNTER_UPDATE(_orc_profile.dict_filter_rewrite_time,
_statistics.dict_filter_rewrite_time);
+ COUNTER_UPDATE(_orc_profile.lazy_read_filtered_rows,
_statistics.lazy_read_filtered_rows);
if (_file_input_stream != nullptr) {
_file_input_stream->collect_profile_before_close();
@@ -233,8 +235,12 @@ void OrcReader::_init_profile() {
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecodeValueTime",
orc_profile, 1);
_orc_profile.decode_null_map_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecodeNullMapTime",
orc_profile, 1);
- _orc_profile.filter_block_time =
- ADD_CHILD_TIMER_WITH_LEVEL(_profile, "FilterBlockTime",
orc_profile, 1);
+ _orc_profile.predicate_filter_time =
+ ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PredicateFilterTime",
orc_profile, 1);
+ _orc_profile.dict_filter_rewrite_time =
+ ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DictFilterRewriteTime",
orc_profile, 1);
+ _orc_profile.lazy_read_filtered_rows =
+ ADD_COUNTER_WITH_LEVEL(_profile, "FilteredRowsByLazyRead",
TUnit::UNIT, 1);
}
}
@@ -1713,15 +1719,18 @@ Status OrcReader::get_next_block_impl(Block* block,
size_t* read_rows, bool* eof
*read_rows = 0;
return Status::OK();
}
- _execute_filter_position_delete_rowids(*_filter);
{
- SCOPED_RAW_TIMER(&_statistics.decode_null_map_time);
- RETURN_IF_CATCH_EXCEPTION(
- Block::filter_block_internal(block, columns_to_filter,
*_filter));
+ SCOPED_RAW_TIMER(&_statistics.predicate_filter_time);
+ _execute_filter_position_delete_rowids(*_filter);
+ {
+ SCOPED_RAW_TIMER(&_statistics.decode_null_map_time);
+ RETURN_IF_CATCH_EXCEPTION(
+ Block::filter_block_internal(block, columns_to_filter,
*_filter));
+ }
+ Block::erase_useless_column(block, column_to_keep);
+ RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block,
&batch_vec));
+ *read_rows = block->rows();
}
- Block::erase_useless_column(block, column_to_keep);
- RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
- *read_rows = block->rows();
} else {
uint64_t rr;
SCOPED_RAW_TIMER(&_statistics.column_read_time);
@@ -1798,63 +1807,60 @@ Status OrcReader::get_next_block_impl(Block* block,
size_t* read_rows, bool* eof
return Status::OK();
}
- _build_delete_row_filter(block, _batch->numElements);
-
- std::vector<uint32_t> columns_to_filter;
- int column_to_keep = block->columns();
- columns_to_filter.resize(column_to_keep);
- for (uint32_t i = 0; i < column_to_keep; ++i) {
- columns_to_filter[i] = i;
- }
- if (!_lazy_read_ctx.conjuncts.empty()) {
- VExprContextSPtrs filter_conjuncts;
- filter_conjuncts.insert(filter_conjuncts.end(),
_filter_conjuncts.begin(),
- _filter_conjuncts.end());
- for (auto& conjunct : _dict_filter_conjuncts) {
- filter_conjuncts.emplace_back(conjunct);
- }
- for (auto& conjunct : _non_dict_filter_conjuncts) {
- filter_conjuncts.emplace_back(conjunct);
- }
- std::vector<IColumn::Filter*> filters;
- if (_delete_rows_filter_ptr) {
- filters.push_back(_delete_rows_filter_ptr.get());
- }
- IColumn::Filter result_filter(block->rows(), 1);
- bool can_filter_all = false;
- RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
- filter_conjuncts, &filters, block, &result_filter,
&can_filter_all));
- if (can_filter_all) {
- for (auto& col : columns_to_filter) {
-
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
+ {
+ SCOPED_RAW_TIMER(&_statistics.predicate_filter_time);
+ _build_delete_row_filter(block, _batch->numElements);
+
+ std::vector<uint32_t> columns_to_filter;
+ int column_to_keep = block->columns();
+ columns_to_filter.resize(column_to_keep);
+ for (uint32_t i = 0; i < column_to_keep; ++i) {
+ columns_to_filter[i] = i;
+ }
+ if (!_lazy_read_ctx.conjuncts.empty()) {
+ VExprContextSPtrs filter_conjuncts;
+ filter_conjuncts.insert(filter_conjuncts.end(),
_filter_conjuncts.begin(),
+ _filter_conjuncts.end());
+ for (auto& conjunct : _dict_filter_conjuncts) {
+ filter_conjuncts.emplace_back(conjunct);
}
- Block::erase_useless_column(block, column_to_keep);
- return _convert_dict_cols_to_string_cols(block, &batch_vec);
- }
- _execute_filter_position_delete_rowids(result_filter);
- {
- SCOPED_RAW_TIMER(&_statistics.filter_block_time);
+ for (auto& conjunct : _non_dict_filter_conjuncts) {
+ filter_conjuncts.emplace_back(conjunct);
+ }
+ std::vector<IColumn::Filter*> filters;
+ if (_delete_rows_filter_ptr) {
+ filters.push_back(_delete_rows_filter_ptr.get());
+ }
+ IColumn::Filter result_filter(block->rows(), 1);
+ bool can_filter_all = false;
+
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
+ filter_conjuncts, &filters, block, &result_filter,
&can_filter_all));
+ if (can_filter_all) {
+ for (auto& col : columns_to_filter) {
+
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
+ }
+ Block::erase_useless_column(block, column_to_keep);
+ return _convert_dict_cols_to_string_cols(block,
&batch_vec);
+ }
+ _execute_filter_position_delete_rowids(result_filter);
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter,
result_filter));
- }
- Block::erase_useless_column(block, column_to_keep);
- RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block,
&batch_vec));
- } else {
- if (_delete_rows_filter_ptr) {
-
_execute_filter_position_delete_rowids(*_delete_rows_filter_ptr);
- SCOPED_RAW_TIMER(&_statistics.filter_block_time);
- RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(block,
columns_to_filter,
-
(*_delete_rows_filter_ptr)));
+ Block::erase_useless_column(block, column_to_keep);
} else {
- std::unique_ptr<IColumn::Filter> filter(new
IColumn::Filter(block->rows(), 1));
- _execute_filter_position_delete_rowids(*filter);
- SCOPED_RAW_TIMER(&_statistics.filter_block_time);
- RETURN_IF_CATCH_EXCEPTION(
- Block::filter_block_internal(block, columns_to_filter,
(*filter)));
+ if (_delete_rows_filter_ptr) {
+
_execute_filter_position_delete_rowids(*_delete_rows_filter_ptr);
+ RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(
+ block, columns_to_filter,
(*_delete_rows_filter_ptr)));
+ } else {
+ std::unique_ptr<IColumn::Filter> filter(new
IColumn::Filter(block->rows(), 1));
+ _execute_filter_position_delete_rowids(*filter);
+ RETURN_IF_CATCH_EXCEPTION(
+ Block::filter_block_internal(block,
columns_to_filter, (*filter)));
+ }
+ Block::erase_useless_column(block, column_to_keep);
}
- Block::erase_useless_column(block, column_to_keep);
- RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block,
&batch_vec));
}
+ RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
*read_rows = block->rows();
}
return Status::OK();
@@ -1898,6 +1904,7 @@ void OrcReader::_build_delete_row_filter(const Block*
block, size_t rows) {
}
Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t
size, void* arg) {
+ SCOPED_RAW_TIMER(&_statistics.predicate_filter_time);
Block* block = (Block*)arg;
size_t origin_column_num = block->columns();
@@ -1998,6 +2005,7 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data,
uint16_t* sel, uint16_t s
sel[new_size] = i;
new_size += result_filter_data[i] ? 1 : 0;
}
+ _statistics.lazy_read_filtered_rows += static_cast<int64_t>(size -
new_size);
data.numElements = new_size;
return Status::OK();
}
@@ -2071,6 +2079,7 @@ bool OrcReader::_can_filter_by_dict(int slot_id) {
Status OrcReader::on_string_dicts_loaded(
std::unordered_map<std::string, orc::StringDictionary*>&
file_column_name_to_dict_map,
bool* is_stripe_filtered) {
+ SCOPED_RAW_TIMER(&_statistics.dict_filter_rewrite_time);
*is_stripe_filtered = false;
for (auto it = _dict_filter_cols.begin(); it != _dict_filter_cols.end();) {
std::string& dict_filter_col_name = it->first;
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h
b/be/src/vec/exec/format/orc/vorc_reader.h
index 6fb4c886eec..15416f92601 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -128,7 +128,9 @@ public:
int64_t set_fill_column_time = 0;
int64_t decode_value_time = 0;
int64_t decode_null_map_time = 0;
- int64_t filter_block_time = 0;
+ int64_t predicate_filter_time = 0;
+ int64_t dict_filter_rewrite_time = 0;
+ int64_t lazy_read_filtered_rows = 0;
};
OrcReader(RuntimeProfile* profile, RuntimeState* state, const
TFileScanRangeParams& params,
@@ -227,6 +229,9 @@ private:
RuntimeProfile::Counter* decode_value_time = nullptr;
RuntimeProfile::Counter* decode_null_map_time = nullptr;
RuntimeProfile::Counter* filter_block_time = nullptr;
+ RuntimeProfile::Counter* predicate_filter_time = nullptr;
+ RuntimeProfile::Counter* dict_filter_rewrite_time = nullptr;
+ RuntimeProfile::Counter* lazy_read_filtered_rows = nullptr;
};
class ORCFilterImpl : public orc::ORCFilter {
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index b3eae111090..fb99d064f7b 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -319,45 +319,46 @@ Status RowGroupReader::next_batch(Block* block, size_t
batch_size, size_t* read_
*read_rows = block->rows();
return Status::OK();
}
+ {
+ SCOPED_RAW_TIMER(&_predicate_filter_time);
+ RETURN_IF_ERROR(_build_pos_delete_filter(*read_rows));
- RETURN_IF_ERROR(_build_pos_delete_filter(*read_rows));
-
- std::vector<uint32_t> columns_to_filter;
- int column_to_keep = block->columns();
- columns_to_filter.resize(column_to_keep);
- for (uint32_t i = 0; i < column_to_keep; ++i) {
- columns_to_filter[i] = i;
- }
- if (!_lazy_read_ctx.conjuncts.empty()) {
- std::vector<IColumn::Filter*> filters;
- if (_position_delete_ctx.has_filter) {
- filters.push_back(_pos_delete_filter_ptr.get());
+ std::vector<uint32_t> columns_to_filter;
+ int column_to_keep = block->columns();
+ columns_to_filter.resize(column_to_keep);
+ for (uint32_t i = 0; i < column_to_keep; ++i) {
+ columns_to_filter[i] = i;
}
- IColumn::Filter result_filter(block->rows(), 1);
- bool can_filter_all = false;
+ if (!_lazy_read_ctx.conjuncts.empty()) {
+ std::vector<IColumn::Filter*> filters;
+ if (_position_delete_ctx.has_filter) {
+ filters.push_back(_pos_delete_filter_ptr.get());
+ }
+ IColumn::Filter result_filter(block->rows(), 1);
+ bool can_filter_all = false;
- {
- SCOPED_RAW_TIMER(&_predicate_filter_time);
-
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
- _filter_conjuncts, &filters, block, &result_filter,
&can_filter_all));
- }
+ {
+
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
+ _filter_conjuncts, &filters, block,
&result_filter, &can_filter_all));
+ }
- if (can_filter_all) {
- for (auto& col : columns_to_filter) {
-
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
+ if (can_filter_all) {
+ for (auto& col : columns_to_filter) {
+
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
+ }
+ Block::erase_useless_column(block, column_to_keep);
+ _convert_dict_cols_to_string_cols(block);
+ return Status::OK();
}
+
+ RETURN_IF_CATCH_EXCEPTION(
+ Block::filter_block_internal(block, columns_to_filter,
result_filter));
Block::erase_useless_column(block, column_to_keep);
- _convert_dict_cols_to_string_cols(block);
- return Status::OK();
+ } else {
+ RETURN_IF_CATCH_EXCEPTION(
+ RETURN_IF_ERROR(_filter_block(block, column_to_keep,
columns_to_filter)));
}
-
- RETURN_IF_CATCH_EXCEPTION(
- Block::filter_block_internal(block, columns_to_filter,
result_filter));
- Block::erase_useless_column(block, column_to_keep);
_convert_dict_cols_to_string_cols(block);
- } else {
- RETURN_IF_CATCH_EXCEPTION(
- RETURN_IF_ERROR(_filter_block(block, column_to_keep,
columns_to_filter)));
}
*read_rows = block->rows();
return Status::OK();
@@ -456,49 +457,56 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t
batch_size, size_t* re
RETURN_IF_ERROR(_build_pos_delete_filter(pre_read_rows));
- // generate filter vector
- if (_lazy_read_ctx.resize_first_column) {
- // VExprContext.execute has an optimization, the filtering is
executed when block->rows() > 0
- // The following process may be tricky and time-consuming, but we
have no other way.
-
block->get_by_position(0).column->assume_mutable()->resize(pre_read_rows);
- }
- result_filter.assign(pre_read_rows, static_cast<unsigned char>(1));
bool can_filter_all = false;
- std::vector<IColumn::Filter*> filters;
- if (_position_delete_ctx.has_filter) {
- filters.push_back(_pos_delete_filter_ptr.get());
- }
-
- VExprContextSPtrs filter_contexts;
- for (auto& conjunct : _filter_conjuncts) {
- filter_contexts.emplace_back(conjunct);
- }
-
{
SCOPED_RAW_TIMER(&_predicate_filter_time);
- RETURN_IF_ERROR(VExprContext::execute_conjuncts(filter_contexts,
&filters, block,
- &result_filter,
&can_filter_all));
- }
- if (_lazy_read_ctx.resize_first_column) {
- // We have to clean the first column to insert right data.
- block->get_by_position(0).column->assume_mutable()->clear();
+ // generate filter vector
+ if (_lazy_read_ctx.resize_first_column) {
+ // VExprContext.execute has an optimization, the filtering is
executed when block->rows() > 0
+ // The following process may be tricky and time-consuming, but
we have no other way.
+
block->get_by_position(0).column->assume_mutable()->resize(pre_read_rows);
+ }
+ result_filter.assign(pre_read_rows, static_cast<unsigned char>(1));
+ std::vector<IColumn::Filter*> filters;
+ if (_position_delete_ctx.has_filter) {
+ filters.push_back(_pos_delete_filter_ptr.get());
+ }
+
+ VExprContextSPtrs filter_contexts;
+ for (auto& conjunct : _filter_conjuncts) {
+ filter_contexts.emplace_back(conjunct);
+ }
+
+ {
+ SCOPED_RAW_TIMER(&_predicate_filter_time);
+
RETURN_IF_ERROR(VExprContext::execute_conjuncts(filter_contexts, &filters,
block,
+
&result_filter, &can_filter_all));
+ }
+
+ if (_lazy_read_ctx.resize_first_column) {
+ // We have to clean the first column to insert right data.
+ block->get_by_position(0).column->assume_mutable()->clear();
+ }
}
const uint8_t* __restrict filter_map = result_filter.data();
select_vector_ptr.reset(new ColumnSelectVector(filter_map,
pre_read_rows, can_filter_all));
if (select_vector_ptr->filter_all()) {
- for (auto& col : _lazy_read_ctx.predicate_columns.first) {
- // clean block to read predicate columns
- block->get_by_name(col).column->assume_mutable()->clear();
- }
- for (auto& col : _lazy_read_ctx.predicate_partition_columns) {
-
block->get_by_name(col.first).column->assume_mutable()->clear();
- }
- for (auto& col : _lazy_read_ctx.predicate_missing_columns) {
-
block->get_by_name(col.first).column->assume_mutable()->clear();
+ {
+ SCOPED_RAW_TIMER(&_predicate_filter_time);
+ for (auto& col : _lazy_read_ctx.predicate_columns.first) {
+ // clean block to read predicate columns
+ block->get_by_name(col).column->assume_mutable()->clear();
+ }
+ for (auto& col : _lazy_read_ctx.predicate_partition_columns) {
+
block->get_by_name(col.first).column->assume_mutable()->clear();
+ }
+ for (auto& col : _lazy_read_ctx.predicate_missing_columns) {
+
block->get_by_name(col.first).column->assume_mutable()->clear();
+ }
+ Block::erase_useless_column(block, origin_column_num);
}
- Block::erase_useless_column(block, origin_column_num);
if (!pre_eof) {
// If continuous batches are skipped, we can cache them to
skip a whole page
@@ -551,18 +559,21 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t
batch_size, size_t* re
// we set pre_read_rows as batch_size for lazy read columns, so pre_eof !=
lazy_eof
// filter data in predicate columns, and remove filter column
- if (select_vector.has_filter()) {
- if (block->columns() == origin_column_num) {
- // the whole row group has been filtered by
_lazy_read_ctx.vconjunct_ctx, and batch_eof is
- // generated from next batch, so the filter column is removed
ahead.
- DCHECK_EQ(block->rows(), 0);
+ {
+ SCOPED_RAW_TIMER(&_predicate_filter_time);
+ if (select_vector.has_filter()) {
+ if (block->columns() == origin_column_num) {
+ // the whole row group has been filtered by
_lazy_read_ctx.vconjunct_ctx, and batch_eof is
+ // generated from next batch, so the filter column is removed
ahead.
+ DCHECK_EQ(block->rows(), 0);
+ } else {
+ RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(
+ block, _lazy_read_ctx.all_predicate_col_ids,
result_filter));
+ Block::erase_useless_column(block, origin_column_num);
+ }
} else {
- RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(
- block, _lazy_read_ctx.all_predicate_col_ids,
result_filter));
Block::erase_useless_column(block, origin_column_num);
}
- } else {
- Block::erase_useless_column(block, origin_column_num);
}
_convert_dict_cols_to_string_cols(block);
@@ -778,6 +789,7 @@ Status RowGroupReader::_filter_block(Block* block, int
column_to_keep,
}
Status RowGroupReader::_rewrite_dict_predicates() {
+ SCOPED_RAW_TIMER(&_dict_filter_rewrite_time);
for (auto it = _dict_filter_cols.begin(); it != _dict_filter_cols.end();) {
std::string& dict_filter_col_name = it->first;
int slot_id = it->second;
@@ -843,7 +855,6 @@ Status RowGroupReader::_rewrite_dict_predicates() {
IColumn::Filter result_filter(temp_block.rows(), 1);
bool can_filter_all;
{
- SCOPED_RAW_TIMER(&_predicate_filter_time);
RETURN_IF_ERROR(VExprContext::execute_conjuncts(ctxs, nullptr,
&temp_block,
&result_filter,
&can_filter_all));
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index 2e57c5721e1..bf078f37632 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -158,6 +158,7 @@ public:
Status next_batch(Block* block, size_t batch_size, size_t* read_rows,
bool* batch_eof);
int64_t lazy_read_filtered_rows() const { return _lazy_read_filtered_rows;
}
int64_t predicate_filter_time() const { return _predicate_filter_time; }
+ int64_t dict_filter_rewrite_time() const { return
_dict_filter_rewrite_time; }
ParquetColumnReader::Statistics statistics();
void set_remaining_rows(int64_t rows) { _remaining_rows = rows; }
@@ -213,6 +214,7 @@ private:
const LazyReadContext& _lazy_read_ctx;
int64_t _lazy_read_filtered_rows = 0;
int64_t _predicate_filter_time = 0;
+ int64_t _dict_filter_rewrite_time = 0;
// If continuous batches are skipped, we can cache them to skip a whole
page
size_t _cached_filtered_rows = 0;
std::unique_ptr<IColumn::Filter> _pos_delete_filter_ptr;
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index d14d77f70c2..50c4e7a1dd1 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -183,6 +183,8 @@ void ParquetReader::_init_profile() {
_profile, "ParsePageHeaderNum", TUnit::UNIT, parquet_profile,
1);
_parquet_profile.predicate_filter_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PredicateFilterTime",
parquet_profile, 1);
+ _parquet_profile.dict_filter_rewrite_time =
+ ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DictFilterRewriteTime",
parquet_profile, 1);
}
}
@@ -590,6 +592,7 @@ Status ParquetReader::get_next_block(Block* block, size_t*
read_rows, bool* eof)
_column_statistics.merge(column_st);
_statistics.lazy_read_filtered_rows +=
_current_group_reader->lazy_read_filtered_rows();
_statistics.predicate_filter_time +=
_current_group_reader->predicate_filter_time();
+ _statistics.dict_filter_rewrite_time +=
_current_group_reader->dict_filter_rewrite_time();
if (_read_row_groups.size() == 0) {
*eof = true;
} else {
@@ -1034,6 +1037,7 @@ void ParquetReader::_collect_profile() {
COUNTER_UPDATE(_parquet_profile.parse_page_header_num,
_column_statistics.parse_page_header_num);
COUNTER_UPDATE(_parquet_profile.predicate_filter_time,
_statistics.predicate_filter_time);
+ COUNTER_UPDATE(_parquet_profile.dict_filter_rewrite_time,
_statistics.dict_filter_rewrite_time);
COUNTER_UPDATE(_parquet_profile.file_read_time,
_column_statistics.read_time);
COUNTER_UPDATE(_parquet_profile.file_read_calls,
_column_statistics.read_calls);
COUNTER_UPDATE(_parquet_profile.file_meta_read_calls,
_column_statistics.meta_read_calls);
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 44859e5df9a..33e728e1e29 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -92,6 +92,7 @@ public:
int64_t read_page_index_time = 0;
int64_t parse_page_index_time = 0;
int64_t predicate_filter_time = 0;
+ int64_t dict_filter_rewrite_time = 0;
};
ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
@@ -190,6 +191,7 @@ private:
RuntimeProfile::Counter* skip_page_header_num = nullptr;
RuntimeProfile::Counter* parse_page_header_num = nullptr;
RuntimeProfile::Counter* predicate_filter_time = nullptr;
+ RuntimeProfile::Counter* dict_filter_rewrite_time = nullptr;
};
Status _open_file();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]