This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new b4b126b817 [Feature](parquet-reader) Implements dict filter functionality in the parquet reader. (#17594)
b4b126b817 is described below
commit b4b126b817a78609354a1b1c6de75208d21c3479
Author: Qi Chen <[email protected]>
AuthorDate: Thu Mar 16 20:29:27 2023 +0800
[Feature](parquet-reader) Implements dict filter functionality in the parquet reader. (#17594)

Implements dict filter functionality in the parquet reader to improve performance.
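At a high level, the reader probes each predicate column's dictionary page before any
batch is decoded, evaluates the existing conjuncts once over the distinct dictionary
values, and rewrites the surviving values into dictionary codes. Data pages are then
decoded and filtered as Int32 codes instead of materialized strings, and if no
dictionary value survives the predicate, the whole row group is skipped. A
self-contained sketch of the idea (std types only; names are illustrative, not the
Doris API):

    #include <cstdint>
    #include <string>
    #include <unordered_set>
    #include <vector>

    // Run the predicate once per distinct dictionary value, then filter rows
    // by integer code instead of by string.
    std::vector<int32_t> matching_codes(const std::vector<std::string>& dict,
                                        bool (*pred)(const std::string&)) {
        std::vector<int32_t> codes;
        for (int32_t code = 0; code < static_cast<int32_t>(dict.size()); ++code) {
            if (pred(dict[code])) {
                codes.push_back(code);  // surviving values become an IN-list of codes
            }
        }
        return codes;  // empty result => the whole row group can be skipped
    }

    bool row_matches(int32_t row_code, const std::unordered_set<int32_t>& codes) {
        return codes.count(row_code) > 0;  // per-row work drops to an integer probe
    }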
---
be/src/service/internal_service.cpp | 2 +-
be/src/vec/data_types/data_type.cpp | 1 +
.../vec/exec/format/parquet/bool_plain_decoder.cpp | 2 +-
.../vec/exec/format/parquet/bool_plain_decoder.h | 2 +-
.../vec/exec/format/parquet/bool_rle_decoder.cpp | 2 +-
be/src/vec/exec/format/parquet/bool_rle_decoder.h | 2 +-
.../format/parquet/byte_array_dict_decoder.cpp | 44 +-
.../exec/format/parquet/byte_array_dict_decoder.h | 10 +-
.../format/parquet/byte_array_plain_decoder.cpp | 3 +-
.../exec/format/parquet/byte_array_plain_decoder.h | 2 +-
be/src/vec/exec/format/parquet/decoder.h | 26 +-
.../exec/format/parquet/delta_bit_pack_decoder.h | 9 +-
.../format/parquet/fix_length_dict_decoder.hpp | 59 +-
.../format/parquet/fix_length_plain_decoder.cpp | 3 +-
.../exec/format/parquet/fix_length_plain_decoder.h | 2 +-
.../parquet/vparquet_column_chunk_reader.cpp | 9 +-
.../format/parquet/vparquet_column_chunk_reader.h | 30 +-
.../exec/format/parquet/vparquet_column_reader.cpp | 66 ++-
.../exec/format/parquet/vparquet_column_reader.h | 33 +-
.../exec/format/parquet/vparquet_group_reader.cpp | 641 ++++++++++++++++-----
.../exec/format/parquet/vparquet_group_reader.h | 44 +-
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 33 +-
be/src/vec/exec/format/parquet/vparquet_reader.h | 17 +-
be/src/vec/exec/format/table/iceberg_reader.cpp | 15 +-
be/src/vec/exec/format/table/iceberg_reader.h | 6 +-
be/src/vec/exec/scan/new_file_scan_node.cpp | 3 +-
be/src/vec/exec/scan/vfile_scanner.cpp | 86 ++-
be/src/vec/exec/scan/vfile_scanner.h | 15 +-
be/src/vec/exec/scan/vscan_node.cpp | 2 +
be/src/vec/exec/scan/vscan_node.h | 2 +
be/test/vec/exec/parquet/parquet_reader_test.cpp | 6 +-
be/test/vec/exec/parquet/parquet_thrift_test.cpp | 14 +-
32 files changed, 957 insertions(+), 234 deletions(-)
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 407ad5dad0..257509a97c 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -480,7 +480,7 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
         break;
     }
     case TFileFormatType::FORMAT_PARQUET: {
-        reader.reset(new vectorized::ParquetReader(params, range, &io_ctx));
+        reader.reset(new vectorized::ParquetReader(params, range, &io_ctx, nullptr));
         break;
     }
     case TFileFormatType::FORMAT_ORC: {
diff --git a/be/src/vec/data_types/data_type.cpp b/be/src/vec/data_types/data_type.cpp
index 6d70850564..aee0630585 100644
--- a/be/src/vec/data_types/data_type.cpp
+++ b/be/src/vec/data_types/data_type.cpp
@@ -56,6 +56,7 @@ void IDataType::update_avg_value_size_hint(const IColumn& column, double& avg_va
 ColumnPtr IDataType::create_column_const(size_t size, const Field& field) const {
     auto column = create_column();
+    column->reserve(1);
     column->insert(field);
     return ColumnConst::create(std::move(column), size);
 }
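The new reserve(1) is a small allocation hint: a const column materializes exactly one
value, so reserving a single slot before the insert lets the column size its backing
storage for one element rather than taking a default first-growth allocation (our
reading of the change; the patch does not state the rationale). The same pattern with
a std container:

    #include <string>
    #include <vector>

    void make_single() {
        std::vector<std::string> v;
        v.reserve(1);          // exactly one element is coming
        v.push_back("only");   // no speculative capacity beyond what is needed
    }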
diff --git a/be/src/vec/exec/format/parquet/bool_plain_decoder.cpp b/be/src/vec/exec/format/parquet/bool_plain_decoder.cpp
index 87e6a31828..ff05c559a5 100644
--- a/be/src/vec/exec/format/parquet/bool_plain_decoder.cpp
+++ b/be/src/vec/exec/format/parquet/bool_plain_decoder.cpp
@@ -43,7 +43,7 @@ Status BoolPlainDecoder::skip_values(size_t num_values) {
 }
 
 Status BoolPlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
-                                       ColumnSelectVector& select_vector) {
+                                       ColumnSelectVector& select_vector, bool is_dict_filter) {
     auto& column_data = static_cast<ColumnVector<UInt8>&>(*doris_column).get_data();
     size_t data_index = column_data.size();
     column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered());
diff --git a/be/src/vec/exec/format/parquet/bool_plain_decoder.h b/be/src/vec/exec/format/parquet/bool_plain_decoder.h
index 77ab7f4ccd..08e54ecd60 100644
--- a/be/src/vec/exec/format/parquet/bool_plain_decoder.h
+++ b/be/src/vec/exec/format/parquet/bool_plain_decoder.h
@@ -37,7 +37,7 @@ public:
     }
 
     Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
-                         ColumnSelectVector& select_vector) override;
+                         ColumnSelectVector& select_vector, bool is_dict_filter) override;
 
     Status skip_values(size_t num_values) override;
diff --git a/be/src/vec/exec/format/parquet/bool_rle_decoder.cpp b/be/src/vec/exec/format/parquet/bool_rle_decoder.cpp
index 563b6c68df..0856687bbf 100644
--- a/be/src/vec/exec/format/parquet/bool_rle_decoder.cpp
+++ b/be/src/vec/exec/format/parquet/bool_rle_decoder.cpp
@@ -46,7 +46,7 @@ Status BoolRLEDecoder::skip_values(size_t num_values) {
 }
 
 Status BoolRLEDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
-                                     ColumnSelectVector& select_vector) {
+                                     ColumnSelectVector& select_vector, bool is_dict_filter) {
     auto& column_data = static_cast<ColumnVector<UInt8>&>(*doris_column).get_data();
     size_t data_index = column_data.size();
     column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered());
diff --git a/be/src/vec/exec/format/parquet/bool_rle_decoder.h b/be/src/vec/exec/format/parquet/bool_rle_decoder.h
index 0b3ba6e05d..6b0c94825f 100644
--- a/be/src/vec/exec/format/parquet/bool_rle_decoder.h
+++ b/be/src/vec/exec/format/parquet/bool_rle_decoder.h
@@ -30,7 +30,7 @@ public:
     void set_data(Slice* slice) override;
 
     Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
-                         ColumnSelectVector& select_vector) override;
+                         ColumnSelectVector& select_vector, bool is_dict_filter) override;
 
     Status skip_values(size_t num_values) override;
diff --git a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp
index e5a86c5061..6962b2a47b 100644
--- a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp
+++ b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp
@@ -38,6 +38,7 @@ Status ByteArrayDictDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t
         total_length += l;
     }
 
+    _dict_value_to_code.reserve(num_values);
     // For insert_many_strings_overflow
     _dict_data.resize(total_length + MAX_STRINGS_OVERFLOW_SIZE);
     _max_value_length = 0;
@@ -48,6 +49,7 @@ Status ByteArrayDictDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t
         offset_cursor += 4;
         memcpy(&_dict_data[offset], dict_item_address + offset_cursor, l);
         _dict_items.emplace_back(&_dict_data[offset], l);
+        _dict_value_to_code[StringRef(&_dict_data[offset], l)] = i;
         offset_cursor += l;
         offset += l;
         if (offset_cursor > length) {
@@ -63,19 +65,47 @@ Status ByteArrayDictDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t
     return Status::OK();
 }
 
+Status ByteArrayDictDecoder::read_dict_values_to_column(MutableColumnPtr& doris_column) {
+    doris_column->insert_many_strings_overflow(&_dict_items[0], _dict_items.size(),
+                                               _max_value_length);
+    return Status::OK();
+}
+
+Status ByteArrayDictDecoder::get_dict_codes(const ColumnString* string_column,
+                                            std::vector<int32_t>* dict_codes) {
+    for (int i = 0; i < string_column->size(); ++i) {
+        StringRef dict_value = string_column->get_data_at(i);
+        dict_codes->emplace_back(_dict_value_to_code[dict_value]);
+    }
+    return Status::OK();
+}
+
+MutableColumnPtr ByteArrayDictDecoder::convert_dict_column_to_string_column(
+        const ColumnInt32* dict_column) {
+    auto res = ColumnString::create();
+    std::vector<StringRef> dict_values(dict_column->size());
+    const auto& data = dict_column->get_data();
+    for (size_t i = 0; i < dict_column->size(); ++i) {
+        dict_values[i] = _dict_items[data[i]];
+    }
+    res->insert_many_strings_overflow(&dict_values[0], dict_values.size(), _max_value_length);
+    return res;
+}
+
 Status ByteArrayDictDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
-                                           ColumnSelectVector& select_vector) {
+                                           ColumnSelectVector& select_vector, bool is_dict_filter) {
     size_t non_null_size = select_vector.num_values() - select_vector.num_nulls();
-    if (doris_column->is_column_dictionary() &&
-        assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) {
-        assert_cast<ColumnDictI32&>(*doris_column)
-                .insert_many_dict_data(&_dict_items[0], _dict_items.size());
+    if (doris_column->is_column_dictionary()) {
+        ColumnDictI32& dict_column = assert_cast<ColumnDictI32&>(*doris_column);
+        if (dict_column.dict_size() == 0) {
+            dict_column.insert_many_dict_data(&_dict_items[0], _dict_items.size());
+        }
     }
     _indexes.resize(non_null_size);
     _index_batch_decoder->GetBatch(&_indexes[0], non_null_size);
-    if (doris_column->is_column_dictionary()) {
-        return _decode_dict_values(doris_column, select_vector);
+    if (doris_column->is_column_dictionary() || is_dict_filter) {
+        return _decode_dict_values(doris_column, select_vector, is_dict_filter);
     }
 
     TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
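Together, read_dict_values_to_column, get_dict_codes and
convert_dict_column_to_string_column form the dictionary round trip. A hedged usage
sketch (simplified, error handling elided; `survivors` and `code_col` are hypothetical
columns produced by the predicate evaluation and the code-level decode):

    std::vector<int32_t> codes;
    MutableColumnPtr dict_values = ColumnString::create();
    RETURN_IF_ERROR(decoder.read_dict_values_to_column(dict_values));   // 1. dump distinct values
    // ... evaluate the string predicate over dict_values; survivors = kept values ...
    RETURN_IF_ERROR(decoder.get_dict_codes(survivors, &codes));         // 2. values -> codes
    // ... decode pages as Int32 codes and filter rows against `codes` ...
    MutableColumnPtr strings =
            decoder.convert_dict_column_to_string_column(code_col);     // 3. codes -> strings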
diff --git a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h
index a90226d7c3..8492937e83 100644
--- a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h
+++ b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h
@@ -31,10 +31,17 @@ public:
     ~ByteArrayDictDecoder() override = default;
 
     Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
-                         ColumnSelectVector& select_vector) override;
+                         ColumnSelectVector& select_vector, bool is_dict_filter) override;
 
     Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) override;
 
+    Status read_dict_values_to_column(MutableColumnPtr& doris_column) override;
+
+    Status get_dict_codes(const ColumnString* column_string,
+                          std::vector<int32_t>* dict_codes) override;
+
+    MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) override;
+
 protected:
     template <typename DecimalPrimitiveType>
     Status _decode_binary_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type,
@@ -44,6 +51,7 @@ protected:
     std::vector<StringRef> _dict_items;
    std::vector<uint8_t> _dict_data;
     size_t _max_value_length;
+    std::unordered_map<StringRef, int32_t> _dict_value_to_code;
 };
 
 template <typename DecimalPrimitiveType>
diff --git a/be/src/vec/exec/format/parquet/byte_array_plain_decoder.cpp b/be/src/vec/exec/format/parquet/byte_array_plain_decoder.cpp
index acc977dbe5..833472665b 100644
--- a/be/src/vec/exec/format/parquet/byte_array_plain_decoder.cpp
+++ b/be/src/vec/exec/format/parquet/byte_array_plain_decoder.cpp
@@ -38,7 +38,8 @@ Status ByteArrayPlainDecoder::skip_values(size_t num_values) {
 }
 
 Status ByteArrayPlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
-                                            ColumnSelectVector& select_vector) {
+                                            ColumnSelectVector& select_vector,
+                                            bool is_dict_filter) {
     TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
     switch (logical_type) {
     case TypeIndex::String:
diff --git a/be/src/vec/exec/format/parquet/byte_array_plain_decoder.h b/be/src/vec/exec/format/parquet/byte_array_plain_decoder.h
index 84bed0dec3..cd35ceaa6f 100644
--- a/be/src/vec/exec/format/parquet/byte_array_plain_decoder.h
+++ b/be/src/vec/exec/format/parquet/byte_array_plain_decoder.h
@@ -30,7 +30,7 @@ public:
     ~ByteArrayPlainDecoder() override = default;
 
     Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
-                         ColumnSelectVector& select_vector) override;
+                         ColumnSelectVector& select_vector, bool is_dict_filter) override;
 
     Status skip_values(size_t num_values) override;
diff --git a/be/src/vec/exec/format/parquet/decoder.h b/be/src/vec/exec/format/parquet/decoder.h
index 827e377c2b..eb0d88bd63 100644
--- a/be/src/vec/exec/format/parquet/decoder.h
+++ b/be/src/vec/exec/format/parquet/decoder.h
@@ -76,7 +76,7 @@ public:
 
     // Write the decoded values batch to doris's column
     virtual Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
-                                 ColumnSelectVector& select_vector) = 0;
+                                 ColumnSelectVector& select_vector, bool is_dict_filter) = 0;
 
     virtual Status skip_values(size_t num_values) = 0;
 
@@ -84,6 +84,19 @@ public:
         return Status::NotSupported("set_dict is not supported");
     }
 
+    virtual Status read_dict_values_to_column(MutableColumnPtr& doris_column) {
+        return Status::NotSupported("read_dict_values_to_column is not supported");
+    }
+
+    virtual Status get_dict_codes(const ColumnString* column_string,
+                                  std::vector<int32_t>* dict_codes) {
+        return Status::NotSupported("get_dict_codes is not supported");
+    }
+
+    virtual MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) {
+        LOG(FATAL) << "Method convert_dict_column_to_string_column is not supported";
+    }
+
 protected:
     int32_t _type_length;
     Slice* _data = nullptr;
@@ -136,11 +149,15 @@ protected:
     * Decode dictionary-coded values into doris_column, ensure that doris_column is ColumnDictI32 type,
     * and the coded values must be read into _indexes previously.
     */
-    Status _decode_dict_values(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) {
-        DCHECK(doris_column->is_column_dictionary());
+    Status _decode_dict_values(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector,
+                               bool is_dict_filter) {
+        DCHECK(doris_column->is_column_dictionary() || is_dict_filter);
         size_t dict_index = 0;
         ColumnSelectVector::DataReadType read_type;
-        auto& column_data = assert_cast<ColumnDictI32&>(*doris_column).get_data();
+        PaddedPODArray<Int32>& column_data =
+                doris_column->is_column_dictionary()
+                        ? assert_cast<ColumnDictI32&>(*doris_column).get_data()
+                        : assert_cast<ColumnInt32&>(*doris_column).get_data();
         while (size_t run_length = select_vector.get_next_run(&read_type)) {
             switch (read_type) {
             case ColumnSelectVector::CONTENT: {
@@ -171,7 +188,6 @@ protected:
         return Status::OK();
     }
 
-protected:
     // For dictionary encoding
     std::unique_ptr<uint8_t[]> _dict = nullptr;
     std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder = nullptr;
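The ternary above is the crux of the change: under dict filtering the destination is a
plain ColumnInt32 of raw codes rather than a ColumnDictI32, so both paths share the
same run-length decode loop. A simplified, self-contained rendering of that loop (std
types; the CONTENT case mirrors the code above, the other read types are our
assumption about ColumnSelectVector's semantics):

    #include <cstdint>
    #include <utility>
    #include <vector>

    enum ReadType { CONTENT, NULL_DATA, FILTERED_CONTENT, FILTERED_NULL };

    // Copy dictionary codes for kept rows, emit a default code for NULL slots,
    // and consume-but-drop codes for filtered rows.
    void decode_runs(const std::vector<int32_t>& indexes,
                     const std::vector<std::pair<ReadType, size_t>>& runs,
                     std::vector<int32_t>* out) {
        size_t dict_index = 0;
        for (const auto& [type, run_length] : runs) {
            switch (type) {
            case CONTENT:
                for (size_t i = 0; i < run_length; ++i) out->push_back(indexes[dict_index++]);
                break;
            case NULL_DATA:
                out->insert(out->end(), run_length, 0);  // placeholder code for NULLs
                break;
            case FILTERED_CONTENT:
                dict_index += run_length;  // skip codes without emitting
                break;
            case FILTERED_NULL:
                break;  // nothing to consume or emit
            }
        }
    }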
diff --git a/be/src/vec/exec/format/parquet/delta_bit_pack_decoder.h b/be/src/vec/exec/format/parquet/delta_bit_pack_decoder.h
index bdebd4d82f..f51941bb3a 100644
--- a/be/src/vec/exec/format/parquet/delta_bit_pack_decoder.h
+++ b/be/src/vec/exec/format/parquet/delta_bit_pack_decoder.h
@@ -64,7 +64,7 @@ public:
             : DeltaDecoder(new FixLengthPlainDecoder(physical_type)) {}
     ~DeltaBitPackDecoder() override = default;
     Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
-                         ColumnSelectVector& select_vector) override {
+                         ColumnSelectVector& select_vector, bool is_dict_filter) override {
         size_t non_null_size = select_vector.num_values() - select_vector.num_nulls();
         // decode values
         _values.resize(non_null_size);
@@ -75,7 +75,8 @@ public:
         _data->size = _values.size() * _type_length;
         // set decoded value with fix plain decoder
         init_values_converter();
-        return _type_converted_decoder->decode_values(doris_column, data_type, select_vector);
+        return _type_converted_decoder->decode_values(doris_column, data_type, select_vector,
+                                                      is_dict_filter);
     }
 
     Status decode(T* buffer, int num_values, int* out_num_values) {
@@ -155,7 +156,7 @@ public:
     }
 
     Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
-                         ColumnSelectVector& select_vector) override {
+                         ColumnSelectVector& select_vector, bool is_dict_filter) override {
         size_t num_values = select_vector.num_values();
         size_t null_count = select_vector.num_nulls();
         // init read buffer
@@ -222,7 +223,7 @@ public:
     }
 
     Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
-                         ColumnSelectVector& select_vector) override {
+                         ColumnSelectVector& select_vector, bool is_dict_filter) override {
         size_t num_values = select_vector.num_values();
         size_t null_count = select_vector.num_nulls();
         _values.resize(num_values - null_count);
diff --git a/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp b/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp
index e581a65d10..64daf9325a 100644
--- a/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp
+++ b/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp
@@ -20,6 +20,7 @@
 #include "vec/columns/column_dictionary.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/data_types/data_type_nullable.h"
+#include "vec/exec/format/parquet/decoder.h"
 
 namespace doris::vectorized {
 
@@ -31,7 +32,7 @@ public:
     ~FixLengthDictDecoder() override = default;
 
     Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
-                         ColumnSelectVector& select_vector) override {
+                         ColumnSelectVector& select_vector, bool is_dict_filter) override {
         size_t non_null_size = select_vector.num_values() - select_vector.num_nulls();
         if (doris_column->is_column_dictionary() &&
             assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) {
@@ -43,11 +44,22 @@ public:
             assert_cast<ColumnDictI32&>(*doris_column)
                     .insert_many_dict_data(&dict_items[0], dict_items.size());
         }
+        if (doris_column->is_column_dictionary()) {
+            ColumnDictI32& dict_column = assert_cast<ColumnDictI32&>(*doris_column);
+            if (dict_column.dict_size() == 0) {
+                std::vector<StringRef> dict_items;
+                dict_items.reserve(_dict_items.size());
+                for (int i = 0; i < _dict_items.size(); ++i) {
+                    dict_items.emplace_back((char*)(&_dict_items[i]), _type_length);
+                }
+                dict_column.insert_many_dict_data(&dict_items[0], dict_items.size());
+            }
+        }
         _indexes.resize(non_null_size);
         _index_batch_decoder->GetBatch(&_indexes[0], non_null_size);
-        if (doris_column->is_column_dictionary()) {
-            return _decode_dict_values(doris_column, select_vector);
+        if (doris_column->is_column_dictionary() || is_dict_filter) {
+            return _decode_dict_values(doris_column, select_vector, is_dict_filter);
         }
 
         TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
@@ -364,7 +376,7 @@ public:
     ~FixLengthDictDecoder() override = default;
 
     Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
-                         ColumnSelectVector& select_vector) override {
+                         ColumnSelectVector& select_vector, bool is_dict_filter) override {
         size_t non_null_size = select_vector.num_values() - select_vector.num_nulls();
         if (doris_column->is_column_dictionary() &&
             assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) {
@@ -379,8 +391,8 @@ public:
 
         _indexes.resize(non_null_size);
         _index_batch_decoder->GetBatch(&_indexes[0], non_null_size);
-        if (doris_column->is_column_dictionary()) {
-            return _decode_dict_values(doris_column, select_vector);
+        if (doris_column->is_column_dictionary() || is_dict_filter) {
+            return _decode_dict_values(doris_column, select_vector, is_dict_filter);
         }
 
         TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
@@ -434,13 +446,47 @@ public:
         _dict = std::move(dict);
         char* dict_item_address = reinterpret_cast<char*>(_dict.get());
         _dict_items.resize(num_values);
+        _dict_value_to_code.reserve(num_values);
         for (size_t i = 0; i < num_values; ++i) {
             _dict_items[i] = dict_item_address;
+            _dict_value_to_code[StringRef(_dict_items[i], _type_length)] = i;
             dict_item_address += _type_length;
         }
         return Status::OK();
     }
 
+    Status read_dict_values_to_column(MutableColumnPtr& doris_column) override {
+        size_t dict_items_size = _dict_items.size();
+        std::vector<StringRef> dict_values(dict_items_size);
+        for (size_t i = 0; i < dict_items_size; ++i) {
+            dict_values[i] = StringRef(_dict_items[i], _type_length);
+        }
+        doris_column->insert_many_strings(&dict_values[0], dict_items_size);
+        return Status::OK();
+    }
+
+    Status get_dict_codes(const ColumnString* string_column,
+                          std::vector<int32_t>* dict_codes) override {
+        size_t size = string_column->size();
+        dict_codes->reserve(size);
+        for (size_t i = 0; i < size; ++i) {
+            StringRef dict_value = string_column->get_data_at(i);
+            dict_codes->emplace_back(_dict_value_to_code[dict_value]);
+        }
+        return Status::OK();
+    }
+
+    MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) override {
+        auto res = ColumnString::create();
+        std::vector<StringRef> dict_values(dict_column->size());
+        const auto& data = dict_column->get_data();
+        for (size_t i = 0; i < dict_column->size(); ++i) {
+            dict_values[i] = StringRef(_dict_items[data[i]], _type_length);
+        }
+        res->insert_many_strings(&dict_values[0], dict_values.size());
+        return res;
+    }
+
 protected:
     template <typename DecimalPrimitiveType>
     Status _decode_binary_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type,
@@ -528,6 +574,7 @@ protected:
 
     // For dictionary encoding
     std::vector<char*> _dict_items;
+    std::unordered_map<StringRef, int32_t> _dict_value_to_code;
 };
 
 } // namespace doris::vectorized
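A note on the new _dict_value_to_code maps in both dict decoders: StringRef is a
non-owning (pointer, length) view, so keying a hash map with it is only safe while the
backing dictionary buffer stays alive, which holds here because the decoder owns
_dict/_dict_data for the lifetime of the column chunk. The same idea with std types:

    #include <cstdint>
    #include <string_view>
    #include <unordered_map>
    #include <vector>

    // string_view, like StringRef, does not own its bytes: the dictionary
    // storage must outlive the map.
    std::unordered_map<std::string_view, int32_t> build_value_to_code(
            const std::vector<std::string_view>& dict_items) {
        std::unordered_map<std::string_view, int32_t> m;
        m.reserve(dict_items.size());
        for (int32_t code = 0; code < static_cast<int32_t>(dict_items.size()); ++code) {
            m.emplace(dict_items[code], code);  // enables O(1) value -> code lookups
        }
        return m;
    }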
diff --git a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp
index e50fcc627b..01bfe67981 100644
--- a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp
+++ b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp
@@ -32,7 +32,8 @@ Status FixLengthPlainDecoder::skip_values(size_t num_values) {
 }
 
 Status FixLengthPlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
-                                            ColumnSelectVector& select_vector) {
+                                            ColumnSelectVector& select_vector,
+                                            bool is_dict_filter) {
     size_t non_null_size = select_vector.num_values() - select_vector.num_nulls();
     if (UNLIKELY(_offset + _type_length * non_null_size > _data->size)) {
         return Status::IOError("Out-of-bounds access in parquet data decoder");
diff --git a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.h b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.h
index b8f516444e..c35a97fc3e 100644
--- a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.h
+++ b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.h
@@ -33,7 +33,7 @@ public:
     ~FixLengthPlainDecoder() override = default;
 
     Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
-                         ColumnSelectVector& select_vector) override;
+                         ColumnSelectVector& select_vector, bool is_dict_filter) override;
 
     Status skip_values(size_t num_values) override;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index d53023c1a2..9308624246 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -52,6 +52,9 @@ Status ColumnChunkReader::init() {
 }
 
 Status ColumnChunkReader::next_page() {
+    if (_state == HEADER_PARSED) {
+        return Status::OK();
+    }
     if (UNLIKELY(_state == NOT_INIT)) {
         return Status::Corruption("Should initialize chunk reader");
     }
@@ -258,16 +261,16 @@ size_t ColumnChunkReader::get_def_levels(level_t* levels, size_t n) {
 }
 
 Status ColumnChunkReader::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
-                                        ColumnSelectVector& select_vector) {
+                                        ColumnSelectVector& select_vector, bool is_dict_filter) {
     SCOPED_RAW_TIMER(&_statistics.decode_value_time);
-    if (UNLIKELY(doris_column->is_column_dictionary() && !_has_dict)) {
+    if (UNLIKELY((doris_column->is_column_dictionary() || is_dict_filter) && !_has_dict)) {
         return Status::IOError("Not dictionary coded");
     }
     if (UNLIKELY(_remaining_num_values < select_vector.num_values())) {
         return Status::IOError("Decode too many values in current page");
     }
     _remaining_num_values -= select_vector.num_values();
-    return _page_decoder->decode_values(doris_column, data_type, select_vector);
+    return _page_decoder->decode_values(doris_column, data_type, select_vector, is_dict_filter);
 }
 
 int32_t ColumnChunkReader::_get_type_length() {
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index 2bf21fbf7b..303af52104 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -86,14 +86,13 @@ public:
 
     // Skip current page(will not read and parse) if the page is filtered by predicates.
     Status skip_page() {
+        Status res = Status::OK();
         _remaining_num_values = 0;
         if (_state == HEADER_PARSED) {
-            return _page_reader->skip_page();
+            res = _page_reader->skip_page();
         }
-        if (_state != DATA_LOADED) {
-            return Status::Corruption("Should parse page header to skip page");
-        }
-        return Status::OK();
+        _state = PAGE_SKIPPED;
+        return res;
     }
     // Skip some values(will not read and parse) in current page if the values are filtered by predicates.
     // when skip_data = false, the underlying decoder will not skip data,
@@ -124,7 +123,7 @@ public:
 
     // Decode values in current page into doris column.
     Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
-                         ColumnSelectVector& select_vector);
+                         ColumnSelectVector& select_vector, bool is_dict_filter);
 
     // Get the repetition level decoder of current page.
     LevelDecoder& rep_level_decoder() { return _rep_level_decoder; }
@@ -134,6 +133,8 @@ public:
     level_t max_rep_level() const { return _max_rep_level; }
     level_t max_def_level() const { return _max_def_level; }
 
+    bool has_dict() const { return _has_dict; }
+
     // Get page decoder
     Decoder* get_page_decoder() { return _page_decoder; }
 
@@ -142,8 +143,23 @@ public:
         return _statistics;
     }
 
+    Status read_dict_values_to_column(MutableColumnPtr& doris_column) {
+        return _decoders[static_cast<int>(tparquet::Encoding::RLE_DICTIONARY)]
+                ->read_dict_values_to_column(doris_column);
+    }
+
+    Status get_dict_codes(const ColumnString* column_string, std::vector<int32_t>* dict_codes) {
+        return _decoders[static_cast<int>(tparquet::Encoding::RLE_DICTIONARY)]->get_dict_codes(
+                column_string, dict_codes);
+    }
+
+    MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) {
+        return _decoders[static_cast<int>(tparquet::Encoding::RLE_DICTIONARY)]
+                ->convert_dict_column_to_string_column(dict_column);
+    }
+
 private:
-    enum ColumnChunkReaderState { NOT_INIT, INITIALIZED, HEADER_PARSED, DATA_LOADED };
+    enum ColumnChunkReaderState { NOT_INIT, INITIALIZED, HEADER_PARSED, DATA_LOADED, PAGE_SKIPPED };
 
     Status _decode_dict_page();
     void _reserve_decompress_buf(size_t size);
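The reworked skip_page()/next_page() pair turns header parsing into an idempotent
step: a page whose header was already parsed for the dictionary probe is not parsed
again, and skipping records an explicit PAGE_SKIPPED state instead of returning a
Corruption error. A minimal model of the intended transitions (our reading of the
diff, not the actual Doris class):

    #include <cassert>

    enum State { NOT_INIT, INITIALIZED, HEADER_PARSED, DATA_LOADED, PAGE_SKIPPED };

    State next_page(State s) {
        if (s == HEADER_PARSED) return s;  // idempotent: dict probe already parsed it
        assert(s != NOT_INIT);             // must be initialized first
        return HEADER_PARSED;
    }

    State skip_page(State) {
        return PAGE_SKIPPED;  // always legal now; no "should parse header" error
    }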
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index 3e94a3082f..0357c77b22 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -214,7 +214,8 @@ Status ScalarColumnReader::_skip_values(size_t num_values) {
 }
 
 Status ScalarColumnReader::_read_values(size_t num_values, ColumnPtr& doris_column,
-                                        DataTypePtr& type, ColumnSelectVector& select_vector) {
+                                        DataTypePtr& type, ColumnSelectVector& select_vector,
+                                        bool is_dict_filter) {
     if (num_values == 0) {
         return Status::OK();
     }
@@ -271,12 +272,12 @@ Status ScalarColumnReader::_read_values(size_t num_values, ColumnPtr& doris_colu
         SCOPED_RAW_TIMER(&_decode_null_map_time);
         select_vector.set_run_length_null_map(null_map, num_values, map_data_column);
     }
-    return _chunk_reader->decode_values(data_column, type, select_vector);
+    return _chunk_reader->decode_values(data_column, type, select_vector, is_dict_filter);
 }
 
 Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataTypePtr& type,
                                                ColumnSelectVector& select_vector, size_t batch_size,
-                                               size_t* read_rows, bool* eof) {
+                                               size_t* read_rows, bool* eof, bool is_dict_filter) {
     _rep_levels.resize(0);
     _def_levels.resize(0);
     size_t parsed_rows = 0;
@@ -373,7 +374,7 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
         SCOPED_RAW_TIMER(&_decode_null_map_time);
         select_vector.set_run_length_null_map(null_map, num_values, map_data_column);
     }
-    RETURN_IF_ERROR(_chunk_reader->decode_values(data_column, type, select_vector));
+    RETURN_IF_ERROR(_chunk_reader->decode_values(data_column, type, select_vector, is_dict_filter));
     if (ancestor_nulls != 0) {
         _chunk_reader->skip_values(ancestor_nulls, false);
     }
@@ -384,10 +385,44 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
     }
     return Status::OK();
 }
+
+Status ScalarColumnReader::read_dict_values_to_column(MutableColumnPtr& doris_column,
+                                                      bool* has_dict) {
+    bool loaded;
+    RETURN_IF_ERROR(_try_load_dict_page(&loaded, has_dict));
+    if (loaded && *has_dict) {
+        return _chunk_reader->read_dict_values_to_column(doris_column);
+    }
+    return Status::OK();
+}
+
+Status ScalarColumnReader::get_dict_codes(const ColumnString* column_string,
+                                          std::vector<int32_t>* dict_codes) {
+    return _chunk_reader->get_dict_codes(column_string, dict_codes);
+}
+
+MutableColumnPtr ScalarColumnReader::convert_dict_column_to_string_column(
+        const ColumnInt32* dict_column) {
+    return _chunk_reader->convert_dict_column_to_string_column(dict_column);
+}
+
+Status ScalarColumnReader::_try_load_dict_page(bool* loaded, bool* has_dict) {
+    *loaded = false;
+    *has_dict = false;
+    if (_chunk_reader->remaining_num_values() == 0) {
+        if (!_chunk_reader->has_next_page()) {
+            *loaded = false;
+            return Status::OK();
+        }
+        RETURN_IF_ERROR(_chunk_reader->next_page());
+        *loaded = true;
+        *has_dict = _chunk_reader->has_dict();
+    }
+    return Status::OK();
+}
+
 Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                                             ColumnSelectVector& select_vector, size_t batch_size,
-                                            size_t* read_rows, bool* eof) {
+                                            size_t* read_rows, bool* eof, bool is_dict_filter) {
     if (_chunk_reader->remaining_num_values() == 0) {
         if (!_chunk_reader->has_next_page()) {
             *eof = true;
@@ -398,7 +433,8 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
     }
     if (_nested_column) {
        RETURN_IF_ERROR(_chunk_reader->load_page_data_idempotent());
-        return _read_nested_column(doris_column, type, select_vector, batch_size, read_rows, eof);
+        return _read_nested_column(doris_column, type, select_vector, batch_size, read_rows, eof,
+                                   is_dict_filter);
     }
 
     // generate the row ranges that should be read
@@ -452,7 +488,8 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
             if (skip_whole_batch) {
                 RETURN_IF_ERROR(_skip_values(read_values));
             } else {
-                RETURN_IF_ERROR(_read_values(read_values, doris_column, type, select_vector));
+                RETURN_IF_ERROR(_read_values(read_values, doris_column, type, select_vector,
+                                             is_dict_filter));
             }
             has_read += read_values;
             _current_row_index += read_values;
@@ -478,7 +515,7 @@ Status ArrayColumnReader::init(std::unique_ptr<ParquetColumnReader> element_read
 Status ArrayColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                                            ColumnSelectVector& select_vector, size_t batch_size,
-                                           size_t* read_rows, bool* eof) {
+                                           size_t* read_rows, bool* eof, bool is_dict_filter) {
     MutableColumnPtr data_column;
     NullMap* null_map_ptr = nullptr;
     if (doris_column->is_nullable()) {
@@ -499,7 +536,7 @@ Status ArrayColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr&
             const_cast<DataTypePtr&>(
                     (reinterpret_cast<const DataTypeArray*>(remove_nullable(type).get()))
                             ->get_nested_type()),
-            select_vector, batch_size, read_rows, eof));
+            select_vector, batch_size, read_rows, eof, is_dict_filter));
     if (*read_rows == 0) {
         return Status::OK();
     }
@@ -523,7 +560,7 @@ Status MapColumnReader::init(std::unique_ptr<ParquetColumnReader> key_reader,
 Status MapColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                                          ColumnSelectVector& select_vector, size_t batch_size,
-                                         size_t* read_rows, bool* eof) {
+                                         size_t* read_rows, bool* eof, bool is_dict_filter) {
     MutableColumnPtr data_column;
     NullMap* null_map_ptr = nullptr;
     if (doris_column->is_nullable()) {
@@ -553,10 +590,11 @@ Status MapColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& t
     bool key_eof = false;
     bool value_eof = false;
     RETURN_IF_ERROR(_key_reader->read_column_data(key_column, key_type, select_vector, batch_size,
-                                                  &key_rows, &key_eof));
+                                                  &key_rows, &key_eof, is_dict_filter));
     select_vector.reset();
     RETURN_IF_ERROR(_value_reader->read_column_data(value_column, value_type, select_vector,
-                                                    batch_size, &value_rows, &value_eof));
+                                                    batch_size, &value_rows, &value_eof,
+                                                    is_dict_filter));
     DCHECK_EQ(key_rows, value_rows);
     DCHECK_EQ(key_eof, value_eof);
     *read_rows = key_rows;
@@ -581,7 +619,7 @@ Status StructColumnReader::init(std::vector<std::unique_ptr<ParquetColumnReader>
 }
 Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                                             ColumnSelectVector& select_vector, size_t batch_size,
-                                            size_t* read_rows, bool* eof) {
+                                            size_t* read_rows, bool* eof, bool is_dict_filter) {
     MutableColumnPtr data_column;
     NullMap* null_map_ptr = nullptr;
     if (doris_column->is_nullable()) {
@@ -609,7 +647,7 @@ Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
         size_t loop_rows = 0;
         bool loop_eof = false;
         _child_readers[i]->read_column_data(doris_field, doris_type, select_vector, batch_size,
-                                            &loop_rows, &loop_eof);
+                                            &loop_rows, &loop_eof, is_dict_filter);
         if (i != 0) {
             DCHECK_EQ(*read_rows, loop_rows);
             DCHECK_EQ(*eof, loop_eof);
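_try_load_dict_page only peeks at the chunk's first page header (and dictionary page)
before any batch is decoded; the idempotent next_page() above guarantees the regular
read path will not re-parse it. The probe, condensed (assumed flow built from the
methods shown in this patch; error handling collapsed to bool):

    bool probe_dictionary(ColumnChunkReader& chunk, MutableColumnPtr& dict_values) {
        if (chunk.remaining_num_values() != 0 || !chunk.has_next_page()) return false;
        if (!chunk.next_page().ok()) return false;  // parses header + dict page only
        if (!chunk.has_dict()) return false;        // chunk is not dictionary encoded
        return chunk.read_dict_values_to_column(dict_values).ok();
    }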
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index 292eb06fba..7f9764ec06 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -82,7 +82,21 @@ public:
     virtual ~ParquetColumnReader() = default;
     virtual Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                                     ColumnSelectVector& select_vector, size_t batch_size,
-                                    size_t* read_rows, bool* eof) = 0;
+                                    size_t* read_rows, bool* eof, bool is_dict_filter) = 0;
+
+    virtual Status read_dict_values_to_column(MutableColumnPtr& doris_column, bool* has_dict) {
+        return Status::NotSupported("read_dict_values_to_column is not supported");
+    }
+
+    virtual Status get_dict_codes(const ColumnString* column_string,
+                                  std::vector<int32_t>* dict_codes) {
+        return Status::NotSupported("get_dict_codes is not supported");
+    }
+
+    virtual MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) {
+        LOG(FATAL) << "Method convert_dict_column_to_string_column is not supported";
+    }
+
     static Status create(io::FileReaderSPtr file, FieldSchema* field,
                          const tparquet::RowGroup& row_group,
                          const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
@@ -118,7 +132,11 @@ public:
     Status init(io::FileReaderSPtr file, FieldSchema* field, size_t max_buf_size);
     Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                             ColumnSelectVector& select_vector, size_t batch_size, size_t* read_rows,
-                            bool* eof) override;
+                            bool* eof, bool is_dict_filter) override;
+    Status read_dict_values_to_column(MutableColumnPtr& doris_column, bool* has_dict) override;
+    Status get_dict_codes(const ColumnString* column_string,
+                          std::vector<int32_t>* dict_codes) override;
+    MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) override;
     const std::vector<level_t>& get_rep_level() const override { return _rep_levels; }
     const std::vector<level_t>& get_def_level() const override { return _def_levels; }
     Statistics statistics() override {
@@ -136,10 +154,11 @@ private:
     Status _skip_values(size_t num_values);
     Status _read_values(size_t num_values, ColumnPtr& doris_column, DataTypePtr& type,
-                        ColumnSelectVector& select_vector);
+                        ColumnSelectVector& select_vector, bool is_dict_filter);
     Status _read_nested_column(ColumnPtr& doris_column, DataTypePtr& type,
                                ColumnSelectVector& select_vector, size_t batch_size,
-                               size_t* read_rows, bool* eof);
+                               size_t* read_rows, bool* eof, bool is_dict_filter);
+    Status _try_load_dict_page(bool* loaded, bool* has_dict);
 };
 
 class ArrayColumnReader : public ParquetColumnReader {
@@ -150,7 +169,7 @@ public:
     Status init(std::unique_ptr<ParquetColumnReader> element_reader, FieldSchema* field);
     Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                             ColumnSelectVector& select_vector, size_t batch_size, size_t* read_rows,
-                            bool* eof) override;
+                            bool* eof, bool is_dict_filter) override;
     const std::vector<level_t>& get_rep_level() const override {
         return _element_reader->get_rep_level();
     }
@@ -174,7 +193,7 @@ public:
                 std::unique_ptr<ParquetColumnReader> value_reader, FieldSchema* field);
     Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                             ColumnSelectVector& select_vector, size_t batch_size, size_t* read_rows,
-                            bool* eof) override;
+                            bool* eof, bool is_dict_filter) override;
 
     const std::vector<level_t>& get_rep_level() const override {
         return _key_reader->get_rep_level();
@@ -207,7 +226,7 @@ public:
                 FieldSchema* field);
     Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                             ColumnSelectVector& select_vector, size_t batch_size, size_t* read_rows,
-                            bool* eof) override;
+                            bool* eof, bool is_dict_filter) override;
 
     const std::vector<level_t>& get_rep_level() const override {
         return _child_readers[0]->get_rep_level();
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 71f77f3735..b48cd722b6 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -17,9 +17,14 @@
 
 #include "vparquet_group_reader.h"
 
+#include "exprs/create_predicate_function.h"
 #include "schema_desc.h"
 #include "util/simd/bits.h"
 #include "vec/columns/column_const.h"
+#include "vec/exprs/vdirect_in_predicate.h"
+#include "vec/exprs/vectorized_fn_call.h"
+#include "vec/exprs/vliteral.h"
+#include "vec/exprs/vslot_ref.h"
 #include "vparquet_column_reader.h"
 
 namespace doris::vectorized {
@@ -31,7 +36,7 @@ RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader,
                                const int32_t row_group_id, const tparquet::RowGroup& row_group,
                                cctz::time_zone* ctz,
                                const PositionDeleteContext& position_delete_ctx,
-                               const LazyReadContext& lazy_read_ctx)
+                               const LazyReadContext& lazy_read_ctx, RuntimeState* state)
         : _file_reader(file_reader),
           _read_columns(read_columns),
          _row_group_id(row_group_id),
@@ -39,14 +44,35 @@ RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader,
           _remaining_rows(row_group.num_rows),
           _ctz(ctz),
           _position_delete_ctx(position_delete_ctx),
-          _lazy_read_ctx(lazy_read_ctx) {}
+          _lazy_read_ctx(lazy_read_ctx),
+          _state(state),
+          _obj_pool(new ObjectPool()) {}
 
 RowGroupReader::~RowGroupReader() {
     _column_readers.clear();
+    for (auto* ctx : _dict_filter_conjuncts) {
+        if (ctx) {
+            ctx->close(_state);
+        }
+    }
+    _obj_pool->clear();
 }
 
-Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
-                            std::unordered_map<int, tparquet::OffsetIndex>& col_offsets) {
+Status RowGroupReader::init(
+        const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
+        std::unordered_map<int, tparquet::OffsetIndex>& col_offsets,
+        const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
+        const std::unordered_map<std::string, int>* colname_to_slot_id,
+        const std::vector<VExprContext*>* not_single_slot_filter_conjuncts,
+        const std::unordered_map<int, std::vector<VExprContext*>>* slot_id_to_filter_conjuncts) {
+    _tuple_descriptor = tuple_descriptor;
+    _row_descriptor = row_descriptor;
+    _col_name_to_slot_id = colname_to_slot_id;
+    _slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts;
+    if (not_single_slot_filter_conjuncts) {
+        _filter_conjuncts.insert(_filter_conjuncts.end(), not_single_slot_filter_conjuncts->begin(),
+                                 not_single_slot_filter_conjuncts->end());
+    }
     _merge_read_ranges(row_ranges);
     if (_read_columns.empty()) {
         // Query task that only select columns in path.
@@ -71,11 +97,137 @@ Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>
         }
         _column_readers[read_col._file_slot_name] = std::move(reader);
     }
+    // Check if single slot can be filtered by dict.
+    if (!_slot_id_to_filter_conjuncts) {
+        return Status::OK();
+    }
+    for (auto& predicate_col_name : _lazy_read_ctx.predicate_columns) {
+        auto field = const_cast<FieldSchema*>(schema.get_column(predicate_col_name));
+        if (_can_filter_by_dict(predicate_col_name,
+                                _row_group_meta.columns[field->physical_column_index].meta_data)) {
+            _dict_filter_col_names.emplace_back(predicate_col_name);
+        } else {
+            int slot_id = _col_name_to_slot_id->at(predicate_col_name);
+            if (_slot_id_to_filter_conjuncts->find(slot_id) !=
+                _slot_id_to_filter_conjuncts->end()) {
+                for (VExprContext* ctx : _slot_id_to_filter_conjuncts->at(slot_id)) {
+                    _filter_conjuncts.push_back(ctx);
+                }
+            }
+        }
+    }
+    RETURN_IF_ERROR(_rewrite_dict_predicates());
     return Status::OK();
 }
+
+bool RowGroupReader::_can_filter_by_dict(const std::string& predicate_col_name,
+                                         const tparquet::ColumnMetaData& column_metadata) {
+    SlotDescriptor* slot = nullptr;
+    const std::vector<SlotDescriptor*>& slots = _tuple_descriptor->slots();
+    int slot_id = _col_name_to_slot_id->at(predicate_col_name);
+    for (auto each : slots) {
+        if (each->id() == slot_id) {
+            slot = each;
+            break;
+        }
+    }
+    if (!slot->type().is_string_type()) {
+        return false;
+    }
+
+    if (_slot_id_to_filter_conjuncts->find(slot_id) == _slot_id_to_filter_conjuncts->end()) {
+        return false;
+    }
+
+    if (!is_dictionary_encoded(column_metadata)) {
+        return false;
+    }
+
+    // TODO: handle exprs like 'a > 10 is null'; 'a > 10' alone should still be dict-filterable.
+    for (VExprContext* ctx : _slot_id_to_filter_conjuncts->at(slot_id)) {
+        const VExpr* root_expr = ctx->root();
+        if (root_expr->node_type() == TExprNodeType::FUNCTION_CALL) {
+            std::string function_name = root_expr->fn().name.function_name;
+            if (function_name.compare("is_null_pred") == 0 ||
+                function_name.compare("is_not_null_pred") == 0) {
+                return false;
+            }
+        }
+    }
+
+    return true;
+}
+// This function is copied from
+// https://github.com/apache/impala/blob/master/be/src/exec/parquet/hdfs-parquet-scanner.cc#L1717
+bool RowGroupReader::is_dictionary_encoded(const tparquet::ColumnMetaData& column_metadata) {
+    // The Parquet spec allows for column chunks to have mixed encodings
+    // where some data pages are dictionary-encoded and others are plain
+    // encoded. For example, a Parquet file writer might start writing
+    // a column chunk as dictionary encoded, but it will switch to plain
+    // encoding if the dictionary grows too large.
+    //
+    // In order for dictionary filters to skip the entire row group,
+    // the conjuncts must be evaluated on column chunks that are entirely
+    // encoded with the dictionary encoding. There are two checks
+    // available to verify this:
+    // 1. The encoding_stats field on the column chunk metadata provides
+    //    information about the number of data pages written in each
+    //    format. This allows for a specific check of whether all the
+    //    data pages are dictionary encoded.
+    // 2. The encodings field on the column chunk metadata lists the
+    //    encodings used. If this list contains the dictionary encoding
+    //    and does not include unexpected encodings (i.e. encodings not
+    //    associated with definition/repetition levels), then it is entirely
+    //    dictionary encoded.
+    if (column_metadata.__isset.encoding_stats) {
+        // Condition #1 above
+        for (const tparquet::PageEncodingStats& enc_stat : column_metadata.encoding_stats) {
+            if (enc_stat.page_type == tparquet::PageType::DATA_PAGE &&
+                (enc_stat.encoding != tparquet::Encoding::PLAIN_DICTIONARY &&
+                 enc_stat.encoding != tparquet::Encoding::RLE_DICTIONARY) &&
+                enc_stat.count > 0) {
+                return false;
+            }
+        }
+    } else {
+        // Condition #2 above
+        bool has_dict_encoding = false;
+        bool has_nondict_encoding = false;
+        for (const tparquet::Encoding::type& encoding : column_metadata.encodings) {
+            if (encoding == tparquet::Encoding::PLAIN_DICTIONARY ||
+                encoding == tparquet::Encoding::RLE_DICTIONARY) {
+                has_dict_encoding = true;
+            }
+
+            // RLE and BIT_PACKED are used for repetition/definition levels
+            if (encoding != tparquet::Encoding::PLAIN_DICTIONARY &&
+                encoding != tparquet::Encoding::RLE_DICTIONARY &&
+                encoding != tparquet::Encoding::RLE && encoding != tparquet::Encoding::BIT_PACKED) {
+                has_nondict_encoding = true;
+                break;
+            }
+        }
+        // Not entirely dictionary encoded if:
+        // 1. No dictionary encoding listed
+        // OR
+        // 2. Some non-dictionary encoding is listed
+        if (!has_dict_encoding || has_nondict_encoding) {
+            return false;
+        }
+    }
+
+    return true;
+}
+
 Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_rows,
                                   bool* batch_eof) {
+    if (_is_row_group_filtered) {
+        *read_rows = 0;
+        *batch_eof = true;
+        return Status::OK();
+    }
+
     // Process external table query task that select columns are all from path.
     if (_read_columns.empty()) {
         RETURN_IF_ERROR(_read_empty_batch(batch_size, read_rows, batch_eof));
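For intuition on condition #1 above, consider a chunk whose writer started with a
dictionary and fell back to PLAIN after the dictionary grew too large. A
self-contained model of the check (simplified stand-ins for the thrift-generated
types, not the real ones):

    #include <vector>

    enum class Encoding { PLAIN, PLAIN_DICTIONARY, RLE_DICTIONARY, RLE, BIT_PACKED };
    enum class PageType { DATA_PAGE, DICTIONARY_PAGE };
    struct PageEncodingStats { PageType page_type; Encoding encoding; int count; };

    // Every data page must be dictionary encoded for the filter to be sound.
    bool all_data_pages_dict(const std::vector<PageEncodingStats>& stats) {
        for (const auto& s : stats) {
            if (s.page_type == PageType::DATA_PAGE && s.count > 0 &&
                s.encoding != Encoding::PLAIN_DICTIONARY &&
                s.encoding != Encoding::RLE_DICTIONARY) {
                return false;  // a mixed-encoding chunk cannot be dict-filtered
            }
        }
        return true;
    }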
@@ -113,10 +265,13 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_
             columns_to_filter[i] = i;
         }
         if (_lazy_read_ctx.vconjunct_ctx != nullptr) {
-            int result_column_id = -1;
-            RETURN_IF_ERROR(_lazy_read_ctx.vconjunct_ctx->execute(block, &result_column_id));
-            ColumnPtr& filter_column = block->get_by_position(result_column_id).column;
-            RETURN_IF_ERROR(_filter_block(block, filter_column, column_to_keep, columns_to_filter));
+            std::vector<IColumn::Filter*> filters;
+            if (_position_delete_ctx.has_filter) {
+                filters.push_back(_pos_delete_filter_ptr.get());
+            }
+            RETURN_IF_ERROR(_execute_conjuncts_and_filter_block(_filter_conjuncts, filters, block,
+                                                                columns_to_filter, column_to_keep));
+            _convert_dict_cols_to_string_cols(block);
         } else {
             RETURN_IF_ERROR(_filter_block(block, column_to_keep, columns_to_filter));
         }
@@ -139,6 +294,25 @@ Status RowGroupReader::_read_column_data(Block* block, const std::vector<std::st
         auto& column_with_type_and_name = block->get_by_name(read_col);
         auto& column_ptr = column_with_type_and_name.column;
         auto& column_type = column_with_type_and_name.type;
+        auto col_iter =
+                std::find(_dict_filter_col_names.begin(), _dict_filter_col_names.end(), read_col);
+        bool is_dict_filter = false;
+        if (col_iter != _dict_filter_col_names.end()) {
+            MutableColumnPtr dict_column = ColumnVector<Int32>::create();
+            size_t pos = block->get_position_by_name(read_col);
+            if (column_type->is_nullable()) {
+                block->get_by_position(pos).type =
+                        std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
+                block->replace_by_position(
+                        pos, ColumnNullable::create(std::move(dict_column),
+                                                    ColumnUInt8::create(dict_column->size(), 0)));
+            } else {
+                block->get_by_position(pos).type = std::make_shared<DataTypeInt32>();
+                block->replace_by_position(pos, std::move(dict_column));
+            }
+            is_dict_filter = true;
+        }
+
         size_t col_read_rows = 0;
         bool col_eof = false;
         // Should reset _filter_map_index to 0 when reading next column.
@@ -147,7 +321,7 @@ Status RowGroupReader::_read_column_data(Block* block, const std::vector<std::st
             size_t loop_rows = 0;
             RETURN_IF_ERROR(_column_readers[read_col]->read_column_data(
                     column_ptr, column_type, select_vector, batch_size - col_read_rows, &loop_rows,
-                    &col_eof));
+                    &col_eof, is_dict_filter));
             col_read_rows += loop_rows;
         }
         if (batch_read_rows > 0 && batch_read_rows != col_read_rows) {
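The column swap above is the heart of the read path: a dict-filtered string column
enters the block as a (nullable) Int32 code column and is only re-expanded to strings
by _convert_dict_cols_to_string_cols() after filtering. In miniature (toy types, not
the Doris Block API):

    #include <cstdint>
    #include <string>
    #include <vector>

    // codes -> strings, done once per surviving batch rather than per decoded row
    std::vector<std::string> restore_strings(const std::vector<int32_t>& codes,
                                             const std::vector<std::string>& dict) {
        std::vector<std::string> out;
        out.reserve(codes.size());
        for (int32_t code : codes) out.push_back(dict[code]);
        return out;
    }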
@@ -169,7 +343,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
     size_t pre_read_rows;
     bool pre_eof;
     size_t origin_column_num = block->columns();
-    int filter_column_id = -1;
+    IColumn::Filter result_filter;
     while (true) {
         // read predicate columns
         pre_read_rows = 0;
@@ -194,16 +368,21 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
             // The following process may be tricky and time-consuming, but we have no other way.
             block->get_by_position(0).column->assume_mutable()->resize(pre_read_rows);
         }
-        RETURN_IF_ERROR(_lazy_read_ctx.vconjunct_ctx->execute(block, &filter_column_id));
-        ColumnPtr& sv = block->get_by_position(filter_column_id).column;
+        result_filter.assign(pre_read_rows, static_cast<unsigned char>(1));
+        bool can_filter_all = false;
+        std::vector<IColumn::Filter*> filters;
+        if (_position_delete_ctx.has_filter) {
+            filters.push_back(_pos_delete_filter_ptr.get());
+        }
+        RETURN_IF_ERROR(_execute_conjuncts(_filter_conjuncts, filters, block, &result_filter,
+                                           &can_filter_all));
+
         if (_lazy_read_ctx.resize_first_column) {
             // We have to clean the first column to insert right data.
             block->get_by_position(0).column->assume_mutable()->clear();
         }
-        // build filter map
-        bool can_filter_all = false;
-        const uint8_t* filter_map = _build_filter_map(sv, pre_read_rows, &can_filter_all);
+        const uint8_t* __restrict filter_map = result_filter.data();
         select_vector_ptr.reset(new ColumnSelectVector(filter_map, pre_read_rows, can_filter_all));
         if (select_vector_ptr->filter_all() && !pre_eof) {
             // If continuous batches are skipped, we can cache them to skip a whole page
@@ -256,14 +435,16 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
             // generated from next batch, so the filter column is removed ahead.
             DCHECK_EQ(block->rows(), 0);
         } else {
-            const auto& filter_column = block->get_by_position(filter_column_id).column;
-            RETURN_IF_ERROR(_filter_block(block, filter_column, origin_column_num,
-                                          _lazy_read_ctx.all_predicate_col_ids));
+            RETURN_IF_ERROR(_filter_block_internal(block, _lazy_read_ctx.all_predicate_col_ids,
+                                                   result_filter));
+            Block::erase_useless_column(block, origin_column_num);
         }
     } else {
         Block::erase_useless_column(block, origin_column_num);
     }
+    _convert_dict_cols_to_string_cols(block);
+
     size_t column_num = block->columns();
     size_t column_size = 0;
     for (int i = 0; i < column_num; ++i) {
@@ -283,55 +464,9 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
     return _fill_missing_columns(block, column_size, _lazy_read_ctx.missing_columns);
 }
-const uint8_t* RowGroupReader::_build_filter_map(ColumnPtr& sv, size_t num_rows,
-                                                 bool* can_filter_all) {
-    const uint8_t* filter_map = nullptr;
-    if (auto* nullable_column = check_and_get_column<ColumnNullable>(*sv)) {
-        size_t column_size = nullable_column->size();
-        if (column_size == 0) {
-            *can_filter_all = true;
-        } else {
-            DCHECK_EQ(column_size, num_rows);
-            const auto* __restrict null_map_data = nullable_column->get_null_map_data().data();
-            ColumnUInt8* concrete_column = typeid_cast<ColumnUInt8*>(
-                    nullable_column->get_nested_column_ptr()->assume_mutable().get());
-            auto* __restrict filter_data = concrete_column->get_data().data();
-            if (_position_delete_ctx.has_filter) {
-                auto* __restrict pos_delete_filter_data = _pos_delete_filter_ptr->data();
-                for (size_t i = 0; i < num_rows; ++i) {
-                    filter_data[i] &= (!null_map_data[i]) & pos_delete_filter_data[i];
-                }
-            } else {
-                for (size_t i = 0; i < num_rows; ++i) {
-                    filter_data[i] &= (!null_map_data[i]);
-                }
-            }
-            filter_map = filter_data;
-        }
-    } else if (auto* const_column = check_and_get_column<ColumnConst>(*sv)) {
-        // filter all
-        *can_filter_all = !const_column->get_bool(0);
-    } else {
-        MutableColumnPtr mutable_holder = sv->assume_mutable();
-        ColumnUInt8* mutable_filter_column = typeid_cast<ColumnUInt8*>(mutable_holder.get());
-        IColumn::Filter& filter = mutable_filter_column->get_data();
-        auto* __restrict filter_data = filter.data();
-        const size_t size = filter.size();
-
-        if (_position_delete_ctx.has_filter) {
-            auto* __restrict pos_delete_filter_data = _pos_delete_filter_ptr->data();
-            for (size_t i = 0; i < size; ++i) {
-                filter_data[i] &= pos_delete_filter_data[i];
-            }
-        }
-        filter_map = filter_data;
-    }
-    return filter_map;
-}
-
 void RowGroupReader::_rebuild_select_vector(ColumnSelectVector& select_vector,
                                             std::unique_ptr<uint8_t[]>& filter_map,
-                                            size_t pre_read_rows) {
+                                            size_t pre_read_rows) const {
     if (_cached_filtered_rows == 0) {
         return;
     }
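With _build_filter_map removed, the position-delete filter is no longer special-cased:
it rides along in the generic `filters` list and _execute_conjuncts ANDs everything
into a single IColumn::Filter. The combining step, in miniature:

    #include <cstdint>
    #include <vector>

    // A row survives only if every participating filter keeps it.
    void and_filters(std::vector<uint8_t>& result, const std::vector<uint8_t>& extra) {
        for (size_t i = 0; i < result.size(); ++i) {
            result[i] &= extra[i];
        }
    }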
@@ -493,73 +628,6 @@ Status RowGroupReader::_build_pos_delete_filter(size_t read_rows) {
     return Status::OK();
 }
-Status RowGroupReader::_filter_block(Block* block, const ColumnPtr& filter_column,
-                                     int column_to_keep, std::vector<uint32_t> columns_to_filter) {
-    if (auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) {
-        const auto& nested_column = nullable_column->get_nested_column_ptr();
-
-        MutableColumnPtr mutable_holder =
-                nested_column->use_count() == 1
-                        ? nested_column->assume_mutable()
-                        : nested_column->clone_resized(nested_column->size());
-
-        ColumnUInt8* concrete_column = typeid_cast<ColumnUInt8*>(mutable_holder.get());
-        if (!concrete_column) {
-            return Status::InvalidArgument(
-                    "Illegal type {} of column for filter. Must be UInt8 or Nullable(UInt8).",
-                    filter_column->get_name());
-        }
-        auto* __restrict null_map_data = nullable_column->get_null_map_data().data();
-        IColumn::Filter& filter = concrete_column->get_data();
-        auto* __restrict filter_data = filter.data();
-        const size_t size = filter.size();
-
-        if (_position_delete_ctx.has_filter) {
-            auto* __restrict pos_delete_filter_data = _pos_delete_filter_ptr->data();
-            for (size_t i = 0; i < size; ++i) {
-                filter_data[i] &= (!null_map_data[i]) & pos_delete_filter_data[i];
-            }
-        } else {
-            for (size_t i = 0; i < size; ++i) {
-                filter_data[i] &= (!null_map_data[i]);
-            }
-        }
-        RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter, filter));
-    } else if (auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) {
-        bool ret = const_column->get_bool(0);
-        if (!ret) {
-            for (auto& col : columns_to_filter) {
-                std::move(*block->get_by_position(col).column).assume_mutable()->clear();
-            }
-        }
-    } else {
-        MutableColumnPtr mutable_holder =
-                filter_column->use_count() == 1
-                        ? filter_column->assume_mutable()
-                        : filter_column->clone_resized(filter_column->size());
-        ColumnUInt8* mutable_filter_column = typeid_cast<ColumnUInt8*>(mutable_holder.get());
-        if (!mutable_filter_column) {
-            return Status::InvalidArgument(
-                    "Illegal type {} of column for filter. Must be UInt8 or Nullable(UInt8).",
-                    filter_column->get_name());
-        }
-
-        IColumn::Filter& filter = mutable_filter_column->get_data();
-        auto* __restrict filter_data = filter.data();
-
-        if (_position_delete_ctx.has_filter) {
-            auto* __restrict pos_delete_filter_data = _pos_delete_filter_ptr->data();
-            const size_t size = filter.size();
-            for (size_t i = 0; i < size; ++i) {
-                filter_data[i] &= pos_delete_filter_data[i];
-            }
-        }
-        RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter, filter));
-    }
-    Block::erase_useless_column(block, column_to_keep);
-    return Status::OK();
-}
-
 Status RowGroupReader::_filter_block(Block* block, int column_to_keep,
                                      const std::vector<uint32_t>& columns_to_filter) {
     if (_pos_delete_filter_ptr) {
@@ -599,6 +667,233 @@ Status RowGroupReader::_filter_block_internal(Block* block,
     return Status::OK();
 }
+Status RowGroupReader::_rewrite_dict_predicates() {
+    for (vector<std::string>::iterator it = _dict_filter_col_names.begin();
+         it != _dict_filter_col_names.end();) {
+        std::string& dict_filter_col_name = *it;
+        int slot_id = _col_name_to_slot_id->at(dict_filter_col_name);
+        // 1. Read the dictionary values into a string column.
+        MutableColumnPtr dict_value_column = ColumnString::create();
+        bool has_dict = false;
+        RETURN_IF_ERROR(_column_readers[dict_filter_col_name]->read_dict_values_to_column(
+                dict_value_column, &has_dict));
+        size_t dict_value_column_size = dict_value_column->size();
+        DCHECK(has_dict);
+        // 2. Build a temp block from the dict string column, then execute the conjuncts
+        //    and filter it.
+        // 2.1 Build the temp block so the conjuncts can be executed against the dict values.
+        Block temp_block;
+        int dict_pos = -1;
+        int index = 0;
+        for (const auto slot_desc : _tuple_descriptor->slots()) {
+            if (!slot_desc->need_materialize()) {
+                // skipped when reading
+                continue;
+            }
+            if (slot_desc->id() == slot_id) {
+                auto data_type = slot_desc->get_data_type_ptr();
+                if (data_type->is_nullable()) {
+                    temp_block.insert(
+                            {ColumnNullable::create(std::move(dict_value_column),
+                                                    ColumnUInt8::create(dict_value_column_size, 0)),
+                             std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()),
+                             ""});
+                } else {
+                    temp_block.insert(
+                            {std::move(dict_value_column), std::make_shared<DataTypeString>(), ""});
+                }
+                dict_pos = index;
+            } else {
+                temp_block.insert(ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
+                                                        slot_desc->get_data_type_ptr(),
+                                                        slot_desc->col_name()));
+            }
+            ++index;
+        }
+
+        // 2.2 Execute the conjuncts and filter the block.
+        const std::vector<VExprContext*>* ctxs = nullptr;
+        auto iter = _slot_id_to_filter_conjuncts->find(slot_id);
+        if (iter != _slot_id_to_filter_conjuncts->end()) {
+            ctxs = &(iter->second);
+        } else {
+            std::stringstream msg;
+            msg << "_slot_id_to_filter_conjuncts: slot_id [" << slot_id << "] not found";
+            return Status::NotFound(msg.str());
+        }
+
+        std::vector<uint32_t> columns_to_filter(1, dict_pos);
+        int column_to_keep = temp_block.columns();
+        if (dict_pos != 0) {
+            // VExprContext::execute has an optimization: the filtering is only executed when
+            // block->rows() > 0, so resize the first column to make the block non-empty.
+            // This may be tricky and time-consuming, but we have no other way.
+            temp_block.get_by_position(0).column->assume_mutable()->resize(dict_value_column_size);
+        }
+        std::vector<IColumn::Filter*> filters;
+        RETURN_IF_ERROR(_execute_conjuncts_and_filter_block(*ctxs, filters, &temp_block,
+                                                            columns_to_filter, column_to_keep));
+        if (dict_pos != 0) {
+            // We have to clear the first column again to insert the right data.
+            temp_block.get_by_position(0).column->assume_mutable()->clear();
+        }
+
+        // Check the filter result.
+        ColumnPtr& dict_column = temp_block.get_by_position(dict_pos).column;
+        // If no dict value survives, the whole row group can be filtered.
+        if (dict_column->size() == 0) {
+            _is_row_group_filtered = true;
+            return Status::OK();
+        }
+
+        // About performance: if the surviving dict_column is too large, the rewrite would
+        // generate a large IN filter, so keep the original conjuncts instead.
+        if (dict_column->size() > MAX_DICT_CODE_PREDICATE_TO_REWRITE) {
+            for (auto& ctx : (*ctxs)) {
+                _filter_conjuncts.push_back(ctx);
+            }
+            it = _dict_filter_col_names.erase(it);
+            continue;
+        }
+
+        // 3. Get the dict codes of the surviving values.
+        std::vector<int32_t> dict_codes;
+        if (dict_column->is_nullable()) {
+            const ColumnNullable* nullable_column =
+                    static_cast<const ColumnNullable*>(dict_column.get());
+            const ColumnString* nested_column = static_cast<const ColumnString*>(
+                    nullable_column->get_nested_column_ptr().get());
+            RETURN_IF_ERROR(_column_readers[dict_filter_col_name]->get_dict_codes(
+                    assert_cast<const ColumnString*>(nested_column), &dict_codes));
+        } else {
+            RETURN_IF_ERROR(_column_readers[dict_filter_col_name]->get_dict_codes(
+                    assert_cast<const ColumnString*>(dict_column.get()), &dict_codes));
+        }
+
+        // 4. Rewrite the conjuncts.
+        RETURN_IF_ERROR(_rewrite_dict_conjuncts(dict_codes, slot_id));
+        ++it;
+    }
+    return Status::OK();
+}
+
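+// Builds the replacement conjunct on the dict codes of `slot_id`: a single
+// code becomes a binary EQ predicate, multiple codes become an IN predicate
+// backed by a HybridSet of int32 codes.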
+Status RowGroupReader::_rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes, int slot_id) {
+    VExpr* root;
+    if (dict_codes.size() == 1) {
+        {
+            TFunction fn;
+            TFunctionName fn_name;
+            fn_name.__set_db_name("");
+            fn_name.__set_function_name("eq");
+            fn.__set_name(fn_name);
+            fn.__set_binary_type(TFunctionBinaryType::BUILTIN);
+            std::vector<TTypeDesc> arg_types;
+            arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT));
+            arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT));
+            fn.__set_arg_types(arg_types);
+            fn.__set_ret_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+            fn.__set_has_var_args(false);
+
+            TExprNode texpr_node;
+            texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+            texpr_node.__set_node_type(TExprNodeType::BINARY_PRED);
+            texpr_node.__set_opcode(TExprOpcode::EQ);
+            texpr_node.__set_vector_opcode(TExprOpcode::EQ);
+            texpr_node.__set_fn(fn);
+            texpr_node.__set_child_type(TPrimitiveType::INT);
+            texpr_node.__set_num_children(2);
+            root = _obj_pool->add(new VectorizedFnCall(texpr_node));
+        }
+        {
+            SlotDescriptor* slot = nullptr;
+            const std::vector<SlotDescriptor*>& slots = _tuple_descriptor->slots();
+            for (auto each : slots) {
+                if (each->id() == slot_id) {
+                    slot = each;
+                    break;
+                }
+            }
+            VExpr* slot_ref_expr = _obj_pool->add(new VSlotRef(slot));
+            root->add_child(slot_ref_expr);
+        }
+        {
+            TExprNode texpr_node;
+            texpr_node.__set_node_type(TExprNodeType::INT_LITERAL);
+            texpr_node.__set_type(create_type_desc(TYPE_INT));
+            TIntLiteral int_literal;
+            int_literal.__set_value(dict_codes[0]);
+            texpr_node.__set_int_literal(int_literal);
+            VExpr* literal_expr = _obj_pool->add(new VLiteral(texpr_node));
+            root->add_child(literal_expr);
+        }
+    } else {
+        {
+            TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN);
+            TExprNode node;
+            node.__set_type(type_desc);
+            node.__set_node_type(TExprNodeType::IN_PRED);
+            node.in_predicate.__set_is_not_in(false);
+            node.__set_opcode(TExprOpcode::FILTER_IN);
+            node.__isset.vector_opcode = true;
+            node.__set_vector_opcode(TExprOpcode::FILTER_IN);
+
+            root = _obj_pool->add(new vectorized::VDirectInPredicate(node));
+            std::shared_ptr<HybridSetBase> hybrid_set(create_set(PrimitiveType::TYPE_INT));
+            for (int j = 0; j < dict_codes.size(); ++j) {
+                hybrid_set->insert(&dict_codes[j]);
+            }
+            static_cast<vectorized::VDirectInPredicate*>(root)->set_filter(hybrid_set);
+        }
+        {
+            SlotDescriptor* slot = nullptr;
+            const std::vector<SlotDescriptor*>& slots = _tuple_descriptor->slots();
+            for (auto each : slots) {
+                if (each->id() == slot_id) {
+                    slot = each;
+                    break;
+                }
+            }
+            VExpr* slot_ref_expr = _obj_pool->add(new VSlotRef(slot));
+            root->add_child(slot_ref_expr);
+        }
+    }
+    VExprContext* rewritten_conjunct_ctx = _obj_pool->add(new VExprContext(root));
+    RETURN_IF_ERROR(rewritten_conjunct_ctx->prepare(_state, *_row_descriptor));
+    RETURN_IF_ERROR(rewritten_conjunct_ctx->open(_state));
+    _dict_filter_conjuncts.push_back(rewritten_conjunct_ctx);
+    _filter_conjuncts.push_back(rewritten_conjunct_ctx);
+    return Status::OK();
+}
+
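+// Undoes the dict-code trick before the block is returned upstream: each
+// dict-filter column was materialized as ColumnInt32 codes, so the codes are
+// decoded back into a ColumnString (wrapped in ColumnNullable for nullable
+// slots) and the column type in the block is patched accordingly.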
+void RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) {
+    for (auto& dict_filter_col_name : _dict_filter_col_names) {
+        size_t pos = block->get_position_by_name(dict_filter_col_name);
+        ColumnWithTypeAndName& column_with_type_and_name = block->get_by_position(pos);
+        const ColumnPtr& column = column_with_type_and_name.column;
+        if (auto* nullable_column = check_and_get_column<ColumnNullable>(*column)) {
+            const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr();
+            const ColumnInt32* dict_column = assert_cast<const ColumnInt32*>(nested_column.get());
+            DCHECK(dict_column);
+
+            MutableColumnPtr string_column =
+                    _column_readers[dict_filter_col_name]->convert_dict_column_to_string_column(
+                            dict_column);
+
+            column_with_type_and_name.type =
+                    std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+            block->replace_by_position(
+                    pos, ColumnNullable::create(std::move(string_column),
+                                                nullable_column->get_null_map_column_ptr()));
+        } else {
+            const ColumnInt32* dict_column = assert_cast<const ColumnInt32*>(column.get());
+            MutableColumnPtr string_column =
+                    _column_readers[dict_filter_col_name]->convert_dict_column_to_string_column(
+                            dict_column);
+
+            column_with_type_and_name.type = std::make_shared<DataTypeString>();
+            block->replace_by_position(pos, std::move(string_column));
+        }
+    }
+}
+
ParquetColumnReader::Statistics RowGroupReader::statistics() {
    ParquetColumnReader::Statistics st;
    for (auto& reader : _column_readers) {
@@ -608,4 +903,86 @@ ParquetColumnReader::Statistics RowGroupReader::statistics() {
    return st;
}
+// TODO Performance Optimization
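+// ANDs every conjunct in `ctxs` and every pre-built filter in `filters` into
+// `result_filter`. A conjunct result may be Nullable(UInt8) (NULL counts as
+// filtered out), ColumnConst(UInt8), or plain UInt8; *can_filter_all is set
+// once a conjunct passes no rows, letting callers drop the whole batch early.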
+Status RowGroupReader::_execute_conjuncts(const std::vector<VExprContext*>& ctxs,
+                                          const std::vector<IColumn::Filter*>& filters,
+                                          Block* block, IColumn::Filter* result_filter,
+                                          bool* can_filter_all) {
+    *can_filter_all = false;
+    auto* __restrict result_filter_data = result_filter->data();
+    for (auto* ctx : ctxs) {
+        int result_column_id = -1;
+        RETURN_IF_ERROR(ctx->execute(block, &result_column_id));
+        ColumnPtr& filter_column = block->get_by_position(result_column_id).column;
+        if (auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) {
+            size_t column_size = nullable_column->size();
+            if (column_size == 0) {
+                *can_filter_all = true;
+                return Status::OK();
+            } else {
+                const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr();
+                const IColumn::Filter& filter =
+                        assert_cast<const ColumnUInt8&>(*nested_column).get_data();
+                auto* __restrict filter_data = filter.data();
+                const size_t size = filter.size();
+                auto* __restrict null_map_data = nullable_column->get_null_map_data().data();
+
+                for (size_t i = 0; i < size; ++i) {
+                    result_filter_data[i] &= (!null_map_data[i]) & filter_data[i];
+                }
+                if (memchr(filter_data, 0x1, size) == nullptr) {
+                    *can_filter_all = true;
+                    return Status::OK();
+                }
+            }
+        } else if (auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) {
+            // a constant false conjunct filters everything
+            if (!const_column->get_bool(0)) {
+                *can_filter_all = true;
+                return Status::OK();
+            }
+        } else {
+            const IColumn::Filter& filter =
+                    assert_cast<const ColumnUInt8&>(*filter_column).get_data();
+            auto* __restrict filter_data = filter.data();
+
+            const size_t size = filter.size();
+            for (size_t i = 0; i < size; ++i) {
+                result_filter_data[i] &= filter_data[i];
+            }
+
+            if (memchr(filter_data, 0x1, size) == nullptr) {
+                *can_filter_all = true;
+                return Status::OK();
+            }
+        }
+    }
+    for (auto* filter : filters) {
+        auto* __restrict filter_data = filter->data();
+        const size_t size = filter->size();
+        for (size_t i = 0; i < size; ++i) {
+            result_filter_data[i] &= filter_data[i];
+        }
+    }
+    return Status::OK();
+}
+
+// TODO Performance Optimization
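+// Wrapper around _execute_conjuncts: builds an all-ones filter, ANDs the
+// conjuncts into it, then either clears the target columns (when nothing
+// survives) or applies the combined filter, and finally erases the trailing
+// temp columns beyond column_to_keep.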
+Status RowGroupReader::_execute_conjuncts_and_filter_block(
+        const std::vector<VExprContext*>& ctxs, const std::vector<IColumn::Filter*>& filters,
+        Block* block, std::vector<uint32_t>& columns_to_filter, int column_to_keep) {
+    IColumn::Filter result_filter(block->rows(), 1);
+    bool can_filter_all;
+    RETURN_IF_ERROR(_execute_conjuncts(ctxs, filters, block, &result_filter, &can_filter_all));
+    if (can_filter_all) {
+        for (auto& col : columns_to_filter) {
+            std::move(*block->get_by_position(col).column).assume_mutable()->clear();
+        }
+    } else {
+        RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter, result_filter));
+    }
+    Block::erase_useless_column(block, column_to_keep);
+    return Status::OK();
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index cd1f92a909..99cb6cb85a 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -25,6 +25,9 @@
namespace doris::vectorized {
+// TODO: we still need to determine this threshold by testing.
+static constexpr uint32_t MAX_DICT_CODE_PREDICATE_TO_REWRITE = std::numeric_limits<uint32_t>::max();
+
class RowGroupReader {
public:
static const std::vector<int64_t> NO_DELETE;
@@ -103,11 +106,16 @@ public:
                   const std::vector<ParquetReadColumn>& read_columns, const int32_t row_group_id,
                   const tparquet::RowGroup& row_group, cctz::time_zone* ctz,
                   const PositionDeleteContext& position_delete_ctx,
-                  const LazyReadContext& lazy_read_ctx);
+                  const LazyReadContext& lazy_read_ctx, RuntimeState* state);
    ~RowGroupReader();
-    Status init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
-                std::unordered_map<int, tparquet::OffsetIndex>& col_offsets);
+    Status init(
+            const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
+            std::unordered_map<int, tparquet::OffsetIndex>& col_offsets,
+            const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
+            const std::unordered_map<std::string, int>* colname_to_slot_id,
+            const std::vector<VExprContext*>* not_single_slot_filter_conjuncts,
+            const std::unordered_map<int, std::vector<VExprContext*>>* slot_id_to_filter_conjuncts);
    Status next_batch(Block* block, size_t batch_size, size_t* read_rows, bool* batch_eof);
    int64_t lazy_read_filtered_rows() const { return _lazy_read_filtered_rows; }
@@ -120,9 +128,8 @@ private:
                              size_t batch_size, size_t* read_rows, bool* batch_eof,
                              ColumnSelectVector& select_vector);
    Status _do_lazy_read(Block* block, size_t batch_size, size_t* read_rows, bool* batch_eof);
-    const uint8_t* _build_filter_map(ColumnPtr& sv, size_t num_rows, bool* can_filter_all);
    void _rebuild_select_vector(ColumnSelectVector& select_vector,
-                               std::unique_ptr<uint8_t[]>& filter_map, size_t pre_read_rows);
+                               std::unique_ptr<uint8_t[]>& filter_map, size_t pre_read_rows) const;
    Status _fill_partition_columns(
            Block* block, size_t rows,
            const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
@@ -131,13 +138,26 @@ private:
            Block* block, size_t rows,
            const std::unordered_map<std::string, VExprContext*>& missing_columns);
    Status _build_pos_delete_filter(size_t read_rows);
-    Status _filter_block(Block* block, const ColumnPtr& filter_column, int column_to_keep,
-                         std::vector<uint32_t> columns_to_filter);
    Status _filter_block(Block* block, int column_to_keep,
                         const vector<uint32_t>& columns_to_filter);
    Status _filter_block_internal(Block* block, const vector<uint32_t>& columns_to_filter,
                                  const IColumn::Filter& filter);
+    bool _can_filter_by_dict(const string& predicate_col_name,
+                             const tparquet::ColumnMetaData& column_metadata);
+    bool is_dictionary_encoded(const tparquet::ColumnMetaData& column_metadata);
+    Status _rewrite_dict_predicates();
+    Status _rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes, int slot_id);
+    void _convert_dict_cols_to_string_cols(Block* block);
+    Status _execute_conjuncts(const std::vector<VExprContext*>& ctxs,
+                              const std::vector<IColumn::Filter*>& filters, Block* block,
+                              IColumn::Filter* result_filter, bool* can_filter_all);
+    Status _execute_conjuncts_and_filter_block(const std::vector<VExprContext*>& ctxs,
+                                               const std::vector<IColumn::Filter*>& filters,
+                                               Block* block,
+                                               std::vector<uint32_t>& columns_to_filter,
+                                               int column_to_keep);
+
    io::FileReaderSPtr _file_reader;
    std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _column_readers;
    const std::vector<ParquetReadColumn>& _read_columns;
@@ -156,5 +176,15 @@ private:
    std::unique_ptr<TextConverter> _text_converter = nullptr;
    std::unique_ptr<IColumn::Filter> _pos_delete_filter_ptr = nullptr;
    int64_t _total_read_rows = 0;
+    const TupleDescriptor* _tuple_descriptor;
+    const RowDescriptor* _row_descriptor;
+    const std::unordered_map<std::string, int>* _col_name_to_slot_id;
+    const std::unordered_map<int, std::vector<VExprContext*>>* _slot_id_to_filter_conjuncts;
+    std::vector<VExprContext*> _dict_filter_conjuncts;
+    std::vector<VExprContext*> _filter_conjuncts;
+    std::vector<std::string> _dict_filter_col_names;
+    RuntimeState* _state;
+    std::shared_ptr<ObjectPool> _obj_pool;
+    bool _is_row_group_filtered = false;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index c61f60ad89..7c68f1925a 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -34,7 +34,7 @@ namespace doris::vectorized {
 ParquetReader::ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
                              const TFileRangeDesc& range, size_t batch_size, cctz::time_zone* ctz,
-                             IOContext* io_ctx)
+                             IOContext* io_ctx, RuntimeState* state)
        : _profile(profile),
          _scan_params(params),
          _scan_range(range),
@@ -42,15 +42,20 @@ ParquetReader::ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams
          _range_start_offset(range.start_offset),
          _range_size(range.size),
          _ctz(ctz),
-          _io_ctx(io_ctx) {
+          _io_ctx(io_ctx),
+          _state(state) {
    _init_profile();
    _init_system_properties();
    _init_file_description();
}
 ParquetReader::ParquetReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
-                             IOContext* io_ctx)
-        : _profile(nullptr), _scan_params(params), _scan_range(range), _io_ctx(io_ctx) {
+                             IOContext* io_ctx, RuntimeState* state)
+        : _profile(nullptr),
+          _scan_params(params),
+          _scan_range(range),
+          _io_ctx(io_ctx),
+          _state(state) {
    _init_system_properties();
    _init_file_description();
}
@@ -195,7 +200,17 @@ Status ParquetReader::init_reader(
        const std::vector<std::string>& all_column_names,
        const std::vector<std::string>& missing_column_names,
        std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
-        VExprContext* vconjunct_ctx, bool filter_groups) {
+        VExprContext* vconjunct_ctx, const TupleDescriptor* tuple_descriptor,
+        const RowDescriptor* row_descriptor,
+        const std::unordered_map<std::string, int>* colname_to_slot_id,
+        const std::vector<VExprContext*>* not_single_slot_filter_conjuncts,
+        const std::unordered_map<int, std::vector<VExprContext*>>* slot_id_to_filter_conjuncts,
+        bool filter_groups) {
+    _tuple_descriptor = tuple_descriptor;
+    _row_descriptor = row_descriptor;
+    _colname_to_slot_id = colname_to_slot_id;
+    _not_single_slot_filter_conjuncts = not_single_slot_filter_conjuncts;
+    _slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts;
    if (_file_metadata == nullptr) {
        return Status::InternalError("failed to init parquet reader, please open reader first");
    }
@@ -467,10 +482,12 @@ Status ParquetReader::_next_row_group_reader() {
            _get_position_delete_ctx(row_group, row_group_index);
    _current_group_reader.reset(new RowGroupReader(_file_reader, _read_columns,
                                                   row_group_index.row_group_id, row_group, _ctz,
-                                                  position_delete_ctx, _lazy_read_ctx));
+                                                  position_delete_ctx, _lazy_read_ctx, _state));
    _row_group_eof = false;
-    return _current_group_reader->init(_file_metadata->schema(), candidate_row_ranges,
-                                       _col_offsets);
+    return _current_group_reader->init(_file_metadata->schema(), candidate_row_ranges, _col_offsets,
+                                       _tuple_descriptor, _row_descriptor, _colname_to_slot_id,
+                                       _not_single_slot_filter_conjuncts,
+                                       _slot_id_to_filter_conjuncts);
}
Status ParquetReader::_init_row_groups(const bool& is_filter_groups) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 10e04bdabb..f21996a1df 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -58,10 +58,10 @@ public:
    ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
                  const TFileRangeDesc& range, size_t batch_size, cctz::time_zone* ctz,
-                 IOContext* io_ctx);
+                 IOContext* io_ctx, RuntimeState* state);
    ParquetReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
-                 IOContext* io_ctx);
+                 IOContext* io_ctx, RuntimeState* state);
    ~ParquetReader() override;
    // for test
@@ -73,7 +73,12 @@ public:
        const std::vector<std::string>& all_column_names,
        const std::vector<std::string>& missing_column_names,
        std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
-        VExprContext* vconjunct_ctx, bool filter_groups = true);
+        VExprContext* vconjunct_ctx, const TupleDescriptor* tuple_descriptor,
+        const RowDescriptor* row_descriptor,
+        const std::unordered_map<std::string, int>* colname_to_slot_id,
+        const std::vector<VExprContext*>* not_single_slot_filter_conjuncts,
+        const std::unordered_map<int, std::vector<VExprContext*>>* slot_id_to_filter_conjuncts,
+        bool filter_groups = true);
    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
@@ -204,5 +209,11 @@ private:
    ParquetProfile _parquet_profile;
    bool _closed = false;
    IOContext* _io_ctx;
+    RuntimeState* _state;
+    const TupleDescriptor* _tuple_descriptor;
+    const RowDescriptor* _row_descriptor;
+    const std::unordered_map<std::string, int>* _colname_to_slot_id;
+    const std::vector<VExprContext*>* _not_single_slot_filter_conjuncts;
+    const std::unordered_map<int, std::vector<VExprContext*>>* _slot_id_to_filter_conjuncts;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index ce36da6a92..353b3bb198 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -63,7 +63,11 @@ Status IcebergTableReader::init_reader(
        const std::vector<std::string>& file_col_names,
        const std::unordered_map<int, std::string>& col_id_name_map,
        std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
-        VExprContext* vconjunct_ctx) {
+        VExprContext* vconjunct_ctx, const TupleDescriptor* tuple_descriptor,
+        const RowDescriptor* row_descriptor,
+        const std::unordered_map<std::string, int>* colname_to_slot_id,
+        const std::vector<VExprContext*>* not_single_slot_filter_conjuncts,
+        const std::unordered_map<int, std::vector<VExprContext*>>* slot_id_to_filter_conjuncts) {
    ParquetReader* parquet_reader = static_cast<ParquetReader*>(_file_format_reader.get());
    _col_id_name_map = col_id_name_map;
    _file_col_names = file_col_names;
@@ -73,8 +77,10 @@ Status IcebergTableReader::init_reader(
    _gen_file_col_names();
    _gen_new_colname_to_value_range();
    parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);
-    Status status = parquet_reader->init_reader(_all_required_col_names, _not_in_file_col_names,
-                                                &_new_colname_to_value_range, vconjunct_ctx);
+    Status status = parquet_reader->init_reader(
+            _all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range,
+            vconjunct_ctx, tuple_descriptor, row_descriptor, colname_to_slot_id,
+            not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts);
    return status;
}
@@ -181,7 +187,7 @@ Status IcebergTableReader::_position_delete(
        delete_range.file_size = -1;
        ParquetReader delete_reader(_profile, _params, delete_range, 102400,
                                    const_cast<cctz::time_zone*>(&_state->timezone_obj()),
-                                   _io_ctx);
+                                   _io_ctx, _state);
        if (!init_schema) {
            delete_reader.get_parsed_schema(&delete_file_col_names, &delete_file_col_types);
            init_schema = true;
@@ -191,6 +197,7 @@ Status IcebergTableReader::_position_delete(
            return nullptr;
        }
        create_status = delete_reader.init_reader(delete_file_col_names, _not_in_file_col_names,
+                                                  nullptr, nullptr, nullptr, nullptr, nullptr,
                                                  nullptr, nullptr, false);
        if (!create_status.ok()) {
            return nullptr;
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index 0f3343e3ca..3e57eda86b 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -63,7 +63,11 @@ public:
        const std::vector<std::string>& file_col_names,
        const std::unordered_map<int, std::string>& col_id_name_map,
        std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
-        VExprContext* vconjunct_ctx);
+        VExprContext* vconjunct_ctx, const TupleDescriptor* tuple_descriptor,
+        const RowDescriptor* row_descriptor,
+        const std::unordered_map<std::string, int>* colname_to_slot_id,
+        const std::vector<VExprContext*>* not_single_slot_filter_conjuncts,
+        const std::unordered_map<int, std::vector<VExprContext*>>* slot_id_to_filter_conjuncts);
    enum { DATA, POSITION_DELETE, EQUALITY_DELETE };
diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp
index 50b54f1691..f6b3c98573 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_file_scan_node.cpp
@@ -93,7 +93,8 @@ Status NewFileScanNode::_init_scanners(std::list<VScanner*>* scanners) {
                                        runtime_profile(), _kv_cache);
        _scanner_pool.add(scanner);
        RETURN_IF_ERROR(((VFileScanner*)scanner)
-                                ->prepare(_vconjunct_ctx_ptr.get(), &_colname_to_value_range));
+                                ->prepare(_vconjunct_ctx_ptr.get(), &_colname_to_value_range,
+                                          &_colname_to_slot_id));
        scanners->push_back(scanner);
    }
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index fbc49940bc..aaa0ec9f36 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -36,6 +36,7 @@
#include "vec/exec/format/parquet/vparquet_reader.h"
#include "vec/exec/format/table/iceberg_reader.h"
#include "vec/exec/scan/new_file_scan_node.h"
+#include "vec/exprs/vslot_ref.h"
#include "vec/functions/simple_function_factory.h"
namespace doris::vectorized {
@@ -59,9 +60,11 @@ VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t
 Status VFileScanner::prepare(
        VExprContext** vconjunct_ctx_ptr,
-        std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
+        std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
+        const std::unordered_map<std::string, int>* colname_to_slot_id) {
    RETURN_IF_ERROR(VScanner::prepare(_state, vconjunct_ctx_ptr));
    _colname_to_value_range = colname_to_value_range;
+    _col_name_to_slot_id = colname_to_slot_id;
    _get_block_timer = ADD_TIMER(_parent->_scanner_profile, "FileScannerGetBlockTime");
    _cast_to_input_block_timer =
@@ -101,6 +104,59 @@ Status VFileScanner::prepare(
    return Status::OK();
}
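+// Walks the conjunct AND-tree and clones each leaf into its own VExprContext.
+// A leaf that references exactly one slot goes into _slot_id_to_filter_conjuncts
+// keyed by that slot, so the parquet reader can try dict filtering on it; a
+// leaf that references zero or multiple slots goes into
+// _not_single_slot_filter_conjuncts. E.g. `a = 1 AND (b > 2 OR b < 0)` is
+// split into one conjunct on slot a and one on slot b.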
+Status VFileScanner::_split_conjuncts(VExpr* conjunct_expr_root) {
+    static constexpr auto is_leaf = [](VExpr* expr) { return !expr->is_and_expr(); };
+    if (conjunct_expr_root != nullptr) {
+        if (is_leaf(conjunct_expr_root)) {
+            auto impl = conjunct_expr_root->get_impl();
+            // If impl is not null, this conjunct comes from a runtime filter.
+            VExpr* cur_expr = impl ? const_cast<VExpr*>(impl) : conjunct_expr_root;
+            VExprContext* new_ctx = _state->obj_pool()->add(new VExprContext(cur_expr));
+            _vconjunct_ctx->clone_fn_contexts(new_ctx);
+            RETURN_IF_ERROR(new_ctx->prepare(_state, *_default_val_row_desc));
+            RETURN_IF_ERROR(new_ctx->open(_state));
+
+            std::vector<int> slot_ids;
+            _get_slot_ids(cur_expr, &slot_ids);
+            if (slot_ids.size() == 0) {
+                _not_single_slot_filter_conjuncts.emplace_back(new_ctx);
+                return Status::OK();
+            }
+            bool single_slot = true;
+            for (int i = 1; i < slot_ids.size(); i++) {
+                if (slot_ids[i] != slot_ids[0]) {
+                    single_slot = false;
+                    break;
+                }
+            }
+            if (single_slot) {
+                SlotId slot_id = slot_ids[0];
+                if (_slot_id_to_filter_conjuncts.find(slot_id) ==
+                    _slot_id_to_filter_conjuncts.end()) {
+                    _slot_id_to_filter_conjuncts.insert({slot_id, std::vector<VExprContext*>()});
+                }
+                _slot_id_to_filter_conjuncts[slot_id].emplace_back(new_ctx);
+            } else {
+                _not_single_slot_filter_conjuncts.emplace_back(new_ctx);
+            }
+        } else {
+            RETURN_IF_ERROR(_split_conjuncts(conjunct_expr_root->children()[0]));
+            RETURN_IF_ERROR(_split_conjuncts(conjunct_expr_root->children()[1]));
+        }
+    }
+    return Status::OK();
+}
+
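+// Recursively collects the slot id of every VSlotRef under `expr`; duplicates
+// are kept, since _split_conjuncts only checks that all collected ids are
+// equal.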
+void VFileScanner::_get_slot_ids(VExpr* expr, std::vector<int>* slot_ids) {
+    for (VExpr* child_expr : expr->children()) {
+        if (child_expr->is_slot_ref()) {
+            VSlotRef* slot_ref = reinterpret_cast<VSlotRef*>(child_expr);
+            slot_ids->emplace_back(slot_ref->slot_id());
+        }
+        _get_slot_ids(child_expr, slot_ids);
+    }
+}
+
Status VFileScanner::open(RuntimeState* state) {
RETURN_IF_ERROR(VScanner::open(state));
RETURN_IF_ERROR(_init_expr_ctxes());
@@ -481,7 +537,7 @@ Status VFileScanner::_get_next_reader() {
case TFileFormatType::FORMAT_PARQUET: {
        ParquetReader* parquet_reader = new ParquetReader(
                _profile, _params, range, _state->query_options().batch_size,
-               const_cast<cctz::time_zone*>(&_state->timezone_obj()), _io_ctx.get());
+               const_cast<cctz::time_zone*>(&_state->timezone_obj()), _io_ctx.get(), _state);
        RETURN_IF_ERROR(parquet_reader->open());
        if (!_is_load && _push_down_expr == nullptr && _vconjunct_ctx != nullptr) {
            RETURN_IF_ERROR(_vconjunct_ctx->clone(_state, &_push_down_expr));
@@ -492,14 +548,18 @@ Status VFileScanner::_get_next_reader() {
            IcebergTableReader* iceberg_reader =
                    new IcebergTableReader((GenericReader*)parquet_reader, _profile, _state,
                                           _params, range, _kv_cache, _io_ctx.get());
-            init_status = iceberg_reader->init_reader(_file_col_names, _col_id_name_map,
-                                                      _colname_to_value_range, _push_down_expr);
+            init_status = iceberg_reader->init_reader(
+                    _file_col_names, _col_id_name_map, _colname_to_value_range, _push_down_expr,
+                    _real_tuple_desc, _default_val_row_desc.get(), _col_name_to_slot_id,
+                    &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
            RETURN_IF_ERROR(iceberg_reader->init_row_filters(range));
            _cur_reader.reset((GenericReader*)iceberg_reader);
        } else {
            std::vector<std::string> place_holder;
-            init_status = parquet_reader->init_reader(_file_col_names, place_holder,
-                                                      _colname_to_value_range, _push_down_expr);
+            init_status = parquet_reader->init_reader(
+                    _file_col_names, place_holder, _colname_to_value_range, _push_down_expr,
+                    _real_tuple_desc, _default_val_row_desc.get(), _col_name_to_slot_id,
+                    &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
            _cur_reader.reset((GenericReader*)parquet_reader);
        }
        break;
@@ -749,6 +809,20 @@ Status VFileScanner::close(RuntimeState* state) {
        _push_down_expr->close(state);
    }
+    for (auto& [k, v] : _slot_id_to_filter_conjuncts) {
+        for (auto& ctx : v) {
+            if (ctx != nullptr) {
+                ctx->close(state);
+            }
+        }
+    }
+
+    for (auto* ctx : _not_single_slot_filter_conjuncts) {
+        if (ctx != nullptr) {
+            ctx->close(state);
+        }
+    }
+
    if (config::enable_file_cache) {
        io::FileCacheProfileReporter cache_profile(_profile);
        cache_profile.update(_file_cache_statistics.get());
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index 4e503da5ef..410d357d18 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -42,7 +42,8 @@ public:
public:
    Status prepare(VExprContext** vconjunct_ctx_ptr,
-                   std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
+                   std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
+                   const std::unordered_map<std::string, int>* colname_to_slot_id);
protected:
    Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override;
@@ -106,6 +107,7 @@ protected:
    int _rows = 0;
    int _num_of_columns_from_file;
+    bool _src_block_mem_reuse = false;
    bool _strict_mode;
    bool _src_block_init = false;
@@ -114,6 +116,8 @@ protected:
    VExprContext* _push_down_expr = nullptr;
    bool _is_dynamic_schema = false;
+    // for tracing dynamic schema
+    std::unique_ptr<vectorized::schema_util::FullBaseSchemaView> _full_base_schema_view;
    std::unique_ptr<FileCacheStatistics> _file_cache_statistics;
    std::unique_ptr<IOContext> _io_ctx;
@@ -126,6 +130,12 @@ private:
    RuntimeProfile::Counter* _pre_filter_timer = nullptr;
    RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr;
+    const std::unordered_map<std::string, int>* _col_name_to_slot_id;
+    // filter conjuncts that reference exactly one slot
+    std::unordered_map<int, std::vector<VExprContext*>> _slot_id_to_filter_conjuncts;
+    // filter conjuncts that reference zero or multiple slots
+    std::vector<VExprContext*> _not_single_slot_filter_conjuncts;
+
private:
    Status _init_expr_ctxes();
    Status _init_src_block(Block* block);
@@ -135,6 +145,9 @@ private:
    Status _pre_filter_src_block();
    Status _convert_to_output_block(Block* block);
    Status _generate_fill_columns();
+    Status _handle_dynamic_block(Block* block);
+    Status _split_conjuncts(VExpr* conjunct_expr_root);
+    void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
    void _reset_counter() {
        _counter.num_rows_unselected = 0;
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index f019df4b60..f8fa213d33 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -447,6 +447,8 @@ Status VScanNode::_normalize_conjuncts() {
    std::vector<SlotDescriptor*> slots = _output_tuple_desc->slots();
    for (int slot_idx = 0; slot_idx < slots.size(); ++slot_idx) {
+        _colname_to_slot_id[slots[slot_idx]->col_name()] = slots[slot_idx]->id();
+
        auto type = slots[slot_idx]->type().type;
        if (slots[slot_idx]->type().type == TYPE_ARRAY) {
            type = slots[slot_idx]->type().children[0].type;
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
index 51dc2edb1b..de641e6055 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -311,6 +311,8 @@ protected:
    RuntimeProfile::HighWaterMarkCounter* _queued_blocks_memory_usage;
    RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage;
+    std::unordered_map<std::string, int> _colname_to_slot_id;
+
private:
// Register and get all runtime filters at Init phase.
Status _register_runtime_filter();
diff --git a/be/test/vec/exec/parquet/parquet_reader_test.cpp b/be/test/vec/exec/parquet/parquet_reader_test.cpp
index 42ff7cf538..4ca7e55ecd 100644
--- a/be/test/vec/exec/parquet/parquet_reader_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_reader_test.cpp
@@ -108,7 +108,8 @@ TEST_F(ParquetReaderTest, normal) {
        scan_range.start_offset = 0;
        scan_range.size = 1000;
    }
-    auto p_reader = new ParquetReader(nullptr, scan_params, scan_range, 992, &ctz, nullptr);
+    auto p_reader =
+            new ParquetReader(nullptr, scan_params, scan_range, 992, &ctz, nullptr, nullptr);
    p_reader->set_file_reader(reader);
    RuntimeState runtime_state((TQueryGlobals()));
    runtime_state.set_desc_tbl(desc_tbl);
@@ -116,7 +117,8 @@ TEST_F(ParquetReaderTest, normal) {
    std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range;
    p_reader->open();
-    p_reader->init_reader(column_names, missing_column_names, nullptr, nullptr);
+    p_reader->init_reader(column_names, missing_column_names, nullptr, nullptr, nullptr, nullptr,
+                          nullptr, nullptr, nullptr);
    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
            partition_columns;
std::unordered_map<std::string, VExprContext*> missing_columns;
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index c7e4894d20..9426505489 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -191,7 +191,7 @@ static Status get_column_values(io::FileReaderSPtr file_reader, tparquet::Column
        // required column
        std::vector<u_short> null_map = {(u_short)rows};
        run_length_map.set_run_length_null_map(null_map, rows, nullptr);
-        return chunk_reader.decode_values(data_column, data_type, run_length_map);
+        return chunk_reader.decode_values(data_column, data_type, run_length_map, false);
    } else {
        // column with null values
        level_t level_type = definitions[0];
@@ -204,8 +204,8 @@ static Status get_column_values(io::FileReaderSPtr file_reader, tparquet::Column
            } else {
                std::vector<u_short> null_map = {(u_short)num_values};
                run_length_map.set_run_length_null_map(null_map, rows, nullptr);
-                RETURN_IF_ERROR(
-                        chunk_reader.decode_values(data_column, data_type, run_length_map));
+                RETURN_IF_ERROR(chunk_reader.decode_values(data_column, data_type,
+                                                           run_length_map, false));
            }
            level_type = definitions[i];
            num_values = 1;
@@ -219,7 +219,8 @@ static Status get_column_values(io::FileReaderSPtr file_reader, tparquet::Column
        } else {
            std::vector<u_short> null_map = {(u_short)num_values};
            run_length_map.set_run_length_null_map(null_map, rows, nullptr);
-            RETURN_IF_ERROR(chunk_reader.decode_values(data_column, data_type, run_length_map));
+            RETURN_IF_ERROR(
+                    chunk_reader.decode_values(data_column, data_type, run_length_map, false));
        }
        return Status::OK();
    }
@@ -421,12 +422,13 @@ TEST_F(ParquetThriftReaderTest, group_reader) {
    std::shared_ptr<RowGroupReader> row_group_reader;
    RowGroupReader::PositionDeleteContext position_delete_ctx(row_group.num_rows, 0);
    row_group_reader.reset(new RowGroupReader(file_reader, read_columns, 0, row_group, &ctz,
-                                              position_delete_ctx, lazy_read_ctx));
+                                              position_delete_ctx, lazy_read_ctx, nullptr));
    std::vector<RowRange> row_ranges;
    row_ranges.emplace_back(0, row_group.num_rows);
    auto col_offsets = std::unordered_map<int, tparquet::OffsetIndex>();
-    auto stg = row_group_reader->init(meta_data->schema(), row_ranges, col_offsets);
+    auto stg = row_group_reader->init(meta_data->schema(), row_ranges, col_offsets, nullptr,
+                                      nullptr, nullptr, nullptr, nullptr);
    EXPECT_TRUE(stg.ok());
    vectorized::Block block;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]