This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new fc7953bd0d4 [Opt](orc-reader) Optimize orc string dict filter in 
not_single_conjunct case. (#26386) (#26696)
fc7953bd0d4 is described below

commit fc7953bd0d4cc476976d550417dcde311417b3d4
Author: Qi Chen <[email protected]>
AuthorDate: Fri Nov 10 12:08:37 2023 +0800

    [Opt](orc-reader) Optimize orc string dict filter in not_single_conjunct 
case. (#26386) (#26696)
    
    Optimize the orc/parquet string dict filter in the not_single_conjunct case. 
We can filter the block by dict code first, and then filter by the 
not_single_conjunct. Because a dict code is an int, filtering on it is faster 
than filtering on the string.
    
    For example:
    ```
    select count(l_receiptdate) from lineitem_date_as_string where l_shipmode 
in ('MAIL', 'SHIP') and l_commitdate < l_receiptdate  and l_receiptdate >= 
'1994-01-01' and l_receiptdate < '1995-01-01';
    ```
     `l_receiptdate` and `l_shipmode` will use string dict filtering, and 
`l_commitdate < l_receiptdate` is a not_single_conjunct which contains a 
dict filter field. We can filter the block by dict code first, and then 
filter by the not_single_conjunct. Because a dict code is an int, filtering 
on it is faster than filtering on the string.
    
    Before:
     mysql> select count(l_receiptdate) from lineitem_date_as_string where 
l_shipmode in ('MAIL', 'SHIP') and l_commitdate < l_receiptdate  and 
l_receiptdate >= '1994-01-01' and l_receiptdate < '1995-01-01';
    +----------------------+
    | count(l_receiptdate) |
    +----------------------+
    |             49314694 |
    +----------------------+
    1 row in set (6.87 sec)
    
    After:
    mysql> select count(l_receiptdate) from lineitem_date_as_string where 
l_shipmode in ('MAIL', 'SHIP') and l_commitdate < l_receiptdate  and 
l_receiptdate >= '1994-01-01' and l_receiptdate < '1995-01-01';
    +----------------------+
    | count(l_receiptdate) |
    +----------------------+
    |             49314694 |
    +----------------------+
    1 row in set (4.85 sec)
---
 be/src/vec/exec/format/orc/vorc_reader.cpp         | 46 +++++++++++-----------
 be/src/vec/exec/format/orc/vorc_reader.h           |  1 +
 .../exec/format/parquet/vparquet_group_reader.cpp  | 11 ++----
 3 files changed, 29 insertions(+), 29 deletions(-)

diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp 
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index b7399300b63..08baaf242eb 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -146,7 +146,8 @@ OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* 
state,
           _ctz(ctz),
           _is_hive(params.__isset.slot_name_to_schema_pos),
           _io_ctx(io_ctx),
-          _enable_lazy_mat(enable_lazy_mat) {
+          _enable_lazy_mat(enable_lazy_mat),
+          _is_dict_cols_converted(false) {
     TimezoneUtils::find_cctz_time_zone(ctz, _time_zone);
     VecDateTimeValue t;
     t.from_unixtime(0, ctz);
@@ -165,7 +166,8 @@ OrcReader::OrcReader(const TFileScanRangeParams& params, 
const TFileRangeDesc& r
           _is_hive(params.__isset.slot_name_to_schema_pos),
           _file_system(nullptr),
           _io_ctx(io_ctx),
-          _enable_lazy_mat(enable_lazy_mat) {
+          _enable_lazy_mat(enable_lazy_mat),
+          _is_dict_cols_converted(false) {
     _init_system_properties();
     _init_file_description();
 }
@@ -1426,7 +1428,6 @@ Status OrcReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
                     col_name, column_ptr, column_type, 
_col_orc_type[orc_col_idx->second],
                     batch_vec[orc_col_idx->second], _batch->numElements));
         }
-        *read_rows = rr;
 
         RETURN_IF_ERROR(_fill_partition_columns(block, _batch->numElements,
                                                 
_lazy_read_ctx.partition_columns));
@@ -1434,22 +1435,24 @@ Status OrcReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
                 _fill_missing_columns(block, _batch->numElements, 
_lazy_read_ctx.missing_columns));
 
         if (block->rows() == 0) {
+            static_cast<void>(_convert_dict_cols_to_string_cols(block, 
nullptr));
             *eof = true;
+            *read_rows = 0;
             return Status::OK();
         }
 
+        RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(block, 
columns_to_filter, *_filter));
         if (!_not_single_slot_filter_conjuncts.empty()) {
-            std::vector<IColumn::Filter*> filters;
-            filters.push_back(_filter.get());
+            static_cast<void>(_convert_dict_cols_to_string_cols(block, 
&batch_vec));
             RETURN_IF_CATCH_EXCEPTION(
                     
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
-                            _not_single_slot_filter_conjuncts, &filters, 
block, columns_to_filter,
+                            _not_single_slot_filter_conjuncts, nullptr, block, 
columns_to_filter,
                             column_to_keep)));
         } else {
-            RETURN_IF_CATCH_EXCEPTION(
-                    Block::filter_block_internal(block, columns_to_filter, 
*_filter));
             Block::erase_useless_column(block, column_to_keep);
+            static_cast<void>(_convert_dict_cols_to_string_cols(block, 
&batch_vec));
         }
