This is an automated email from the ASF dual-hosted git repository.

suxiaogang223 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 4ca4e217a29d80f45e83690aacd03c8f61dd8c22
Author: Socrates <[email protected]>
AuthorDate: Fri May 29 10:00:21 2026 +0800

    [feature](be) Add parquet dictionary row group pruning
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary: Add first-stage dictionary predicate pushdown for the new
    Parquet reader. For EQ and IN predicates on string-like columns, it
    conservatively prunes row groups whose column chunk is fully
    dictionary-encoded, by evaluating the predicates against an owned copy of
    the chunk's dictionary values before reading data pages.
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test: Manual test
    
        - Ran build-support/clang-format.sh on modified BE files.
    
        - Ran git diff --check.
    
        - Local targeted BE UT could not run because the Mac toolchain fails
          CMake compiler detection with "ld: library 'c++' not found".
    
    - Behavior changed: No
    
    - Does this need documentation: No
---
 be/src/format/new_parquet/parquet_reader.cpp       |   5 +-
 be/src/format/new_parquet/parquet_statistics.cpp   | 207 +++++++++++++++++++--
 be/src/format/new_parquet/parquet_statistics.h     |   6 +-
 be/test/format/new_parquet/parquet_reader_test.cpp | 121 ++++++++++++
 4 files changed, 323 insertions(+), 16 deletions(-)

diff --git a/be/src/format/new_parquet/parquet_reader.cpp 
b/be/src/format/new_parquet/parquet_reader.cpp
index 5e4107d727d..2626df205fa 100644
--- a/be/src/format/new_parquet/parquet_reader.cpp
+++ b/be/src/format/new_parquet/parquet_reader.cpp
@@ -675,8 +675,9 @@ Status 
ParquetReader::open(std::unique_ptr<reader::FileScanRequest>& request) {
         reader::SchemaField projected_field;
         RETURN_IF_ERROR(_get_projected_schema_field(file_column_id, 
&projection, &projected_field));
     }
