This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 8df93f8dfec [Opt](parquet/orc-reader) Opt get dict ids in
_rewrite_dict_predicates() (#40095)
8df93f8dfec is described below
commit 8df93f8dfec1e5a0a697be2147e22913079a4701
Author: Qi Chen <[email protected]>
AuthorDate: Thu Aug 29 14:50:42 2024 +0800
[Opt](parquet/orc-reader) Opt get dict ids in _rewrite_dict_predicates()
(#40095)
## Proposed changes
Backport of #39893 to branch-2.1.
---
be/src/vec/exec/format/orc/vorc_reader.cpp | 49 ++++++++--------------
.../format/parquet/byte_array_dict_decoder.cpp | 11 -----
.../exec/format/parquet/byte_array_dict_decoder.h | 4 --
be/src/vec/exec/format/parquet/decoder.h | 5 ---
.../format/parquet/fix_length_dict_decoder.hpp | 14 -------
.../format/parquet/vparquet_column_chunk_reader.h | 5 ---
.../exec/format/parquet/vparquet_column_reader.cpp | 5 ---
.../exec/format/parquet/vparquet_column_reader.h | 7 ----
.../exec/format/parquet/vparquet_group_reader.cpp | 43 ++++++++-----------
9 files changed, 36 insertions(+), 107 deletions(-)
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 4bc52d76959..d67602d39f6 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -2054,7 +2054,6 @@ Status OrcReader::on_string_dicts_loaded(
orc::StringDictionary* dict =
file_column_name_to_dict_map_iter->second;
std::vector<StringRef> dict_values;
- std::unordered_map<StringRef, int64_t> dict_value_to_code;
size_t max_value_length = 0;
uint64_t dictionaryCount = dict->dictionaryOffset.size() - 1;
if (dictionaryCount == 0) {
@@ -2074,7 +2073,6 @@ Status OrcReader::on_string_dicts_loaded(
max_value_length = length;
}
dict_values.emplace_back(dict_value);
- dict_value_to_code[dict_value] = i;
}
dict_value_column->insert_many_strings_overflow(&dict_values[0],
dict_values.size(),
max_value_length);
@@ -2113,31 +2111,37 @@ Status OrcReader::on_string_dicts_loaded(
++index;
}
- // 2.2 Execute conjuncts and filter block.
- std::vector<uint32_t> columns_to_filter(1, dict_pos);
- int column_to_keep = temp_block.columns();
+ // 2.2 Execute conjuncts.
if (dict_pos != 0) {
// VExprContext.execute has an optimization, the filtering is
executed when block->rows() > 0
// The following process may be tricky and time-consuming, but we
have no other way.
temp_block.get_by_position(0).column->assume_mutable()->resize(dict_value_column_size);
}
-
RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
- ctxs, nullptr, &temp_block, columns_to_filter,
column_to_keep)));
+ IColumn::Filter result_filter(temp_block.rows(), 1);
+ bool can_filter_all;
+ RETURN_IF_ERROR(VExprContext::execute_conjuncts(ctxs, nullptr,
&temp_block, &result_filter,
+ &can_filter_all));
if (dict_pos != 0) {
// We have to clean the first column to insert right data.
temp_block.get_by_position(0).column->assume_mutable()->clear();
}
- // Check some conditions.
- ColumnPtr& dict_column = temp_block.get_by_position(dict_pos).column;
- // If dict_column->size() == 0, can filter this stripe.
- if (dict_column->size() == 0) {
+ // If can_filter_all = true, can filter this stripe.
+ if (can_filter_all) {
*is_stripe_filtered = true;
return Status::OK();
}
+ // 3. Get dict codes.
+ std::vector<int32_t> dict_codes;
+ for (size_t i = 0; i < result_filter.size(); ++i) {
+ if (result_filter[i]) {
+ dict_codes.emplace_back(i);
+ }
+ }
+
// About Performance: if dict_column size is too large, it will
generate a large IN filter.
- if (dict_column->size() > MAX_DICT_CODE_PREDICATE_TO_REWRITE) {
+ if (dict_codes.size() > MAX_DICT_CODE_PREDICATE_TO_REWRITE) {
it = _dict_filter_cols.erase(it);
for (auto& ctx : ctxs) {
_non_dict_filter_conjuncts.emplace_back(ctx);
@@ -2145,26 +2149,9 @@ Status OrcReader::on_string_dicts_loaded(
continue;
}
- // 3. Get dict codes.
- std::vector<int32_t> dict_codes;
- if (dict_column->is_nullable()) {
- const ColumnNullable* nullable_column =
- static_cast<const ColumnNullable*>(dict_column.get());
- const ColumnString* nested_column = static_cast<const
ColumnString*>(
- nullable_column->get_nested_column_ptr().get());
- for (int i = 0; i < nested_column->size(); ++i) {
- StringRef dict_value = nested_column->get_data_at(i);
- dict_codes.emplace_back(dict_value_to_code[dict_value]);
- }
- } else {
- for (int i = 0; i < dict_column->size(); ++i) {
- StringRef dict_value = dict_column->get_data_at(i);
- dict_codes.emplace_back(dict_value_to_code[dict_value]);
- }
- }
-
// 4. Rewrite conjuncts.
- RETURN_IF_ERROR(_rewrite_dict_conjuncts(dict_codes, slot_id,
dict_column->is_nullable()));
+ RETURN_IF_ERROR(_rewrite_dict_conjuncts(
+ dict_codes, slot_id,
temp_block.get_by_position(dict_pos).column->is_nullable()));
++it;
}
return Status::OK();
diff --git a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp
b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp
index b6a614831a3..82bb234fd39 100644
--- a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp
+++ b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp
@@ -44,7 +44,6 @@ Status
ByteArrayDictDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t
total_length += l;
}
- _dict_value_to_code.reserve(num_values);
// For insert_many_strings_overflow
_dict_data.resize(total_length + ColumnString::MAX_STRINGS_OVERFLOW_SIZE);
_max_value_length = 0;
@@ -55,7 +54,6 @@ Status
ByteArrayDictDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t
offset_cursor += 4;
memcpy(&_dict_data[offset], dict_item_address + offset_cursor, l);
_dict_items.emplace_back(&_dict_data[offset], l);
- _dict_value_to_code[StringRef(&_dict_data[offset], l)] = i;
offset_cursor += l;
offset += l;
if (offset_cursor > length) {
@@ -77,15 +75,6 @@ Status
ByteArrayDictDecoder::read_dict_values_to_column(MutableColumnPtr& doris_
return Status::OK();
}
-Status ByteArrayDictDecoder::get_dict_codes(const ColumnString* string_column,
- std::vector<int32_t>* dict_codes) {
- for (int i = 0; i < string_column->size(); ++i) {
- StringRef dict_value = string_column->get_data_at(i);
- dict_codes->emplace_back(_dict_value_to_code[dict_value]);
- }
- return Status::OK();
-}
-
MutableColumnPtr ByteArrayDictDecoder::convert_dict_column_to_string_column(
const ColumnInt32* dict_column) {
auto res = ColumnString::create();
diff --git a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h
b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h
index 744a62165fb..bb83d41813b 100644
--- a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h
+++ b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h
@@ -54,9 +54,6 @@ public:
Status read_dict_values_to_column(MutableColumnPtr& doris_column) override;
- Status get_dict_codes(const ColumnString* column_string,
- std::vector<int32_t>* dict_codes) override;
-
MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32*
dict_column) override;
protected:
@@ -64,6 +61,5 @@ protected:
std::vector<StringRef> _dict_items;
std::vector<uint8_t> _dict_data;
size_t _max_value_length;
- std::unordered_map<StringRef, int32_t> _dict_value_to_code;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/decoder.h
b/be/src/vec/exec/format/parquet/decoder.h
index 57fecf4abfb..1654878af80 100644
--- a/be/src/vec/exec/format/parquet/decoder.h
+++ b/be/src/vec/exec/format/parquet/decoder.h
@@ -78,11 +78,6 @@ public:
return Status::NotSupported("read_dict_values_to_column is not
supported");
}
- virtual Status get_dict_codes(const ColumnString* column_string,
- std::vector<int32_t>* dict_codes) {
- return Status::NotSupported("get_dict_codes is not supported");
- }
-
virtual MutableColumnPtr convert_dict_column_to_string_column(const
ColumnInt32* dict_column) {
LOG(FATAL) << "Method convert_dict_column_to_string_column is not
supported";
__builtin_unreachable();
diff --git a/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp
b/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp
index e409c664d3e..2886696877f 100644
--- a/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp
+++ b/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp
@@ -109,10 +109,8 @@ protected:
_dict = std::move(dict);
char* dict_item_address = reinterpret_cast<char*>(_dict.get());
_dict_items.resize(num_values);
- _dict_value_to_code.reserve(num_values);
for (size_t i = 0; i < num_values; ++i) {
_dict_items[i] = dict_item_address;
- _dict_value_to_code[StringRef(_dict_items[i], _type_length)] = i;
dict_item_address += _type_length;
}
return Status::OK();
@@ -128,17 +126,6 @@ protected:
return Status::OK();
}
- Status get_dict_codes(const ColumnString* string_column,
- std::vector<int32_t>* dict_codes) override {
- size_t size = string_column->size();
- dict_codes->reserve(size);
- for (int i = 0; i < size; ++i) {
- StringRef dict_value = string_column->get_data_at(i);
- dict_codes->emplace_back(_dict_value_to_code[dict_value]);
- }
- return Status::OK();
- }
-
MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32*
dict_column) override {
auto res = ColumnString::create();
std::vector<StringRef> dict_values(dict_column->size());
@@ -149,7 +136,6 @@ protected:
res->insert_many_strings(&dict_values[0], dict_values.size());
return res;
}
- std::unordered_map<StringRef, int32_t> _dict_value_to_code;
// For dictionary encoding
std::vector<char*> _dict_items;
};
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index 79ee3cd6463..a00a4683725 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -183,11 +183,6 @@ public:
->read_dict_values_to_column(doris_column);
}
- Status get_dict_codes(const ColumnString* column_string,
std::vector<int32_t>* dict_codes) {
- return
_decoders[static_cast<int>(tparquet::Encoding::RLE_DICTIONARY)]->get_dict_codes(
- column_string, dict_codes);
- }
-
MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32*
dict_column) {
return _decoders[static_cast<int>(tparquet::Encoding::RLE_DICTIONARY)]
->convert_dict_column_to_string_column(dict_column);
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index c31c63ee87c..9c368b6a7a6 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -454,11 +454,6 @@ Status
ScalarColumnReader::read_dict_values_to_column(MutableColumnPtr& doris_co
return Status::OK();
}
-Status ScalarColumnReader::get_dict_codes(const ColumnString* column_string,
- std::vector<int32_t>* dict_codes) {
- return _chunk_reader->get_dict_codes(column_string, dict_codes);
-}
-
MutableColumnPtr ScalarColumnReader::convert_dict_column_to_string_column(
const ColumnInt32* dict_column) {
return _chunk_reader->convert_dict_column_to_string_column(dict_column);
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index f0eadb8bcd6..4c6e5b1eac9 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -128,11 +128,6 @@ public:
return Status::NotSupported("read_dict_values_to_column is not
supported");
}
- virtual Status get_dict_codes(const ColumnString* column_string,
- std::vector<int32_t>* dict_codes) {
- return Status::NotSupported("get_dict_codes is not supported");
- }
-
virtual MutableColumnPtr convert_dict_column_to_string_column(const
ColumnInt32* dict_column) {
LOG(FATAL) << "Method convert_dict_column_to_string_column is not
supported";
__builtin_unreachable();
@@ -180,8 +175,6 @@ public:
ColumnSelectVector& select_vector, size_t
batch_size, size_t* read_rows,
bool* eof, bool is_dict_filter) override;
Status read_dict_values_to_column(MutableColumnPtr& doris_column, bool*
has_dict) override;
- Status get_dict_codes(const ColumnString* column_string,
- std::vector<int32_t>* dict_codes) override;
MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32*
dict_column) override;
const std::vector<level_t>& get_rep_level() const override { return
_rep_levels; }
const std::vector<level_t>& get_def_level() const override { return
_def_levels; }
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index b70beec687b..4e7f53ca75e 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -841,7 +841,7 @@ Status RowGroupReader::_rewrite_dict_predicates() {
++index;
}
- // 2.2 Execute conjuncts and filter block.
+ // 2.2 Execute conjuncts.
VExprContextSPtrs ctxs;
auto iter = _slot_id_to_filter_conjuncts->find(slot_id);
if (iter != _slot_id_to_filter_conjuncts->end()) {
@@ -854,33 +854,39 @@ Status RowGroupReader::_rewrite_dict_predicates() {
return Status::NotFound(msg.str());
}
- std::vector<uint32_t> columns_to_filter(1, dict_pos);
- int column_to_keep = temp_block.columns();
if (dict_pos != 0) {
// VExprContext.execute has an optimization, the filtering is
executed when block->rows() > 0
// The following process may be tricky and time-consuming, but we
have no other way.
temp_block.get_by_position(0).column->assume_mutable()->resize(dict_value_column_size);
}
+ IColumn::Filter result_filter(temp_block.rows(), 1);
+ bool can_filter_all;
{
SCOPED_RAW_TIMER(&_predicate_filter_time);
-
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts_and_filter_block(
- ctxs, nullptr, &temp_block, columns_to_filter,
column_to_keep));
+ RETURN_IF_ERROR(VExprContext::execute_conjuncts(ctxs, nullptr,
&temp_block,
+ &result_filter,
&can_filter_all));
}
if (dict_pos != 0) {
// We have to clean the first column to insert right data.
temp_block.get_by_position(0).column->assume_mutable()->clear();
}
- // Check some conditions.
- ColumnPtr& dict_column = temp_block.get_by_position(dict_pos).column;
- // If dict_column->size() == 0, can filter this row group.
- if (dict_column->size() == 0) {
+ // If can_filter_all = true, can filter this row group.
+ if (can_filter_all) {
_is_row_group_filtered = true;
return Status::OK();
}
+ // 3. Get dict codes.
+ std::vector<int32_t> dict_codes;
+ for (size_t i = 0; i < result_filter.size(); ++i) {
+ if (result_filter[i]) {
+ dict_codes.emplace_back(i);
+ }
+ }
+
// About Performance: if dict_column size is too large, it will
generate a large IN filter.
- if (dict_column->size() > MAX_DICT_CODE_PREDICATE_TO_REWRITE) {
+ if (dict_codes.size() > MAX_DICT_CODE_PREDICATE_TO_REWRITE) {
it = _dict_filter_cols.erase(it);
for (auto& ctx : ctxs) {
_filter_conjuncts.push_back(ctx);
@@ -888,22 +894,9 @@ Status RowGroupReader::_rewrite_dict_predicates() {
continue;
}
- // 3. Get dict codes.
- std::vector<int32_t> dict_codes;
- if (dict_column->is_nullable()) {
- const ColumnNullable* nullable_column =
- static_cast<const ColumnNullable*>(dict_column.get());
- const ColumnString* nested_column = static_cast<const
ColumnString*>(
- nullable_column->get_nested_column_ptr().get());
-
RETURN_IF_ERROR(_column_readers[dict_filter_col_name]->get_dict_codes(
- assert_cast<const ColumnString*>(nested_column),
&dict_codes));
- } else {
-
RETURN_IF_ERROR(_column_readers[dict_filter_col_name]->get_dict_codes(
- assert_cast<const ColumnString*>(dict_column.get()),
&dict_codes));
- }
-
// 4. Rewrite conjuncts.
- RETURN_IF_ERROR(_rewrite_dict_conjuncts(dict_codes, slot_id,
dict_column->is_nullable()));
+ RETURN_IF_ERROR(_rewrite_dict_conjuncts(
+ dict_codes, slot_id,
temp_block.get_by_position(dict_pos).column->is_nullable()));
++it;
}
return Status::OK();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]