+        *read_rows = block->rows();
     } else {
         uint64_t rr;
         SCOPED_RAW_TIMER(&_statistics.column_read_time);
@@ -1486,6 +1489,7 @@ Status OrcReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
                 block->replace_by_position(pos, std::move(dict_col_ptr));
             }
         }
+        _is_dict_cols_converted = true;
 
         std::vector<orc::ColumnVectorBatch*> batch_vec;
         _fill_batch_vec(batch_vec, _batch.get(), 0);
@@ -1502,7 +1506,6 @@ Status OrcReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
                     col_name, column_ptr, column_type, 
_col_orc_type[orc_col_idx->second],
                     batch_vec[orc_col_idx->second], _batch->numElements));
         }
-        *read_rows = rr;
 
         RETURN_IF_ERROR(_fill_partition_columns(block, _batch->numElements,
                                                 
_lazy_read_ctx.partition_columns));
@@ -1512,6 +1515,7 @@ Status OrcReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
         if (block->rows() == 0) {
             _convert_dict_cols_to_string_cols(block, nullptr);
             *eof = true;
+            *read_rows = 0;
             return Status::OK();
         }
 
@@ -1549,19 +1553,17 @@ Status OrcReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
                 _convert_dict_cols_to_string_cols(block, &batch_vec);
                 return Status::OK();
             }
+            RETURN_IF_CATCH_EXCEPTION(
+                    Block::filter_block_internal(block, columns_to_filter, 
result_filter));
             if (!_not_single_slot_filter_conjuncts.empty()) {
-                _convert_dict_cols_to_string_cols(block, &batch_vec);
-                std::vector<IColumn::Filter*> merged_filters;
-                merged_filters.push_back(&result_filter);
+                static_cast<void>(_convert_dict_cols_to_string_cols(block, 
&batch_vec));
                 RETURN_IF_CATCH_EXCEPTION(
                         
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
-                                _not_single_slot_filter_conjuncts, 
&merged_filters, block,
+                                _not_single_slot_filter_conjuncts, nullptr, 
block,
                                 columns_to_filter, column_to_keep)));
             } else {
-                RETURN_IF_CATCH_EXCEPTION(
-                        Block::filter_block_internal(block, columns_to_filter, 
result_filter));
                 Block::erase_useless_column(block, column_to_keep);
-                _convert_dict_cols_to_string_cols(block, &batch_vec);
+                static_cast<void>(_convert_dict_cols_to_string_cols(block, 
&batch_vec));
             }
         } else {
             if (_delete_rows_filter_ptr) {
@@ -1569,8 +1571,9 @@ Status OrcReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
                                                                        
(*_delete_rows_filter_ptr)));
             }
             Block::erase_useless_column(block, column_to_keep);