-    RETURN_IF_ERROR(select_row_groups_by_statistics(*_state->metadata, 
_state->file_schema,
-                                                    *_request, 
&_state->selected_row_groups));
+    RETURN_IF_ERROR(select_row_groups_by_statistics(*_state->metadata, 
_state->file_reader.get(),
+                                                    _state->file_schema, 
*_request,
+                                                    
&_state->selected_row_groups));
     std::vector<int> range_selected_row_groups;
     range_selected_row_groups.reserve(_state->selected_row_groups.size());
     for (const auto row_group_idx : _state->selected_row_groups) {
diff --git a/be/src/format/new_parquet/parquet_statistics.cpp 
b/be/src/format/new_parquet/parquet_statistics.cpp
index a28ccb8ae25..b7a4ad9b096 100644
--- a/be/src/format/new_parquet/parquet_statistics.cpp
+++ b/be/src/format/new_parquet/parquet_statistics.cpp
@@ -19,9 +19,12 @@
 
 #include <parquet/api/reader.h>
 #include <parquet/api/schema.h>
+#include <parquet/column_reader.h>
 #include <parquet/statistics.h>
 #include <parquet/types.h>
 
+#include <cstddef>
+#include <exception>
 #include <memory>
 #include <string>
 #include <utility>
@@ -103,6 +106,165 @@ bool is_null_only_predicate(const ColumnPredicate& 
predicate) {
            predicate.type() == PredicateType::IS_NOT_NULL;
 }
 
+bool is_supported_dictionary_predicate(const ColumnPredicate& predicate) {
+    switch (predicate.type()) {
+    case PredicateType::EQ:
+    case PredicateType::IN_LIST:
+        return true;
+    default:
+        return false;
+    }
+}
+
+bool is_dictionary_data_encoding(::parquet::Encoding::type encoding) {
+    return encoding == ::parquet::Encoding::PLAIN_DICTIONARY ||
+           encoding == ::parquet::Encoding::RLE_DICTIONARY;
+}
+
+bool is_level_encoding(::parquet::Encoding::type encoding) {
+    return encoding == ::parquet::Encoding::RLE || encoding == 
::parquet::Encoding::BIT_PACKED;
+}
+
+bool is_data_page_type(::parquet::PageType::type page_type) {
+    return page_type == ::parquet::PageType::DATA_PAGE ||
+           page_type == ::parquet::PageType::DATA_PAGE_V2;
+}
+
+bool is_dictionary_encoded_chunk(const ::parquet::ColumnChunkMetaData& 
column_metadata) {
+    if (!column_metadata.has_dictionary_page()) {
+        return false;
+    }
+
+    const auto& encoding_stats = column_metadata.encoding_stats();
+    if (!encoding_stats.empty()) {
+        bool has_dictionary_data_page = false;
+        for (const auto& encoding_stat : encoding_stats) {
+            if (!is_data_page_type(encoding_stat.page_type) || 
encoding_stat.count <= 0) {
+                continue;
+            }
+            if (!is_dictionary_data_encoding(encoding_stat.encoding)) {
+                return false;
+            }
+            has_dictionary_data_page = true;
+        }
+        return has_dictionary_data_page;
+    }
+
+    bool has_dictionary_encoding = false;
+    for (const auto encoding : column_metadata.encodings()) {
+        if (is_dictionary_data_encoding(encoding)) {
+            has_dictionary_encoding = true;
+            continue;
+        }
+        if (!is_level_encoding(encoding)) {
+            return false;
+        }
+    }
+    return has_dictionary_encoding;
+}
+
+bool supports_dictionary_pruning(const ParquetColumnSchema& column_schema,
+                                 const ::parquet::ColumnChunkMetaData& 
column_metadata,
+                                 const reader::FileColumnPredicateFilter& 
column_filter) {
+    if (column_schema.kind != ParquetColumnSchemaKind::PRIMITIVE ||
+        column_schema.descriptor == nullptr || column_schema.type == nullptr) {
+        return false;
+    }
+    if (!column_schema.type_descriptor.is_string_like) {
+        return false;
+    }
+    if (column_metadata.type() != ::parquet::Type::BYTE_ARRAY &&
+        column_metadata.type() != ::parquet::Type::FIXED_LEN_BYTE_ARRAY) {
+        return false;
+    }
+    for (const auto& column_predicate : column_filter.predicates) {
+        if (column_predicate == nullptr || 
!is_supported_dictionary_predicate(*column_predicate)) {
+            return false;
+        }
+    }
+    return true;
+}
+
+struct OwnedDictionaryWords {
+    std::vector<std::string> values;
+    std::vector<StringRef> refs;
+
+    void clear() {
+        values.clear();
+        refs.clear();
+    }
+
+    void build_refs() {
+        refs.reserve(values.size());
+        for (const auto& value : values) {
+            refs.emplace_back(value.data(), value.size());
+        }
+    }
+};
+
+bool read_dictionary_words(::parquet::ParquetFileReader* file_reader, int 
row_group_idx,
+                           int leaf_column_id, const ParquetColumnSchema& 
column_schema,
+                           OwnedDictionaryWords* dict_words) {
+    DORIS_CHECK(dict_words != nullptr);
+    dict_words->clear();
+    if (file_reader == nullptr || leaf_column_id < 0) {
+        return false;
+    }
+
+    auto row_group_reader = file_reader->RowGroup(row_group_idx);
+    if (row_group_reader == nullptr) {
+        return false;
+    }
+    auto page_reader = row_group_reader->GetColumnPageReader(leaf_column_id);
+    if (page_reader == nullptr) {
+        return false;
+    }
+    auto column_reader =
+            ::parquet::ColumnReader::Make(column_schema.descriptor, 
std::move(page_reader));
+    if (column_reader == nullptr) {
+        return false;
+    }
+
+    int32_t dictionary_length = 0;
+    const void* dictionary = nullptr;
+    try {
+        dictionary = column_reader->ReadDictionary(&dictionary_length);
+    } catch (const ::parquet::ParquetException&) {
+        return false;
+    } catch (const std::exception&) {
+        return false;
+    }
+    if (dictionary == nullptr || dictionary_length <= 0) {
+        return false;
+    }
+
+    dict_words->values.reserve(static_cast<size_t>(dictionary_length));
+    if (column_schema.descriptor->physical_type() == 
::parquet::Type::BYTE_ARRAY) {
+        const auto* byte_array_values = reinterpret_cast<const 
::parquet::ByteArray*>(dictionary);
+        for (int32_t dict_idx = 0; dict_idx < dictionary_length; ++dict_idx) {
+            dict_words->values.emplace_back(
+                    reinterpret_cast<const 
char*>(byte_array_values[dict_idx].ptr),
+                    byte_array_values[dict_idx].len);
+        }
+        dict_words->build_refs();
+        return true;
+    }
+    if (column_schema.descriptor->physical_type() == 
::parquet::Type::FIXED_LEN_BYTE_ARRAY) {
+        const int type_length = column_schema.descriptor->type_length();
+        if (type_length <= 0) {
+            return false;
+        }
+        const auto* flba_values = reinterpret_cast<const 
::parquet::FixedLenByteArray*>(dictionary);
+        for (int32_t dict_idx = 0; dict_idx < dictionary_length; ++dict_idx) {
+            dict_words->values.emplace_back(
+                    reinterpret_cast<const char*>(flba_values[dict_idx].ptr), 
type_length);
+        }
+        dict_words->build_refs();
+        return true;
+    }
+    return false;
+}
+
 segment_v2::ZoneMap to_column_predicate_statistics(const 
ParquetColumnStatistics& statistics) {
     segment_v2::ZoneMap predicate_statistics;
     predicate_statistics.min_value = statistics.min_value;
@@ -181,26 +343,46 @@ bool ParquetStatisticsUtils::CheckStatistics(const 
reader::FileColumnPredicateFi
 }
 
 bool ParquetStatisticsUtils::RowGroupExcludes(
-        const ::parquet::RowGroupMetaData& row_group,
-        const std::vector<std::unique_ptr<ParquetColumnSchema>>& schema,
+        const ::parquet::RowGroupMetaData& row_group, 
::parquet::ParquetFileReader* file_reader,
+        int row_group_idx, const 
std::vector<std::unique_ptr<ParquetColumnSchema>>& schema,
         const reader::FileColumnPredicateFilter& column_filter) {
     if (column_filter.predicates.empty()) {
         return false;
     }
-    DCHECK(column_filter.file_column_id >= 0 &&
-           column_filter.file_column_id < row_group.num_columns());
     DCHECK_LT(column_filter.file_column_id, schema.size());
-    auto column_chunk = row_group.ColumnChunk(column_filter.file_column_id);
+    const auto& column_schema = *schema[column_filter.file_column_id];
+    if (column_schema.kind != ParquetColumnSchemaKind::PRIMITIVE ||
+        column_schema.leaf_column_id < 0) {
+        return false;
+    }
+    DCHECK_LT(column_schema.leaf_column_id, row_group.num_columns());
+    auto column_chunk = row_group.ColumnChunk(column_schema.leaf_column_id);
     if (column_chunk == nullptr) {
         return false;
     }
-    return CheckStatistics(column_filter,
-                           
TransformColumnStatistics(*schema[column_filter.file_column_id],
-                                                     
column_chunk->statistics()));
+    if (CheckStatistics(column_filter,
+                        TransformColumnStatistics(column_schema, 
column_chunk->statistics()))) {
+        return true;
+    }
+    if (!supports_dictionary_pruning(column_schema, *column_chunk, 
column_filter) ||
+        !is_dictionary_encoded_chunk(*column_chunk)) {
+        return false;
+    }
+    OwnedDictionaryWords dict_words;
+    if (!read_dictionary_words(file_reader, row_group_idx, 
column_schema.leaf_column_id,
+                               column_schema, &dict_words)) {
+        return false;
+    }
+    for (const auto& column_predicate : column_filter.predicates) {
+        if (!column_predicate->evaluate_and(dict_words.refs.data(), 
dict_words.refs.size())) {
+            return true;
+        }
+    }
+    return false;
 }
 
 Status ParquetStatisticsUtils::SelectRowGroups(
-        const ::parquet::FileMetaData& metadata,
+        const ::parquet::FileMetaData& metadata, ::parquet::ParquetFileReader* 
file_reader,
         const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema,
         const reader::FileScanRequest& request, std::vector<int>* 
selected_row_groups) {
     if (selected_row_groups == nullptr) {
@@ -218,7 +400,8 @@ Status ParquetStatisticsUtils::SelectRowGroups(
         }
         bool drop = false;
         for (const auto& column_filter : request.column_predicate_filters) {
-            if (RowGroupExcludes(*row_group, file_schema, column_filter)) {
+            if (RowGroupExcludes(*row_group, file_reader, row_group_idx, 
file_schema,
+                                 column_filter)) {
                 drop = true;
                 break;
             }
@@ -246,10 +429,10 @@ bool ParquetStatisticsUtils::BloomFilterSupported(const 
ParquetColumnSchema& col
 }
 
 Status select_row_groups_by_statistics(
-        const ::parquet::FileMetaData& metadata,
+        const ::parquet::FileMetaData& metadata, ::parquet::ParquetFileReader* 
file_reader,
         const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema,
         const reader::FileScanRequest& request, std::vector<int>* 
selected_row_groups) {
-    return ParquetStatisticsUtils::SelectRowGroups(metadata, file_schema, 
request,
+    return ParquetStatisticsUtils::SelectRowGroups(metadata, file_reader, 
file_schema, request,
                                                    selected_row_groups);
 }
 
diff --git a/be/src/format/new_parquet/parquet_statistics.h 
b/be/src/format/new_parquet/parquet_statistics.h
index 4f43ae245b5..ff1c300e84c 100644
--- a/be/src/format/new_parquet/parquet_statistics.h
+++ b/be/src/format/new_parquet/parquet_statistics.h
@@ -26,6 +26,7 @@
 
 namespace parquet {
 class FileMetaData;
+class ParquetFileReader;
 class RowGroupMetaData;
 class Statistics;
 } // namespace parquet
@@ -66,11 +67,12 @@ struct ParquetStatisticsUtils {
                                 const ParquetColumnStatistics& statistics);
 
     static bool RowGroupExcludes(const ::parquet::RowGroupMetaData& row_group,
+                                 ::parquet::ParquetFileReader* file_reader, 
int row_group_idx,
                                  const 
std::vector<std::unique_ptr<ParquetColumnSchema>>& schema,
                                  const reader::FileColumnPredicateFilter& 
column_filter);
 
     static Status SelectRowGroups(
-            const ::parquet::FileMetaData& metadata,
+            const ::parquet::FileMetaData& metadata, 
::parquet::ParquetFileReader* file_reader,
             const std::vector<std::unique_ptr<ParquetColumnSchema>>& 
file_schema,
             const reader::FileScanRequest& request, std::vector<int>* 
selected_row_groups);
 
@@ -82,7 +84,7 @@ struct ParquetStatisticsUtils {
 // 后续 page index、dictionary、bloom filter 等文件格式优化也应继续收敛在这一层,避免污染
 // ParquetReader 的 scan 调度代码。
 Status select_row_groups_by_statistics(
-        const ::parquet::FileMetaData& metadata,
+        const ::parquet::FileMetaData& metadata, ::parquet::ParquetFileReader* 
file_reader,
         const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema,
         const reader::FileScanRequest& request, std::vector<int>* 
selected_row_groups);
 
diff --git a/be/test/format/new_parquet/parquet_reader_test.cpp 
b/be/test/format/new_parquet/parquet_reader_test.cpp
index 0be12c27129..255ad574a26 100644
--- a/be/test/format/new_parquet/parquet_reader_test.cpp
+++ b/be/test/format/new_parquet/parquet_reader_test.cpp
@@ -203,6 +203,30 @@ void write_int_pair_parquet_file(const std::string& 
file_path, int64_t row_group
                                                       row_group_size, 
builder.build()));
 }
 
+void write_dictionary_filter_parquet_file(const std::string& file_path) {
+    auto schema = arrow::schema({
+            arrow::field("id", arrow::int32(), false),
+            arrow::field("value", arrow::utf8(), false),
+    });
+    auto table =
+            arrow::Table::Make(schema, {build_int32_array({1, 2, 3, 4, 5, 6}),
+                                        build_string_array({"aa", "az", "lm", 
"lz", "za", "zz"})});
+
+    auto file_result = arrow::io::FileOutputStream::Open(file_path);
+    ASSERT_TRUE(file_result.ok()) << file_result.status();
+    std::shared_ptr<arrow::io::FileOutputStream> out = *file_result;
+
+    ::parquet::WriterProperties::Builder builder;
+    builder.version(::parquet::ParquetVersion::PARQUET_2_6);
+    builder.data_page_version(::parquet::ParquetDataPageVersion::V2);
+    builder.compression(::parquet::Compression::UNCOMPRESSED);
+    builder.enable_dictionary("value");
+    builder.disable_dictionary("id");
+    builder.disable_statistics();
+    PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table, 
arrow::default_memory_pool(), out, 1,
+                                                      builder.build()));
+}
+
 Block build_file_block(const std::vector<reader::SchemaField>& schema) {
     Block block;
     for (const auto& field : schema) {
@@ -576,6 +600,103 @@ TEST_F(NewParquetReaderTest, 
PredicateFiltersRowGroupsByStatistics) {
     EXPECT_EQ(values, std::vector<std::string>({"three", "four", "five"}));
 }
 
+TEST_F(NewParquetReaderTest, PredicateFiltersRowGroupsByDictionary) {
+    write_dictionary_filter_parquet_file(_file_path);
+    auto parquet_file_reader = 
::parquet::ParquetFileReader::OpenFile(_file_path, false);
+    ASSERT_EQ(parquet_file_reader->metadata()->num_row_groups(), 6);
+    for (int row_group_idx = 0; row_group_idx < 6; ++row_group_idx) {
+        auto row_group = 
parquet_file_reader->metadata()->RowGroup(row_group_idx);
+        ASSERT_NE(row_group, nullptr);
+        auto value_chunk = row_group->ColumnChunk(1);
+        ASSERT_NE(value_chunk, nullptr);
+        ASSERT_TRUE(value_chunk->has_dictionary_page());
+        ASSERT_TRUE(value_chunk->statistics() == nullptr ||
+                    !value_chunk->statistics()->HasMinMax());
+    }
+
+    auto reader = create_reader();
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    ASSERT_TRUE(reader->init(&state).ok());
+
+    std::vector<reader::SchemaField> schema;
+    ASSERT_TRUE(reader->get_schema(&schema).ok());
+    auto request = std::make_unique<reader::FileScanRequest>();
+    request->predicate_columns = {1};
+    request->non_predicate_columns = {0};
+    reader::FileColumnPredicateFilter column_filter;
+    column_filter.file_column_id = 1;
+    
column_filter.predicates.push_back(create_comparison_predicate<PredicateType::EQ>(
+            1, "value", schema[1].type, 
Field::create_field<TYPE_STRING>("lm"), false));
+    request->column_predicate_filters.push_back(std::move(column_filter));
+    ASSERT_TRUE(reader->open(request).ok());
+
+    std::vector<int32_t> ids;
+    std::vector<std::string> values;
+    bool eof = false;
+    while (!eof) {
+        Block block = build_file_block(schema);
+        size_t rows = 0;
+        ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok());
+        if (rows == 0) {
+            continue;
+        }
+        const auto& id_column = assert_cast<const 
ColumnInt32&>(*block.get_by_position(0).column);
+        const auto& value_column =
+                assert_cast<const 
ColumnString&>(*block.get_by_position(1).column);
+        for (size_t row = 0; row < rows; ++row) {
+            ids.push_back(id_column.get_element(row));
+            values.push_back(value_column.get_data_at(row).to_string());
+        }
+    }
+
+    EXPECT_EQ(ids, std::vector<int32_t>({3}));
+    EXPECT_EQ(values, std::vector<std::string>({"lm"}));
+}
+
+TEST_F(NewParquetReaderTest, InPredicateFiltersRowGroupsByDictionary) {
+    write_dictionary_filter_parquet_file(_file_path);
+    auto reader = create_reader();
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    ASSERT_TRUE(reader->init(&state).ok());
+
+    std::vector<reader::SchemaField> schema;
+    ASSERT_TRUE(reader->get_schema(&schema).ok());
+    auto request = std::make_unique<reader::FileScanRequest>();
+    request->predicate_columns = {1};
+    request->non_predicate_columns = {0};
+    auto set = build_set<TYPE_STRING>();
+    set->insert(const_cast<char*>("az"), 2);
+    set->insert(const_cast<char*>("za"), 2);
+    reader::FileColumnPredicateFilter column_filter;
+    column_filter.file_column_id = 1;
+    
column_filter.predicates.push_back(create_in_list_predicate<PredicateType::IN_LIST>(
+            1, "value", schema[1].type, set, false));
+    request->column_predicate_filters.push_back(std::move(column_filter));
+    ASSERT_TRUE(reader->open(request).ok());
+
+    std::vector<int32_t> ids;
+    std::vector<std::string> values;
+    bool eof = false;
+    while (!eof) {
+        Block block = build_file_block(schema);
+        size_t rows = 0;
+        ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok());
+        if (rows == 0) {
+            continue;
+        }
+        const auto& id_column = assert_cast<const 
ColumnInt32&>(*block.get_by_position(0).column);
+        const auto& value_column =
+                assert_cast<const 
ColumnString&>(*block.get_by_position(1).column);
+        for (size_t row = 0; row < rows; ++row) {
+            ids.push_back(id_column.get_element(row));
+            values.push_back(value_column.get_data_at(row).to_string());
+        }
+    }
+
+    EXPECT_EQ(ids, std::vector<int32_t>({2, 5}));
+    EXPECT_EQ(values, std::vector<std::string>({"az", "za"}));
+}
+
 TEST_F(NewParquetReaderTest, RowPositionReaderReturnsFileLocalPositions) {
     write_parquet_file(_file_path, 2);
     auto parquet_file_reader = 
::parquet::ParquetFileReader::OpenFile(_file_path, false);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to