This is an automated email from the ASF dual-hosted git repository. suxiaogang223 pushed a commit to branch refact_reader_branch in repository https://gitbox.apache.org/repos/asf/doris.git
commit 4ca4e217a29d80f45e83690aacd03c8f61dd8c22 Author: Socrates <[email protected]> AuthorDate: Fri May 29 10:00:21 2026 +0800 [feature](be) Add parquet dictionary row group pruning ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: Add first-stage dictionary predicate pushdown for the new Parquet reader. It conservatively prunes fully dictionary encoded string-like row groups for EQ and IN predicates by evaluating owned dictionary values before reading data pages. ### Release note None ### Check List (For Author) - Test: Manual test - Ran build-support/clang-format.sh on modified BE files. - Ran git diff --check. - Local targeted BE UT could not run because the Mac toolchain fails CMake compiler detection with ld: library 'c++' not found. - Behavior changed: No - Does this need documentation: No --- be/src/format/new_parquet/parquet_reader.cpp | 5 +- be/src/format/new_parquet/parquet_statistics.cpp | 207 +++++++++++++++++++-- be/src/format/new_parquet/parquet_statistics.h | 6 +- be/test/format/new_parquet/parquet_reader_test.cpp | 121 ++++++++++++ 4 files changed, 323 insertions(+), 16 deletions(-) diff --git a/be/src/format/new_parquet/parquet_reader.cpp b/be/src/format/new_parquet/parquet_reader.cpp index 5e4107d727d..2626df205fa 100644 --- a/be/src/format/new_parquet/parquet_reader.cpp +++ b/be/src/format/new_parquet/parquet_reader.cpp @@ -675,8 +675,9 @@ Status ParquetReader::open(std::unique_ptr<reader::FileScanRequest>& request) { reader::SchemaField projected_field; RETURN_IF_ERROR(_get_projected_schema_field(file_column_id, &projection, &projected_field)); } - RETURN_IF_ERROR(select_row_groups_by_statistics(*_state->metadata, _state->file_schema, - *_request, &_state->selected_row_groups)); + RETURN_IF_ERROR(select_row_groups_by_statistics(*_state->metadata, _state->file_reader.get(), + _state->file_schema, *_request, + &_state->selected_row_groups)); std::vector<int> range_selected_row_groups; range_selected_row_groups.reserve(_state->selected_row_groups.size()); for (const auto row_group_idx : _state->selected_row_groups) { diff --git a/be/src/format/new_parquet/parquet_statistics.cpp b/be/src/format/new_parquet/parquet_statistics.cpp index a28ccb8ae25..b7a4ad9b096 100644 --- a/be/src/format/new_parquet/parquet_statistics.cpp +++ b/be/src/format/new_parquet/parquet_statistics.cpp @@ -19,9 +19,12 @@ #include <parquet/api/reader.h> #include <parquet/api/schema.h> +#include <parquet/column_reader.h> #include <parquet/statistics.h> #include <parquet/types.h> +#include <cstddef> +#include <exception> #include <memory> #include <string> #include <utility> @@ -103,6 +106,165 @@ bool is_null_only_predicate(const ColumnPredicate& predicate) { predicate.type() == PredicateType::IS_NOT_NULL; } +bool is_supported_dictionary_predicate(const ColumnPredicate& predicate) { + switch (predicate.type()) { + case PredicateType::EQ: + case PredicateType::IN_LIST: + return true; + default: + return false; + } +} + +bool is_dictionary_data_encoding(::parquet::Encoding::type encoding) { + return encoding == ::parquet::Encoding::PLAIN_DICTIONARY || + encoding == ::parquet::Encoding::RLE_DICTIONARY; +} + +bool is_level_encoding(::parquet::Encoding::type encoding) { + return encoding == ::parquet::Encoding::RLE || encoding == ::parquet::Encoding::BIT_PACKED; +} + +bool is_data_page_type(::parquet::PageType::type page_type) { + return page_type == ::parquet::PageType::DATA_PAGE || + page_type == ::parquet::PageType::DATA_PAGE_V2; +} + +bool is_dictionary_encoded_chunk(const ::parquet::ColumnChunkMetaData& column_metadata) { + if (!column_metadata.has_dictionary_page()) { + return false; + } + + const auto& encoding_stats = column_metadata.encoding_stats(); + if (!encoding_stats.empty()) { + bool has_dictionary_data_page = false; + for (const auto& encoding_stat : encoding_stats) { + if (!is_data_page_type(encoding_stat.page_type) || encoding_stat.count <= 0) { + continue; + } + if (!is_dictionary_data_encoding(encoding_stat.encoding)) { + return false; + } + has_dictionary_data_page = true; + } + return has_dictionary_data_page; + } + + bool has_dictionary_encoding = false; + for (const auto encoding : column_metadata.encodings()) { + if (is_dictionary_data_encoding(encoding)) { + has_dictionary_encoding = true; + continue; + } + if (!is_level_encoding(encoding)) { + return false; + } + } + return has_dictionary_encoding; +} + +bool supports_dictionary_pruning(const ParquetColumnSchema& column_schema, + const ::parquet::ColumnChunkMetaData& column_metadata, + const reader::FileColumnPredicateFilter& column_filter) { + if (column_schema.kind != ParquetColumnSchemaKind::PRIMITIVE || + column_schema.descriptor == nullptr || column_schema.type == nullptr) { + return false; + } + if (!column_schema.type_descriptor.is_string_like) { + return false; + } + if (column_metadata.type() != ::parquet::Type::BYTE_ARRAY && + column_metadata.type() != ::parquet::Type::FIXED_LEN_BYTE_ARRAY) { + return false; + } + for (const auto& column_predicate : column_filter.predicates) { + if (column_predicate == nullptr || !is_supported_dictionary_predicate(*column_predicate)) { + return false; + } + } + return true; +} + +struct OwnedDictionaryWords { + std::vector<std::string> values; + std::vector<StringRef> refs; + + void clear() { + values.clear(); + refs.clear(); + } + + void build_refs() { + refs.reserve(values.size()); + for (const auto& value : values) { + refs.emplace_back(value.data(), value.size()); + } + } +}; + +bool read_dictionary_words(::parquet::ParquetFileReader* file_reader, int row_group_idx, + int leaf_column_id, const ParquetColumnSchema& column_schema, + OwnedDictionaryWords* dict_words) { + DORIS_CHECK(dict_words != nullptr); + dict_words->clear(); + if (file_reader == nullptr || leaf_column_id < 0) { + return false; + } + + auto row_group_reader = file_reader->RowGroup(row_group_idx); + if (row_group_reader == nullptr) { + return false; + } + auto page_reader = row_group_reader->GetColumnPageReader(leaf_column_id); + if (page_reader == nullptr) { + return false; + } + auto column_reader = + ::parquet::ColumnReader::Make(column_schema.descriptor, std::move(page_reader)); + if (column_reader == nullptr) { + return false; + } + + int32_t dictionary_length = 0; + const void* dictionary = nullptr; + try { + dictionary = column_reader->ReadDictionary(&dictionary_length); + } catch (const ::parquet::ParquetException&) { + return false; + } catch (const std::exception&) { + return false; + } + if (dictionary == nullptr || dictionary_length <= 0) { + return false; + } + + dict_words->values.reserve(static_cast<size_t>(dictionary_length)); + if (column_schema.descriptor->physical_type() == ::parquet::Type::BYTE_ARRAY) { + const auto* byte_array_values = reinterpret_cast<const ::parquet::ByteArray*>(dictionary); + for (int32_t dict_idx = 0; dict_idx < dictionary_length; ++dict_idx) { + dict_words->values.emplace_back( + reinterpret_cast<const char*>(byte_array_values[dict_idx].ptr), + byte_array_values[dict_idx].len); + } + dict_words->build_refs(); + return true; + } + if (column_schema.descriptor->physical_type() == ::parquet::Type::FIXED_LEN_BYTE_ARRAY) { + const int type_length = column_schema.descriptor->type_length(); + if (type_length <= 0) { + return false; + } + const auto* flba_values = reinterpret_cast<const ::parquet::FixedLenByteArray*>(dictionary); + for (int32_t dict_idx = 0; dict_idx < dictionary_length; ++dict_idx) { + dict_words->values.emplace_back( + reinterpret_cast<const char*>(flba_values[dict_idx].ptr), type_length); + } + dict_words->build_refs(); + return true; + } + return false; +} + segment_v2::ZoneMap to_column_predicate_statistics(const ParquetColumnStatistics& statistics) { segment_v2::ZoneMap predicate_statistics; predicate_statistics.min_value = statistics.min_value; @@ -181,26 +343,46 @@ bool ParquetStatisticsUtils::CheckStatistics(const reader::FileColumnPredicateFi } bool ParquetStatisticsUtils::RowGroupExcludes( - const ::parquet::RowGroupMetaData& row_group, - const std::vector<std::unique_ptr<ParquetColumnSchema>>& schema, + const ::parquet::RowGroupMetaData& row_group, ::parquet::ParquetFileReader* file_reader, + int row_group_idx, const std::vector<std::unique_ptr<ParquetColumnSchema>>& schema, const reader::FileColumnPredicateFilter& column_filter) { if (column_filter.predicates.empty()) { return false; } - DCHECK(column_filter.file_column_id >= 0 && - column_filter.file_column_id < row_group.num_columns()); DCHECK_LT(column_filter.file_column_id, schema.size()); - auto column_chunk = row_group.ColumnChunk(column_filter.file_column_id); + const auto& column_schema = *schema[column_filter.file_column_id]; + if (column_schema.kind != ParquetColumnSchemaKind::PRIMITIVE || + column_schema.leaf_column_id < 0) { + return false; + } + DCHECK_LT(column_schema.leaf_column_id, row_group.num_columns()); + auto column_chunk = row_group.ColumnChunk(column_schema.leaf_column_id); if (column_chunk == nullptr) { return false; } - return CheckStatistics(column_filter, - TransformColumnStatistics(*schema[column_filter.file_column_id], - column_chunk->statistics())); + if (CheckStatistics(column_filter, + TransformColumnStatistics(column_schema, column_chunk->statistics()))) { + return true; + } + if (!supports_dictionary_pruning(column_schema, *column_chunk, column_filter) || + !is_dictionary_encoded_chunk(*column_chunk)) { + return false; + } + OwnedDictionaryWords dict_words; + if (!read_dictionary_words(file_reader, row_group_idx, column_schema.leaf_column_id, + column_schema, &dict_words)) { + return false; + } + for (const auto& column_predicate : column_filter.predicates) { + if (!column_predicate->evaluate_and(dict_words.refs.data(), dict_words.refs.size())) { + return true; + } + } + return false; } Status ParquetStatisticsUtils::SelectRowGroups( - const ::parquet::FileMetaData& metadata, + const ::parquet::FileMetaData& metadata, ::parquet::ParquetFileReader* file_reader, const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema, const reader::FileScanRequest& request, std::vector<int>* selected_row_groups) { if (selected_row_groups == nullptr) { @@ -218,7 +400,8 @@ Status ParquetStatisticsUtils::SelectRowGroups( } bool drop = false; for (const auto& column_filter : request.column_predicate_filters) { - if (RowGroupExcludes(*row_group, file_schema, column_filter)) { + if (RowGroupExcludes(*row_group, file_reader, row_group_idx, file_schema, + column_filter)) { drop = true; break; } @@ -246,10 +429,10 @@ bool ParquetStatisticsUtils::BloomFilterSupported(const ParquetColumnSchema& col } Status select_row_groups_by_statistics( - const ::parquet::FileMetaData& metadata, + const ::parquet::FileMetaData& metadata, ::parquet::ParquetFileReader* file_reader, const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema, const reader::FileScanRequest& request, std::vector<int>* selected_row_groups) { - return ParquetStatisticsUtils::SelectRowGroups(metadata, file_schema, request, + return ParquetStatisticsUtils::SelectRowGroups(metadata, file_reader, file_schema, request, selected_row_groups); } diff --git a/be/src/format/new_parquet/parquet_statistics.h b/be/src/format/new_parquet/parquet_statistics.h index 4f43ae245b5..ff1c300e84c 100644 --- a/be/src/format/new_parquet/parquet_statistics.h +++ b/be/src/format/new_parquet/parquet_statistics.h @@ -26,6 +26,7 @@ namespace parquet { class FileMetaData; +class ParquetFileReader; class RowGroupMetaData; class Statistics; } // namespace parquet @@ -66,11 +67,12 @@ struct ParquetStatisticsUtils { const ParquetColumnStatistics& statistics); static bool RowGroupExcludes(const ::parquet::RowGroupMetaData& row_group, + ::parquet::ParquetFileReader* file_reader, int row_group_idx, const std::vector<std::unique_ptr<ParquetColumnSchema>>& schema, const reader::FileColumnPredicateFilter& column_filter); static Status SelectRowGroups( - const ::parquet::FileMetaData& metadata, + const ::parquet::FileMetaData& metadata, ::parquet::ParquetFileReader* file_reader, const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema, const reader::FileScanRequest& request, std::vector<int>* selected_row_groups); @@ -82,7 +84,7 @@ struct ParquetStatisticsUtils { // 后续 page index、dictionary、bloom filter 等文件格式优化也应继续收敛在这一层,避免污染 // ParquetReader 的 scan 调度代码。 Status select_row_groups_by_statistics( - const ::parquet::FileMetaData& metadata, + const ::parquet::FileMetaData& metadata, ::parquet::ParquetFileReader* file_reader, const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema, const reader::FileScanRequest& request, std::vector<int>* selected_row_groups); diff --git a/be/test/format/new_parquet/parquet_reader_test.cpp b/be/test/format/new_parquet/parquet_reader_test.cpp index 0be12c27129..255ad574a26 100644 --- a/be/test/format/new_parquet/parquet_reader_test.cpp +++ b/be/test/format/new_parquet/parquet_reader_test.cpp @@ -203,6 +203,30 @@ void write_int_pair_parquet_file(const std::string& file_path, int64_t row_group row_group_size, builder.build())); } +void write_dictionary_filter_parquet_file(const std::string& file_path) { + auto schema = arrow::schema({ + arrow::field("id", arrow::int32(), false), + arrow::field("value", arrow::utf8(), false), + }); + auto table = + arrow::Table::Make(schema, {build_int32_array({1, 2, 3, 4, 5, 6}), + build_string_array({"aa", "az", "lm", "lz", "za", "zz"})}); + + auto file_result = arrow::io::FileOutputStream::Open(file_path); + ASSERT_TRUE(file_result.ok()) << file_result.status(); + std::shared_ptr<arrow::io::FileOutputStream> out = *file_result; + + ::parquet::WriterProperties::Builder builder; + builder.version(::parquet::ParquetVersion::PARQUET_2_6); + builder.data_page_version(::parquet::ParquetDataPageVersion::V2); + builder.compression(::parquet::Compression::UNCOMPRESSED); + builder.enable_dictionary("value"); + builder.disable_dictionary("id"); + builder.disable_statistics(); + PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), out, 1, + builder.build())); +} + Block build_file_block(const std::vector<reader::SchemaField>& schema) { Block block; for (const auto& field : schema) { @@ -576,6 +600,103 @@ TEST_F(NewParquetReaderTest, PredicateFiltersRowGroupsByStatistics) { EXPECT_EQ(values, std::vector<std::string>({"three", "four", "five"})); } +TEST_F(NewParquetReaderTest, PredicateFiltersRowGroupsByDictionary) { + write_dictionary_filter_parquet_file(_file_path); + auto parquet_file_reader = ::parquet::ParquetFileReader::OpenFile(_file_path, false); + ASSERT_EQ(parquet_file_reader->metadata()->num_row_groups(), 6); + for (int row_group_idx = 0; row_group_idx < 6; ++row_group_idx) { + auto row_group = parquet_file_reader->metadata()->RowGroup(row_group_idx); + ASSERT_NE(row_group, nullptr); + auto value_chunk = row_group->ColumnChunk(1); + ASSERT_NE(value_chunk, nullptr); + ASSERT_TRUE(value_chunk->has_dictionary_page()); + ASSERT_TRUE(value_chunk->statistics() == nullptr || + !value_chunk->statistics()->HasMinMax()); + } + + auto reader = create_reader(); + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + ASSERT_TRUE(reader->init(&state).ok()); + + std::vector<reader::SchemaField> schema; + ASSERT_TRUE(reader->get_schema(&schema).ok()); + auto request = std::make_unique<reader::FileScanRequest>(); + request->predicate_columns = {1}; + request->non_predicate_columns = {0}; + reader::FileColumnPredicateFilter column_filter; + column_filter.file_column_id = 1; + column_filter.predicates.push_back(create_comparison_predicate<PredicateType::EQ>( + 1, "value", schema[1].type, Field::create_field<TYPE_STRING>("lm"), false)); + request->column_predicate_filters.push_back(std::move(column_filter)); + ASSERT_TRUE(reader->open(request).ok()); + + std::vector<int32_t> ids; + std::vector<std::string> values; + bool eof = false; + while (!eof) { + Block block = build_file_block(schema); + size_t rows = 0; + ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok()); + if (rows == 0) { + continue; + } + const auto& id_column = assert_cast<const ColumnInt32&>(*block.get_by_position(0).column); + const auto& value_column = + assert_cast<const ColumnString&>(*block.get_by_position(1).column); + for (size_t row = 0; row < rows; ++row) { + ids.push_back(id_column.get_element(row)); + values.push_back(value_column.get_data_at(row).to_string()); + } + } + + EXPECT_EQ(ids, std::vector<int32_t>({3})); + EXPECT_EQ(values, std::vector<std::string>({"lm"})); +} + +TEST_F(NewParquetReaderTest, InPredicateFiltersRowGroupsByDictionary) { + write_dictionary_filter_parquet_file(_file_path); + auto reader = create_reader(); + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + ASSERT_TRUE(reader->init(&state).ok()); + + std::vector<reader::SchemaField> schema; + ASSERT_TRUE(reader->get_schema(&schema).ok()); + auto request = std::make_unique<reader::FileScanRequest>(); + request->predicate_columns = {1}; + request->non_predicate_columns = {0}; + auto set = build_set<TYPE_STRING>(); + set->insert(const_cast<char*>("az"), 2); + set->insert(const_cast<char*>("za"), 2); + reader::FileColumnPredicateFilter column_filter; + column_filter.file_column_id = 1; + column_filter.predicates.push_back(create_in_list_predicate<PredicateType::IN_LIST>( + 1, "value", schema[1].type, set, false)); + request->column_predicate_filters.push_back(std::move(column_filter)); + ASSERT_TRUE(reader->open(request).ok()); + + std::vector<int32_t> ids; + std::vector<std::string> values; + bool eof = false; + while (!eof) { + Block block = build_file_block(schema); + size_t rows = 0; + ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok()); + if (rows == 0) { + continue; + } + const auto& id_column = assert_cast<const ColumnInt32&>(*block.get_by_position(0).column); + const auto& value_column = + assert_cast<const ColumnString&>(*block.get_by_position(1).column); + for (size_t row = 0; row < rows; ++row) { + ids.push_back(id_column.get_element(row)); + values.push_back(value_column.get_data_at(row).to_string()); + } + } + + EXPECT_EQ(ids, std::vector<int32_t>({2, 5})); + EXPECT_EQ(values, std::vector<std::string>({"az", "za"})); +} + TEST_F(NewParquetReaderTest, RowPositionReaderReturnsFileLocalPositions) { write_parquet_file(_file_path, 2); auto parquet_file_reader = ::parquet::ParquetFileReader::OpenFile(_file_path, false); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