-            _convert_dict_cols_to_string_cols(block, &batch_vec);
+            static_cast<void>(_convert_dict_cols_to_string_cols(block, 
&batch_vec));
         }
+        *read_rows = block->rows();
     }
     return Status::OK();
 }
@@ -1632,6 +1635,7 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, 
uint16_t* sel, uint16_t s
             block->replace_by_position(pos, std::move(dict_col_ptr));
         }
     }
+    _is_dict_cols_converted = true;
     std::vector<orc::ColumnVectorBatch*> batch_vec;
     _fill_batch_vec(batch_vec, &data, 0);
     std::vector<string> col_names;
@@ -1710,11 +1714,6 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, 
uint16_t* sel, uint16_t s
         new_size += result_filter_data[i] ? 1 : 0;
     }
     data.numElements = new_size;
-    if (data.numElements > 0) {
-        _convert_dict_cols_to_string_cols(block, &batch_vec);
-    } else {
-        _convert_dict_cols_to_string_cols(block, nullptr);
-    }
     return Status::OK();
 }
 
@@ -2035,6 +2034,9 @@ Status 
OrcReader::_rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes, int
 
 Status OrcReader::_convert_dict_cols_to_string_cols(
         Block* block, const std::vector<orc::ColumnVectorBatch*>* batch_vec) {
+    if (!_is_dict_cols_converted) {
+        return Status::OK();
+    }
     for (auto& dict_filter_cols : _dict_filter_cols) {
         size_t pos = block->get_position_by_name(dict_filter_cols.first);
         ColumnWithTypeAndName& column_with_type_and_name = 
block->get_by_position(pos);
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h 
b/be/src/vec/exec/format/orc/vorc_reader.h
index 0d9fb3df29d..4d1e6d74bb8 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -557,6 +557,7 @@ private:
     std::vector<std::pair<std::string, int>> _dict_filter_cols;
     std::shared_ptr<ObjectPool> _obj_pool;
     std::unique_ptr<orc::StringDictFilter> _string_dict_filter;
+    bool _is_dict_cols_converted;
 };
 
 class ORCFileInputStream : public orc::InputStream {
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 661fcd24f64..a4f77e2c056 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -345,17 +345,16 @@ Status RowGroupReader::next_batch(Block* block, size_t 
batch_size, size_t* read_
                 _convert_dict_cols_to_string_cols(block);
                 return Status::OK();
             }
+
+            RETURN_IF_CATCH_EXCEPTION(
+                    Block::filter_block_internal(block, columns_to_filter, 
result_filter));
             if (!_not_single_slot_filter_conjuncts.empty()) {
                 _convert_dict_cols_to_string_cols(block);
-                std::vector<IColumn::Filter*> merged_filters;
-                merged_filters.push_back(&result_filter);
                 RETURN_IF_CATCH_EXCEPTION(
                         
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
-                                _not_single_slot_filter_conjuncts, 
&merged_filters, block,
+                                _not_single_slot_filter_conjuncts, nullptr, 
block,
                                 columns_to_filter, column_to_keep)));
             } else {
-                RETURN_IF_CATCH_EXCEPTION(
-                        Block::filter_block_internal(block, columns_to_filter, 
result_filter));
                 Block::erase_useless_column(block, column_to_keep);
                 _convert_dict_cols_to_string_cols(block);
             }
@@ -573,8 +572,6 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t 
batch_size, size_t* re
     RETURN_IF_ERROR(_fill_partition_columns(block, column_size, 
_lazy_read_ctx.partition_columns));
     RETURN_IF_ERROR(_fill_missing_columns(block, column_size, 
_lazy_read_ctx.missing_columns));
     if (!_not_single_slot_filter_conjuncts.empty()) {
-        std::vector<IColumn::Filter*> filters;
-        filters.push_back(&result_filter);
         
RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
                 _not_single_slot_filter_conjuncts, nullptr, block, 
columns_to_filter,
                 origin_column_num)));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to