This is an automated email from the ASF dual-hosted git repository.
Gabriel39 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/refact_reader_branch by this push:
     new 4073912ea67 [feature](be) Support expression filters on file reader (#63748)
4073912ea67 is described below
commit 4073912ea6705a84e4bb6a3add2a37294af400d8
Author: Gabriel <[email protected]>
AuthorDate: Wed May 27 18:23:58 2026 +0800
[feature](be) Support expression filters on file reader (#63748)
---
be/src/format/new_parquet/parquet_reader.cpp | 45 ++-
be/src/format/new_parquet/parquet_reader.h | 3 +-
be/src/format/new_parquet/parquet_statistics.cpp | 26 +-
be/src/format/new_parquet/parquet_statistics.h | 7 +-
be/src/format/reader/column_mapper.cpp | 98 +++++--
be/src/format/reader/column_mapper.h | 16 +-
be/src/format/reader/file_reader.h | 23 +-
be/src/format/reader/table_reader.cpp | 26 +-
be/src/format/reader/table_reader.h | 16 +-
be/test/format/new_parquet/parquet_reader_test.cpp | 127 ++++++++-
be/test/format/reader/table_reader_test.cpp | 312 ++++++++++++++++++++-
11 files changed, 589 insertions(+), 110 deletions(-)
diff --git a/be/src/format/new_parquet/parquet_reader.cpp
b/be/src/format/new_parquet/parquet_reader.cpp
index 190aa87f251..6d0ef3eb742 100644
--- a/be/src/format/new_parquet/parquet_reader.cpp
+++ b/be/src/format/new_parquet/parquet_reader.cpp
@@ -291,10 +291,6 @@ Status
ParquetReader::_get_projected_schema_field(reader::ColumnId file_column_i
return Status::OK();
}
-bool ParquetReader::_has_expression_filter(const reader::FileLocalFilter&
local_filter) {
- return local_filter.conjunct != nullptr;
-}
-
Status ParquetReader::_read_filter_columns(int64_t batch_rows, Block*
file_block,
SelectionVector* selection,
uint16_t* selected_rows) {
selection->resize(static_cast<size_t>(batch_rows));
@@ -314,28 +310,29 @@ Status ParquetReader::_read_filter_columns(int64_t
batch_rows, Block* file_block
column_reader->name(), column_rows,
batch_rows);
}
file_block->replace_by_position(block_position, std::move(column));
+ }
+ return _execute_filter_conjuncts(batch_rows, file_block, selection,
selected_rows);
+}
- for (const auto& local_filter : _request->local_filters) {
- if (local_filter.file_column_id != file_field_id ||
- !_has_expression_filter(local_filter)) {
- continue;
- }
- if (*selected_rows == 0) {
- break;
- }
- IColumn::Filter filter(static_cast<size_t>(batch_rows), 1);
- bool can_filter_all = false;
- RETURN_IF_ERROR(local_filter.conjunct->execute_filter(file_block,
filter.data(),
-
static_cast<size_t>(batch_rows),
- false,
&can_filter_all));
- *selected_rows =
- can_filter_all ? 0
- : _apply_filter_to_selection(filter,
selection, *selected_rows);
- break;
+Status ParquetReader::_execute_filter_conjuncts(int64_t batch_rows, Block*
file_block,
+ SelectionVector* selection,
+ uint16_t* selected_rows) {
+ // Expression filters may reference several predicate columns. Execute
them only after all
+ // predicate columns in the file-local block have been materialized.
+ for (const auto& expression_filter : _request->expression_filters) {
+ if (expression_filter.conjunct == nullptr) {
+ continue;
}
if (*selected_rows == 0) {
break;
}
+ IColumn::Filter filter(static_cast<size_t>(batch_rows), 1);
+ bool can_filter_all = false;
+ RETURN_IF_ERROR(expression_filter.conjunct->execute_filter(
+ file_block, filter.data(), static_cast<size_t>(batch_rows),
false,
+ &can_filter_all));
+ *selected_rows =
+ can_filter_all ? 0 : _apply_filter_to_selection(filter,
selection, *selected_rows);
}
return Status::OK();
}
@@ -557,10 +554,10 @@ Status
ParquetReader::open(std::unique_ptr<reader::FileScanRequest>& request) {
DORIS_CHECK(_request->column_positions.count(file_column_id) > 0);
DORIS_CHECK(file_column_id >= 0 && file_column_id < num_fields);
}
- for (const auto& local_filter : _request->local_filters) {
- if (local_filter.file_column_id < 0 || local_filter.file_column_id >=
num_fields) {
+ for (const auto& column_filter : _request->column_predicate_filters) {
+ if (column_filter.file_column_id < 0 || column_filter.file_column_id
>= num_fields) {
return Status::InvalidArgument("Invalid parquet filter top-level
field id {}",
- local_filter.file_column_id);
+ column_filter.file_column_id);
}
}
for (const auto& [file_column_id, projection] :
_request->complex_projections) {
diff --git a/be/src/format/new_parquet/parquet_reader.h
b/be/src/format/new_parquet/parquet_reader.h
index f6d47f46134..aa5cbfb5fcd 100644
--- a/be/src/format/new_parquet/parquet_reader.h
+++ b/be/src/format/new_parquet/parquet_reader.h
@@ -127,9 +127,10 @@ private:
Status _get_projected_schema_field(reader::ColumnId file_column_id,
const reader::FieldProjection*
projection,
reader::SchemaField* field) const;
- bool _has_expression_filter(const reader::FileLocalFilter& local_filter);
Status _read_filter_columns(int64_t batch_rows, Block* file_block,
SelectionVector* selection,
uint16_t* selected_rows);
+ Status _execute_filter_conjuncts(int64_t batch_rows, Block* file_block,
+ SelectionVector* selection, uint16_t*
selected_rows);
IColumn::Filter _selection_to_filter(const SelectionVector& selection,
uint16_t selected_rows,
int64_t batch_rows);
uint16_t _apply_filter_to_selection(const IColumn::Filter& filter,
SelectionVector* selection,
diff --git a/be/src/format/new_parquet/parquet_statistics.cpp
b/be/src/format/new_parquet/parquet_statistics.cpp
index aebc6d4e04d..a28ccb8ae25 100644
--- a/be/src/format/new_parquet/parquet_statistics.cpp
+++ b/be/src/format/new_parquet/parquet_statistics.cpp
@@ -159,14 +159,13 @@ ParquetColumnStatistics
ParquetStatisticsUtils::TransformColumnStatistics(
}
}
-bool ParquetStatisticsUtils::CheckStatistics(const reader::FileLocalFilter&
local_filter,
+bool ParquetStatisticsUtils::CheckStatistics(const
reader::FileColumnPredicateFilter& column_filter,
const ParquetColumnStatistics&
statistics) {
if (!statistics.has_any_statistics()) {
return false;
}
- // TODO: replace local_filter.predicates by local_filter.conjuncts
- for (const auto& column_predicate : local_filter.predicates) {
+ for (const auto& column_predicate : column_filter.predicates) {
if (is_null_only_predicate(*column_predicate)) {
if (!statistics.has_null_count) {
continue;
@@ -184,16 +183,19 @@ bool ParquetStatisticsUtils::CheckStatistics(const
reader::FileLocalFilter& loca
bool ParquetStatisticsUtils::RowGroupExcludes(
const ::parquet::RowGroupMetaData& row_group,
const std::vector<std::unique_ptr<ParquetColumnSchema>>& schema,
- const reader::FileLocalFilter& local_filter) {
- DCHECK(local_filter.file_column_id >= 0 &&
- local_filter.file_column_id < row_group.num_columns());
- DCHECK_LT(local_filter.file_column_id, schema.size());
- auto column_chunk = row_group.ColumnChunk(local_filter.file_column_id);
+ const reader::FileColumnPredicateFilter& column_filter) {
+ if (column_filter.predicates.empty()) {
+ return false;
+ }
+ DCHECK(column_filter.file_column_id >= 0 &&
+ column_filter.file_column_id < row_group.num_columns());
+ DCHECK_LT(column_filter.file_column_id, schema.size());
+ auto column_chunk = row_group.ColumnChunk(column_filter.file_column_id);
if (column_chunk == nullptr) {
return false;
}
- return CheckStatistics(local_filter,
-
TransformColumnStatistics(*schema[local_filter.file_column_id],
+ return CheckStatistics(column_filter,
+
TransformColumnStatistics(*schema[column_filter.file_column_id],
column_chunk->statistics()));
}
@@ -215,8 +217,8 @@ Status ParquetStatisticsUtils::SelectRowGroups(
continue;
}
bool drop = false;
- for (const auto& local_filter : request.local_filters) {
- if (RowGroupExcludes(*row_group, file_schema, local_filter)) {
+ for (const auto& column_filter : request.column_predicate_filters) {
+ if (RowGroupExcludes(*row_group, file_schema, column_filter)) {
drop = true;
break;
}
diff --git a/be/src/format/new_parquet/parquet_statistics.h
b/be/src/format/new_parquet/parquet_statistics.h
index 0def08d4b08..4f43ae245b5 100644
--- a/be/src/format/new_parquet/parquet_statistics.h
+++ b/be/src/format/new_parquet/parquet_statistics.h
@@ -60,13 +60,14 @@ struct ParquetStatisticsUtils {
const ParquetColumnSchema& column_schema,
const std::shared_ptr<::parquet::Statistics>& statistics);
- // Return true if the statistics indicate that the row group can be safely
skipped according to the local filter.
- static bool CheckStatistics(const reader::FileLocalFilter& local_filter,
+ // Return true if the statistics indicate that the row group can be safely
skipped according to
+ // the local single-column predicate filter.
+ static bool CheckStatistics(const reader::FileColumnPredicateFilter&
column_filter,
const ParquetColumnStatistics& statistics);
static bool RowGroupExcludes(const ::parquet::RowGroupMetaData& row_group,
const
std::vector<std::unique_ptr<ParquetColumnSchema>>& schema,
- const reader::FileLocalFilter& local_filter);
+ const reader::FileColumnPredicateFilter&
column_filter);
static Status SelectRowGroups(
const ::parquet::FileMetaData& metadata,
diff --git a/be/src/format/reader/column_mapper.cpp
b/be/src/format/reader/column_mapper.cpp
index 4d9afdeff32..1a33781b965 100644
--- a/be/src/format/reader/column_mapper.cpp
+++ b/be/src/format/reader/column_mapper.cpp
@@ -17,6 +17,7 @@
#include "format/reader/column_mapper.h"
+#include <algorithm>
#include <cstddef>
#include <memory>
#include <utility>
@@ -69,6 +70,8 @@ static constexpr const char*
ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER = "_last_update
static void add_scan_column(FileScanRequest* file_request, ColumnId
file_column_id,
std::vector<ColumnId>* scan_columns) {
+ // column_positions is the global read-column index for this scan request,
so it also
+ // deduplicates predicate_columns and non_predicate_columns across all
filter/projection paths.
if (file_request->column_positions.count(file_column_id) == 0) {
file_request->column_positions.emplace(file_column_id,
file_request->column_positions.size());
@@ -210,6 +213,13 @@ static Status rebuild_projected_file_type(ColumnMapping*
mapping) {
return Status::OK();
}
+static std::vector<int32_t> filter_slot_ids(const TableFilter& table_filter) {
+ if (!table_filter.slot_ids.empty()) {
+ return table_filter.slot_ids;
+ }
+ return {};
+}
+
Status TableColumnMapper::create_mapping(const std::vector<TableColumn>&
projected_columns,
const std::map<std::string, Field>&
partition_values,
const std::vector<SchemaField>&
file_schema) {
@@ -250,7 +260,8 @@ Status TableColumnMapper::create_mapping(const
std::vector<TableColumn>& project
return Status::OK();
}
-Status TableColumnMapper::create_scan_request(const std::map<int32_t,
TableFilter>& table_filters,
+Status TableColumnMapper::create_scan_request(const std::vector<TableFilter>&
table_filters,
+ const TableColumnPredicates&
table_column_predicates,
const std::vector<TableColumn>&
projected_columns,
FileScanRequest* file_request) {
// FileReader evaluates expressions against a file-local block. This
mapper owns the
@@ -259,12 +270,27 @@ Status TableColumnMapper::create_scan_request(const
std::map<int32_t, TableFilte
file_request->non_predicate_columns.clear();
file_request->column_positions.clear();
file_request->complex_projections.clear();
- file_request->local_filters.clear();
+ file_request->expression_filters.clear();
+ file_request->column_predicate_filters.clear();
file_request->reader_expression_map.clear();
+ // 1. Build referenced non-predicate columns
for (const auto& table_column : projected_columns) {
auto* mapping = _find_mapping(table_column.id);
if (mapping != nullptr && mapping->file_column_id.has_value()) {
- if (table_filters.count(table_column.id) == 0) {
+ // A file column can be read lazily as a non-predicate column only
when it is not used
+ // by either expression filters or single-column predicate pruning.
+ bool used_by_filter =
table_column_predicates.count(table_column.id) > 0;
+ if (!used_by_filter) {
+ for (const auto& table_filter : table_filters) {
+ const auto slot_ids = filter_slot_ids(table_filter);
+ if (std::find(slot_ids.begin(), slot_ids.end(),
table_column.id) !=
+ slot_ids.end()) {
+ used_by_filter = true;
+ break;
+ }
+ }
+ }
+ if (!used_by_filter) {
add_scan_column(file_request, *mapping->file_column_id,
&file_request->non_predicate_columns);
}
@@ -280,7 +306,9 @@ Status TableColumnMapper::create_scan_request(const
std::map<int32_t, TableFilte
}
}
}
- RETURN_IF_ERROR(localize_filters(table_filters, file_request));
+ // 2. Build referenced predicate columns
+ RETURN_IF_ERROR(localize_filters(table_filters, table_column_predicates,
file_request));
+ // 3. Re-build projections for all referenced file columns to point to the
correct file-local block positions.
for (auto& mapping : _mappings) {
if (!mapping.file_column_id.has_value()) {
continue;
@@ -292,18 +320,29 @@ Status TableColumnMapper::create_scan_request(const
std::map<int32_t, TableFilte
return Status::OK();
}
-Status TableColumnMapper::localize_filters(const std::map<int32_t,
TableFilter>& table_filters,
+Status TableColumnMapper::localize_filters(const std::vector<TableFilter>&
table_filters,
+ const TableColumnPredicates&
table_column_predicates,
FileScanRequest* file_request)
const {
// 真实实现会处理 trivial mapping、safe cast、reader expression fallback 和
// finalize-only filter。stub 只复制能够直接定位到 file column 的谓词。
- for (const auto& it : table_filters) {
- const auto* mapping = _find_mapping(it.first);
- if (mapping == nullptr || !mapping->file_column_id.has_value()) {
+ for (const auto& table_filter : table_filters) {
+ if (!table_filter.can_be_localized()) {
+ // TODO: Rewrite table filter to reader_expression_map
+ // file_request->reader_expression_map.emplace_back(...,
table_filter.conjunct);
continue;
}
- if (!it.second.can_be_localized()) {
- // TODO: Rewrite table filter to reader_expression_map
- //
file_request->reader_expression_map.emplace_back(mapping->table_column_id,
it.second.conjunct);
+ for (const auto table_column_id : filter_slot_ids(table_filter)) {
+ const auto* mapping = _find_mapping(table_column_id);
+ if (mapping == nullptr || !mapping->file_column_id.has_value()) {
+ continue;
+ }
+ add_scan_column(file_request, *mapping->file_column_id,
+ &file_request->predicate_columns);
+ }
+ }
+ for (const auto& [table_column_id, _] : table_column_predicates) {
+ const auto* mapping = _find_mapping(table_column_id);
+ if (mapping == nullptr || !mapping->file_column_id.has_value()) {
continue;
}
add_scan_column(file_request, *mapping->file_column_id,
&file_request->predicate_columns);
@@ -312,20 +351,35 @@ Status TableColumnMapper::localize_filters(const
std::map<int32_t, TableFilter>&
// Build the complete table-slot to file-block position map after all
predicate columns have
// been assigned. This keeps expression localization independent from
filter iteration order.
const auto table_column_to_file_position =
build_file_position_map(_mappings, *file_request);
- for (const auto& it : table_filters) {
- const auto* mapping = _find_mapping(it.first);
- if (mapping == nullptr || !mapping->file_column_id.has_value() ||
- !it.second.can_be_localized()) {
+ for (const auto& table_filter : table_filters) {
+ if (!table_filter.can_be_localized()) {
continue;
}
- FileLocalFilter local_filter;
- local_filter.file_column_id = *mapping->file_column_id;
- if (it.second.conjunct != nullptr) {
- local_filter.conjunct =
VExprContext::create_shared(rewrite_table_expr_to_file_expr(
- it.second.conjunct->root(),
table_column_to_file_position));
+ if (table_filter.conjunct != nullptr) {
+ FileExpressionFilter expression_filter;
+ expression_filter.conjunct =
+
VExprContext::create_shared(rewrite_table_expr_to_file_expr(
+ table_filter.conjunct->root(),
table_column_to_file_position));
+
expression_filter.file_column_ids.reserve(table_filter.slot_ids.size());
+ for (const auto table_column_id : table_filter.slot_ids) {
+ const auto* mapping = _find_mapping(table_column_id);
+ if (mapping == nullptr ||
!mapping->file_column_id.has_value()) {
+ continue;
+ }
+
expression_filter.file_column_ids.push_back(*mapping->file_column_id);
+ }
+
file_request->expression_filters.push_back(std::move(expression_filter));
+ }
+ }
+ for (const auto& [table_column_id, predicates] : table_column_predicates) {
+ const auto* mapping = _find_mapping(table_column_id);
+ if (mapping == nullptr || !mapping->file_column_id.has_value() ||
predicates.empty()) {
+ continue;
}
- local_filter.predicates = it.second.predicates;
- file_request->local_filters.push_back(std::move(local_filter));
+ FileColumnPredicateFilter column_predicate_filter;
+ column_predicate_filter.file_column_id = *mapping->file_column_id;
+ column_predicate_filter.predicates = predicates;
+
file_request->column_predicate_filters.push_back(std::move(column_predicate_filter));
}
return Status::OK();
}
diff --git a/be/src/format/reader/column_mapper.h
b/be/src/format/reader/column_mapper.h
index 0c6ac9c8e6c..bcfe7152208 100644
--- a/be/src/format/reader/column_mapper.h
+++ b/be/src/format/reader/column_mapper.h
@@ -31,6 +31,10 @@
#include "exprs/vexpr_fwd.h"
#include "format/reader/expr/literal.h"
+namespace doris {
+class ColumnPredicate;
+} // namespace doris
+
namespace doris::reader {
struct TableColumn;
@@ -39,6 +43,9 @@ struct SchemaField;
struct FileScanRequest;
struct FieldProjection;
+using TableColumnPredicates =
+ std::map<int32_t, std::vector<std::shared_ptr<ColumnPredicate>>>;
+
enum class TableColumnMappingMode {
BY_FIELD_ID,
BY_NAME,
@@ -100,15 +107,18 @@ public:
// 把 table-level scan 请求转换成 file-local scan 请求。
// table_request 使用 table/global schema;file_request 只包含 FileReader 能理解的
- // projected_file_columns、local_filters 和 reader_expression_map。
- virtual Status create_scan_request(const std::map<int32_t, TableFilter>&
table_filters,
+ // projected_file_columns、expression_filters、column_predicate_filters 和
+ // reader_expression_map。
+ virtual Status create_scan_request(const std::vector<TableFilter>&
table_filters,
+ const TableColumnPredicates&
table_column_predicates,
const std::vector<TableColumn>&
projected_columns,
FileScanRequest* file_request);
// 将 table-level filter 定位到文件 schema。
// trivial mapping 可以直接复制结构化谓词;类型变化时可以尝试安全 cast;无法安全
// 下推的表达式应通过 reader_expression_map 或 table-level finalize/filter fallback
处理。
- virtual Status localize_filters(const std::map<int32_t, TableFilter>&
table_filters,
+ virtual Status localize_filters(const std::vector<TableFilter>&
table_filters,
+ const TableColumnPredicates&
table_column_predicates,
FileScanRequest* file_request) const;
void clear() { _mappings.clear(); }
const std::vector<ColumnMapping>& mappings() const { return _mappings; }
diff --git a/be/src/format/reader/file_reader.h
b/be/src/format/reader/file_reader.h
index 918e2b4bd35..69720bc8f9a 100644
--- a/be/src/format/reader/file_reader.h
+++ b/be/src/format/reader/file_reader.h
@@ -75,21 +75,19 @@ struct FieldProjection {
std::vector<FieldProjection> children;
};
-// 已经 localize 到文件 schema 的过滤条件。
-// TableColumnMapper 负责把 table-level filter 转成这个结构;FileReader 只消费
-// file-local column id、表达式和结构化谓词。
-struct FileLocalFilter {
- ColumnId file_column_id = -1;
-
- // 表达式过滤。适合 cast、复杂表达式或 reader_expression_map 生成的临时列过滤。
- // 它通常不能直接驱动 row group stats、page index、dictionary、bloom filter。
+// File-local expression filter. It may reference multiple predicate_columns,
so FileReader should
+// evaluate it after all referenced predicate columns have been materialized
in the file-local block.
+struct FileExpressionFilter {
VExprContextSPtr conjunct;
// DeletePredicate
VExprContextSPtr delete_conjunct;
+ std::vector<ColumnId> file_column_ids;
+};
- // 结构化列谓词。适合文件层 pruning,例如 min/max、page index、dictionary、
- // bloom filter 等只理解单列谓词的优化。
- // TODO: conjunct 支持表达所有 filter 语义之后删除。
+// File-local single-column predicates for file-layer pruning, such as
min/max, page index,
+// dictionary and bloom filter. Predicates must all belong to file_column_id.
+struct FileColumnPredicateFilter {
+ ColumnId file_column_id = -1;
std::vector<std::shared_ptr<ColumnPredicate>> predicates;
};
@@ -110,7 +108,8 @@ struct FileScanRequest {
std::vector<ColumnId> non_predicate_columns;
std::map<ColumnId, size_t> column_positions;
std::map<ColumnId, FieldProjection> complex_projections;
- std::vector<FileLocalFilter> local_filters;
+ std::vector<FileExpressionFilter> expression_filters;
+ std::vector<FileColumnPredicateFilter> column_predicate_filters;
// fallback path if filters cannot be localized to file-local predicates.
The expression can reference projected_file_columns and partition columns.
std::vector<std::pair<ColumnId, VExprContextSPtr>> reader_expression_map;
};
diff --git a/be/src/format/reader/table_reader.cpp
b/be/src/format/reader/table_reader.cpp
index f6cfa21600e..58de8378589 100644
--- a/be/src/format/reader/table_reader.cpp
+++ b/be/src/format/reader/table_reader.cpp
@@ -48,16 +48,10 @@ void collect_table_slot_ids(const VExprSPtr& expr,
std::set<int>* slot_ids) {
}
void build_table_filters_from_conjunct(const VExprSPtr& conjunct,
- std::map<int32_t, TableFilter>*
table_filters) {
+ std::vector<TableFilter>*
table_filters) {
if (conjunct == nullptr) {
return;
}
- std::set<int> slot_ids;
- collect_table_slot_ids(conjunct, &slot_ids);
- if (slot_ids.size() == 1) {
- (*table_filters)[*slot_ids.begin()].conjunct =
VExprContext::create_shared(conjunct);
- return;
- }
if (conjunct->node_type() == TExprNodeType::COMPOUND_PRED &&
conjunct->op() == TExprOpcode::COMPOUND_AND) {
for (const auto& child : conjunct->children()) {
@@ -65,6 +59,15 @@ void build_table_filters_from_conjunct(const VExprSPtr&
conjunct,
}
return;
}
+ std::set<int> slot_ids;
+ collect_table_slot_ids(conjunct, &slot_ids);
+ if (!slot_ids.empty()) {
+ TableFilter table_filter;
+ table_filter.conjunct = VExprContext::create_shared(conjunct);
+ table_filter.slot_ids.assign(slot_ids.begin(), slot_ids.end());
+ table_filters->push_back(std::move(table_filter));
+ return;
+ }
}
} // namespace
@@ -100,6 +103,7 @@ Status TableReader::init(TableReadOptions options) {
mapper_options.allow_missing_columns = options.allow_missing_columns;
_data_reader.column_mapper = TableColumnMapper(mapper_options);
_conjuncts = std::move(options.conjuncts);
+ _table_column_predicates = std::move(options.column_predicates);
return Status::OK();
}
@@ -111,12 +115,12 @@ Status TableReader::_build_table_filters_from_conjuncts()
{
Status TableReader::_open_local_filter_exprs(const FileScanRequest&
file_request) {
RowDescriptor row_desc;
- for (const auto& local_filter : file_request.local_filters) {
- if (local_filter.conjunct == nullptr) {
+ for (const auto& expression_filter : file_request.expression_filters) {
+ if (expression_filter.conjunct == nullptr) {
continue;
}
- RETURN_IF_ERROR(local_filter.conjunct->prepare(_runtime_state,
row_desc));
- RETURN_IF_ERROR(local_filter.conjunct->open(_runtime_state));
+ RETURN_IF_ERROR(expression_filter.conjunct->prepare(_runtime_state,
row_desc));
+ RETURN_IF_ERROR(expression_filter.conjunct->open(_runtime_state));
}
return Status::OK();
}
diff --git a/be/src/format/reader/table_reader.h
b/be/src/format/reader/table_reader.h
index c9589af8017..2cf5eb30468 100644
--- a/be/src/format/reader/table_reader.h
+++ b/be/src/format/reader/table_reader.h
@@ -62,15 +62,14 @@ struct TableColumn {
};
// table-level filter。
-// TableColumnMapper 负责把它转换成 FileLocalFilter 或 reader_expression_map。
+// TableColumnMapper 负责把它转换成 FileExpressionFilter 或 reader_expression_map。
struct TableFilter {
// 表达式过滤,适合表达 cast、复杂表达式、复杂列提取等语义。
VExprContextSPtr conjunct;
- // 结构化列谓词,适合下推到文件层做 row group stats、page index、dictionary、
- // bloom filter 等优化。
- // TODO: conjunct 支持表达所有 filter 语义之后删除。
- std::vector<std::shared_ptr<ColumnPredicate>> predicates;
+ // Table slot ids referenced by conjunct. A single expression filter may
depend on multiple
+ // columns, while ColumnPredicate pruning still belongs to one concrete
column.
+ std::vector<int32_t> slot_ids;
bool can_be_localized() const { return true; }
};
@@ -105,6 +104,7 @@ struct ReadProfile {
struct TableReadOptions {
const std::vector<TableColumn> projected_columns;
+ const TableColumnPredicates column_predicates;
// All conjuncts from scan operator
const VExprContext conjuncts;
const FileFormat format;
@@ -229,7 +229,7 @@ protected:
auto file_request = std::make_unique<FileScanRequest>();
RETURN_IF_ERROR(_data_reader.column_mapper.create_scan_request(
- _table_filters, _projected_columns, file_request.get()));
+ _table_filters, _table_column_predicates, _projected_columns,
file_request.get()));
RETURN_IF_ERROR(_open_local_filter_exprs(*file_request));
_data_reader.scan_schema.clear();
_data_reader.block_template.clear();
@@ -266,6 +266,7 @@ protected:
_data_reader.reader.reset();
_data_reader.column_mapper.clear();
_table_filters.clear();
+ _table_column_predicates.clear();
_data_reader.block_schema.clear();
_data_reader.scan_schema.clear();
_data_reader.block_template.clear();
@@ -331,7 +332,8 @@ protected:
std::shared_ptr<io::FileSystemProperties> _system_properties;
// partition key -> value
std::map<std::string, Field> _partition_values;
- std::map<int32_t, TableFilter> _table_filters;
+ std::vector<TableFilter> _table_filters;
+ TableColumnPredicates _table_column_predicates;
VExprContext _conjuncts {nullptr};
std::unique_ptr<ReadProfile> _profile;
// Parsed from DELETION_VECTOR in Iceberg and Paimon
diff --git a/be/test/format/new_parquet/parquet_reader_test.cpp
b/be/test/format/new_parquet/parquet_reader_test.cpp
index 5341b4060b5..43ec9cc0ab1 100644
--- a/be/test/format/new_parquet/parquet_reader_test.cpp
+++ b/be/test/format/new_parquet/parquet_reader_test.cpp
@@ -42,6 +42,7 @@
#include "exprs/vexpr_context.h"
#include "format/reader/column_mapper.h"
#include "format/reader/file_reader.h"
+#include "format/reader/table_reader.h"
#include "gen_cpp/Types_types.h"
#include "io/io_common.h"
#include "runtime/runtime_state.h"
@@ -82,6 +83,41 @@ private:
const std::string _expr_name = "Int32GreaterThanExpr";
};
+class Int32SumGreaterThanExpr final : public VExpr {
+public:
+ Int32SumGreaterThanExpr(int left_column_id, int right_column_id, int32_t
value)
+ : VExpr(std::make_shared<DataTypeUInt8>(), false),
+ _left_column_id(left_column_id),
+ _right_column_id(right_column_id),
+ _value(value) {}
+
+ Status execute_column_impl(VExprContext* context, const Block* block,
const Selector* selector,
+ size_t count, ColumnPtr& result_column) const
override {
+ const auto& left_input =
+ assert_cast<const
ColumnInt32&>(*block->get_by_position(_left_column_id).column);
+ const auto& right_input =
+ assert_cast<const
ColumnInt32&>(*block->get_by_position(_right_column_id).column);
+ auto result = ColumnUInt8::create();
+ auto& result_data = result->get_data();
+ result_data.resize(count);
+ for (size_t row = 0; row < count; ++row) {
+ const size_t input_row = selector == nullptr ? row :
(*selector)[row];
+ result_data[row] =
+ left_input.get_element(input_row) +
right_input.get_element(input_row) > _value;
+ }
+ result_column = std::move(result);
+ return Status::OK();
+ }
+
+ const std::string& expr_name() const override { return _expr_name; }
+
+private:
+ const int _left_column_id;
+ const int _right_column_id;
+ const int32_t _value;
+ const std::string _expr_name = "Int32SumGreaterThanExpr";
+};
+
VExprContextSPtr create_int32_greater_than_conjunct(int column_id, int32_t
value) {
auto ctx =
VExprContext::create_shared(std::make_shared<Int32GreaterThanExpr>(column_id,
value));
@@ -90,6 +126,15 @@ VExprContextSPtr create_int32_greater_than_conjunct(int
column_id, int32_t value
return ctx;
}
+VExprContextSPtr create_int32_sum_greater_than_conjunct(int left_column_id,
int right_column_id,
+ int32_t value) {
+ auto ctx = VExprContext::create_shared(
+ std::make_shared<Int32SumGreaterThanExpr>(left_column_id,
right_column_id, value));
+ ctx->_prepared = true;
+ ctx->_opened = true;
+ return ctx;
+}
+
std::shared_ptr<arrow::Array> finish_array(arrow::ArrayBuilder* builder) {
std::shared_ptr<arrow::Array> array;
EXPECT_TRUE(builder->Finish(&array).ok());
@@ -133,6 +178,28 @@ void write_parquet_file(const std::string& file_path,
int64_t row_group_size = R
row_group_size,
builder.build()));
}
+void write_int_pair_parquet_file(const std::string& file_path, int64_t
row_group_size = ROW_COUNT) {
+ auto schema = arrow::schema({
+ arrow::field("id", arrow::int32(), false),
+ arrow::field("score", arrow::int32(), false),
+ arrow::field("value", arrow::utf8(), false),
+ });
+ auto table = arrow::Table::Make(
+ schema, {build_int32_array({1, 2, 3, 4, 5}), build_int32_array({1,
2, 3, 4, 5}),
+ build_string_array({"one", "two", "three", "four",
"five"})});
+
+ auto file_result = arrow::io::FileOutputStream::Open(file_path);
+ ASSERT_TRUE(file_result.ok()) << file_result.status();
+ std::shared_ptr<arrow::io::FileOutputStream> out = *file_result;
+
+ ::parquet::WriterProperties::Builder builder;
+ builder.version(::parquet::ParquetVersion::PARQUET_2_6);
+ builder.data_page_version(::parquet::ParquetDataPageVersion::V2);
+ builder.compression(::parquet::Compression::UNCOMPRESSED);
+ PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(
+ *table, arrow::default_memory_pool(), out, row_group_size,
builder.build()));
+}
+
Block build_file_block(const std::vector<reader::SchemaField>& schema) {
Block block;
for (const auto& field : schema) {
@@ -216,7 +283,7 @@ TEST(TableColumnMapperTest,
CreatesComplexProjectionForStructChildren) {
ASSERT_TRUE(mapper.create_mapping({table_column}, {},
{struct_field}).ok());
auto request = std::make_unique<reader::FileScanRequest>();
- ASSERT_TRUE(mapper.create_scan_request({}, {table_column},
request.get()).ok());
+ ASSERT_TRUE(mapper.create_scan_request({}, {}, {table_column},
request.get()).ok());
ASSERT_EQ(request->non_predicate_columns,
std::vector<reader::ColumnId>({0}));
ASSERT_EQ(request->complex_projections.size(), 1);
const auto& projection = request->complex_projections.at(0);
@@ -359,12 +426,14 @@ TEST_F(NewParquetReaderTest,
ReadPredicateAndNonPredicateColumnsWithSelection) {
auto request = std::make_unique<reader::FileScanRequest>();
request->predicate_columns = {0};
request->non_predicate_columns = {1};
- reader::FileLocalFilter filter;
- filter.file_column_id = 0;
- filter.conjunct = create_int32_greater_than_conjunct(0, 2);
- filter.predicates.push_back(create_comparison_predicate<PredicateType::GT>(
+ reader::FileExpressionFilter expression_filter;
+ expression_filter.conjunct = create_int32_greater_than_conjunct(0, 2);
+ request->expression_filters.push_back(std::move(expression_filter));
+ reader::FileColumnPredicateFilter column_filter;
+ column_filter.file_column_id = 0;
+
column_filter.predicates.push_back(create_comparison_predicate<PredicateType::GT>(
0, "id", schema[0].type, Field::create_field<TYPE_INT>(2), false));
- request->local_filters.push_back(std::move(filter));
+ request->column_predicate_filters.push_back(std::move(column_filter));
ASSERT_TRUE(reader->open(request).ok());
size_t rows = 0;
@@ -391,6 +460,40 @@ TEST_F(NewParquetReaderTest,
ReadPredicateAndNonPredicateColumnsWithSelection) {
EXPECT_EQ(rows, 0);
}
+TEST_F(NewParquetReaderTest, ReadMultiPredicateColumnsBeforeExpressionFilter) {
+ write_int_pair_parquet_file(_file_path);
+ auto reader = create_reader();
+ RuntimeState state {TQueryOptions(), TQueryGlobals()};
+ ASSERT_TRUE(reader->init(&state).ok());
+
+ std::vector<reader::SchemaField> schema;
+ ASSERT_TRUE(reader->get_schema(&schema).ok());
+ Block block = build_file_block(schema);
+
+ auto request = std::make_unique<reader::FileScanRequest>();
+ request->predicate_columns = {0, 1};
+ request->non_predicate_columns = {};
+ reader::FileExpressionFilter expression_filter;
+ expression_filter.conjunct = create_int32_sum_greater_than_conjunct(0, 1,
7);
+ request->expression_filters.push_back(std::move(expression_filter));
+ ASSERT_TRUE(reader->open(request).ok());
+
+ size_t rows = 0;
+ bool eof = false;
+ ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok());
+ EXPECT_FALSE(eof);
+ ASSERT_EQ(rows, 2);
+
+ const auto& ids = assert_cast<const
ColumnInt32&>(*block.get_by_position(0).column);
+ const auto& scores = assert_cast<const
ColumnInt32&>(*block.get_by_position(1).column);
+ ASSERT_EQ(ids.size(), 2);
+ ASSERT_EQ(scores.size(), 2);
+ EXPECT_EQ(ids.get_element(0), 4);
+ EXPECT_EQ(ids.get_element(1), 5);
+ EXPECT_EQ(scores.get_element(0), 4);
+ EXPECT_EQ(scores.get_element(1), 5);
+}
+
TEST_F(NewParquetReaderTest, PredicateFiltersRowGroupsByStatistics) {
write_parquet_file(_file_path, 2);
auto parquet_file_reader =
::parquet::ParquetFileReader::OpenFile(_file_path, false);
@@ -405,12 +508,14 @@ TEST_F(NewParquetReaderTest, PredicateFiltersRowGroupsByStatistics) {
auto request = std::make_unique<reader::FileScanRequest>();
request->predicate_columns = {0};
request->non_predicate_columns = {1};
- reader::FileLocalFilter filter;
- filter.file_column_id = 0;
- filter.conjunct = create_int32_greater_than_conjunct(0, 2);
- filter.predicates.push_back(create_comparison_predicate<PredicateType::GT>(
+ reader::FileExpressionFilter expression_filter;
+ expression_filter.conjunct = create_int32_greater_than_conjunct(0, 2);
+ request->expression_filters.push_back(std::move(expression_filter));
+ reader::FileColumnPredicateFilter column_filter;
+ column_filter.file_column_id = 0;
+
column_filter.predicates.push_back(create_comparison_predicate<PredicateType::GT>(
0, "id", schema[0].type, Field::create_field<TYPE_INT>(2), false));
- request->local_filters.push_back(std::move(filter));
+ request->column_predicate_filters.push_back(std::move(column_filter));
ASSERT_TRUE(reader->open(request).ok());
std::vector<int32_t> ids;
diff --git a/be/test/format/reader/table_reader_test.cpp b/be/test/format/reader/table_reader_test.cpp
index dc2e26f35ea..3d132244122 100644
--- a/be/test/format/reader/table_reader_test.cpp
+++ b/be/test/format/reader/table_reader_test.cpp
@@ -22,6 +22,7 @@
#include <gtest/gtest.h>
#include <parquet/arrow/writer.h>
+#include <algorithm>
#include <filesystem>
#include <memory>
#include <string>
@@ -37,6 +38,7 @@
#include "format/reader/expr/slot_ref.h"
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/runtime_state.h"
+#include "storage/predicate/predicate_creator.h"
namespace doris::reader {
namespace {
@@ -75,6 +77,86 @@ private:
const std::string _expr_name = "TableInt32GreaterThanExpr";
};
+class TableInt32SumGreaterThanExpr final : public VExpr {
+public:
+ TableInt32SumGreaterThanExpr(int left_slot_id, int left_column_id, int
right_slot_id,
+ int right_column_id, int32_t value)
+ : VExpr(std::make_shared<DataTypeUInt8>(), false), _value(value) {
+ add_child(TableSlotRef::create_shared(left_slot_id, left_column_id, -1,
+
std::make_shared<DataTypeInt32>(), "id"));
+ add_child(TableSlotRef::create_shared(right_slot_id, right_column_id,
-1,
+
std::make_shared<DataTypeInt32>(), "score"));
+ set_node_type(TExprNodeType::BINARY_PRED);
+ _opcode = TExprOpcode::GT;
+ }
+
+ Status execute_column_impl(VExprContext* context, const Block* block,
const Selector* selector,
+ size_t count, ColumnPtr& result_column) const
override {
+ const auto* left_slot_ref = assert_cast<const
VSlotRef*>(get_child(0).get());
+ const auto* right_slot_ref = assert_cast<const
VSlotRef*>(get_child(1).get());
+ const auto& left_input = assert_cast<const ColumnInt32&>(
+ *block->get_by_position(left_slot_ref->column_id()).column);
+ const auto& right_input = assert_cast<const ColumnInt32&>(
+ *block->get_by_position(right_slot_ref->column_id()).column);
+ auto result = ColumnUInt8::create();
+ auto& result_data = result->get_data();
+ result_data.resize(count);
+ for (size_t row = 0; row < count; ++row) {
+ const size_t input_row = selector == nullptr ? row :
(*selector)[row];
+ result_data[row] =
+ left_input.get_element(input_row) +
right_input.get_element(input_row) > _value;
+ }
+ result_column = std::move(result);
+ return Status::OK();
+ }
+
+ const std::string& expr_name() const override { return _expr_name; }
+
+private:
+ const int32_t _value;
+ const std::string _expr_name = "TableInt32SumGreaterThanExpr";
+};
+
+class TableInt32SumLessThanExpr final : public VExpr {
+public:
+ TableInt32SumLessThanExpr(int left_slot_id, int left_column_id, int
right_slot_id,
+ int right_column_id, int32_t value)
+ : VExpr(std::make_shared<DataTypeUInt8>(), false), _value(value) {
+ add_child(TableSlotRef::create_shared(left_slot_id, left_column_id, -1,
+
std::make_shared<DataTypeInt32>(), "id"));
+ add_child(TableSlotRef::create_shared(right_slot_id, right_column_id,
-1,
+
std::make_shared<DataTypeInt32>(), "score"));
+ set_node_type(TExprNodeType::BINARY_PRED);
+ _opcode = TExprOpcode::LT;
+ }
+
+ Status execute_column_impl(VExprContext* context, const Block* block,
const Selector* selector,
+ size_t count, ColumnPtr& result_column) const
override {
+ const auto* left_slot_ref = assert_cast<const
VSlotRef*>(get_child(0).get());
+ const auto* right_slot_ref = assert_cast<const
VSlotRef*>(get_child(1).get());
+ const auto& left_input = assert_cast<const ColumnInt32&>(
+ *block->get_by_position(left_slot_ref->column_id()).column);
+ const auto& right_input = assert_cast<const ColumnInt32&>(
+ *block->get_by_position(right_slot_ref->column_id()).column);
+ auto result = ColumnUInt8::create();
+ auto& result_data = result->get_data();
+ result_data.resize(count);
+ for (size_t row = 0; row < count; ++row) {
+ const size_t input_row = selector == nullptr ? row :
(*selector)[row];
+ result_data[row] =
+ left_input.get_element(input_row) +
right_input.get_element(input_row) < _value;
+ }
+ result_column = std::move(result);
+ return Status::OK();
+ }
+
+ const std::string& expr_name() const override { return _expr_name; }
+
+private:
+ const int32_t _value;
+ const std::string _expr_name = "TableInt32SumLessThanExpr";
+};
+
std::shared_ptr<arrow::Array> finish_array(arrow::ArrayBuilder* builder) {
std::shared_ptr<arrow::Array> array;
EXPECT_TRUE(builder->Finish(&array).ok());
@@ -117,6 +199,32 @@ void write_parquet_file(const std::string& file_path, int32_t id, const std::str
*table, arrow::default_memory_pool(), out, 1, builder.build()));
}
+void write_int_pair_parquet_file(const std::string& file_path, const
std::vector<int32_t>& ids,
+ const std::vector<int32_t>& scores,
+ const std::vector<std::string>& values,
+ int64_t row_group_size = -1) {
+ auto schema = arrow::schema({
+ arrow::field("id", arrow::int32(), false),
+ arrow::field("score", arrow::int32(), false),
+ arrow::field("value", arrow::utf8(), false),
+ });
+ auto table = arrow::Table::Make(schema, {build_int32_array(ids),
build_int32_array(scores),
+ build_string_array(values)});
+
+ auto file_result = arrow::io::FileOutputStream::Open(file_path);
+ ASSERT_TRUE(file_result.ok()) << file_result.status();
+ std::shared_ptr<arrow::io::FileOutputStream> out = *file_result;
+
+ ::parquet::WriterProperties::Builder builder;
+ builder.version(::parquet::ParquetVersion::PARQUET_2_6);
+ builder.data_page_version(::parquet::ParquetDataPageVersion::V2);
+ builder.compression(::parquet::Compression::UNCOMPRESSED);
+ const auto write_row_group_size =
+ row_group_size > 0 ? row_group_size :
static_cast<int64_t>(ids.size());
+ PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(
+ *table, arrow::default_memory_pool(), out, write_row_group_size,
builder.build()));
+}
+
Block build_table_block(const std::vector<TableColumn>& columns) {
Block block;
for (const auto& column : columns) {
@@ -164,6 +272,7 @@ TEST(TableReaderTest, ReopenSplitAfterClose) {
ASSERT_TRUE(reader
.init({
.projected_columns = projected_columns,
+ .column_predicates = {},
.conjuncts = VExprContext(
std::make_shared<TableInt32GreaterThanExpr>(0, 0, 0)),
.format = FileFormat::PARQUET,
@@ -229,8 +338,9 @@ TEST(TableReaderTest, OpenReaderBuildsTableFiltersFromConjuncts) {
ASSERT_TRUE(reader
.init({
.projected_columns = projected_columns,
- .conjuncts =
VExprContext(std::make_shared<TableInt32GreaterThanExpr>(
- 0, 0, 2)),
+ .column_predicates = {},
+ .conjuncts = VExprContext(
+
std::make_shared<TableInt32GreaterThanExpr>(0, 0, 2)),
.format = FileFormat::PARQUET,
.scan_params = nullptr,
.io_ctx = nullptr,
@@ -262,8 +372,9 @@ TEST(TableReaderTest, OpenReaderBuildsTableFiltersFromConjuncts) {
ASSERT_TRUE(filtered_reader
.init({
.projected_columns = projected_columns,
- .conjuncts =
VExprContext(std::make_shared<TableInt32GreaterThanExpr>(
- 0, 0, 4)),
+ .column_predicates = {},
+ .conjuncts = VExprContext(
+
std::make_shared<TableInt32GreaterThanExpr>(0, 0, 4)),
.format = FileFormat::PARQUET,
.scan_params = nullptr,
.io_ctx = nullptr,
@@ -285,6 +396,195 @@ TEST(TableReaderTest, OpenReaderBuildsTableFiltersFromConjuncts) {
std::filesystem::remove_all(test_dir);
}
+TEST(TableReaderTest, OpenReaderBuildsColumnPredicateFilters) {
+ const auto test_dir =
+ std::filesystem::temp_directory_path() /
"doris_table_reader_column_predicate_test";
+ std::filesystem::remove_all(test_dir);
+ std::filesystem::create_directories(test_dir);
+
+ const auto file_path = (test_dir / "split.parquet").string();
+ // ColumnPredicate is only used for row-group/statistics pruning. Keep one row per row
+ // group so the predicate can prune the first two row groups and leave only id = 3.
+ write_int_pair_parquet_file(file_path, {1, 2, 3}, {1, 5, 8}, {"one",
"two", "three"}, 1);
+
+ std::vector<TableColumn> projected_columns;
+ projected_columns.push_back(make_table_column(2, "value",
std::make_shared<DataTypeString>()));
+ projected_columns.push_back(make_table_column(0, "id",
std::make_shared<DataTypeInt32>()));
+
+ TableColumnPredicates column_predicates;
+
column_predicates[0].push_back(create_comparison_predicate<PredicateType::GT>(
+ 0, "id", std::make_shared<DataTypeInt32>(),
Field::create_field<TYPE_INT>(2), false));
+
+ RuntimeState state {TQueryOptions(), TQueryGlobals()};
+ TableReader reader;
+ ASSERT_TRUE(reader
+ .init({
+ .projected_columns = projected_columns,
+ .column_predicates =
std::move(column_predicates),
+ .conjuncts = VExprContext(nullptr),
+ .format = FileFormat::PARQUET,
+ .scan_params = nullptr,
+ .io_ctx = nullptr,
+ .runtime_state = &state,
+ .scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .profile = nullptr,
+ })
+ .ok());
+
+ ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
+
+ Block block = build_table_block(projected_columns);
+ bool eos = false;
+ ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+ ASSERT_FALSE(eos);
+
+ const auto& value_column = assert_cast<const
ColumnString&>(*block.get_by_position(0).column);
+ const auto& id_column = assert_cast<const
ColumnInt32&>(*block.get_by_position(1).column);
+ ASSERT_EQ(id_column.size(), 1);
+ ASSERT_EQ(value_column.size(), 1);
+ EXPECT_EQ(id_column.get_element(0), 3);
+ EXPECT_EQ(value_column.get_data_at(0).to_string(), "three");
+
+ ASSERT_TRUE(reader.close().ok());
+ std::filesystem::remove_all(test_dir);
+}
+
+TEST(TableReaderTest, CreateScanRequestDeduplicatesSharedPredicateColumns) {
+ const auto int_type = std::make_shared<DataTypeInt32>();
+ const std::vector<TableColumn> projected_columns = {
+ make_table_column(0, "a", int_type),
+ make_table_column(1, "b", int_type),
+ make_table_column(2, "c", int_type),
+ make_table_column(3, "value", std::make_shared<DataTypeString>()),
+ };
+ const std::vector<SchemaField> file_schema = {
+ {.id = 0,
+ .name = "a",
+ .type = int_type,
+ .children = {},
+ .file_path = {0},
+ .field_id_path = {0},
+ .name_path = {"a"},
+ .column_type = DATA_COLUMN},
+ {.id = 1,
+ .name = "b",
+ .type = int_type,
+ .children = {},
+ .file_path = {1},
+ .field_id_path = {1},
+ .name_path = {"b"},
+ .column_type = DATA_COLUMN},
+ {.id = 2,
+ .name = "c",
+ .type = int_type,
+ .children = {},
+ .file_path = {2},
+ .field_id_path = {2},
+ .name_path = {"c"},
+ .column_type = DATA_COLUMN},
+ {.id = 3,
+ .name = "value",
+ .type = std::make_shared<DataTypeString>(),
+ .children = {},
+ .file_path = {3},
+ .field_id_path = {3},
+ .name_path = {"value"},
+ .column_type = DATA_COLUMN},
+ };
+
+ TableColumnMapper mapper;
+ ASSERT_TRUE(mapper.create_mapping(projected_columns, {},
file_schema).ok());
+
+ std::vector<TableFilter> table_filters;
+ table_filters.push_back({
+ .conjunct = VExprContext::create_shared(
+ std::make_shared<TableInt32SumGreaterThanExpr>(0, 0, 1, 1,
1)),
+ .slot_ids = {0, 1},
+ });
+ table_filters.push_back({
+ .conjunct = VExprContext::create_shared(
+ std::make_shared<TableInt32SumLessThanExpr>(0, 0, 2, 2,
3)),
+ .slot_ids = {0, 2},
+ });
+
+ FileScanRequest file_request;
+ ASSERT_TRUE(mapper.create_scan_request(table_filters, {},
projected_columns, &file_request)
+ .ok());
+
+ // Both filters reference column a. It must still be read once as a predicate column, and a
+ // predicate column must not be repeated as a non-predicate column.
+ EXPECT_EQ(file_request.predicate_columns, std::vector<ColumnId>({0, 1,
2}));
+ EXPECT_EQ(file_request.non_predicate_columns, std::vector<ColumnId>({3}));
+ ASSERT_EQ(file_request.column_positions.size(), 4);
+ EXPECT_EQ(file_request.column_positions.at(3), 0);
+ EXPECT_EQ(file_request.column_positions.at(0), 1);
+ EXPECT_EQ(file_request.column_positions.at(1), 2);
+ EXPECT_EQ(file_request.column_positions.at(2), 3);
+ for (const auto predicate_column : file_request.predicate_columns) {
+ EXPECT_TRUE(std::find(file_request.non_predicate_columns.begin(),
+ file_request.non_predicate_columns.end(),
+ predicate_column) ==
file_request.non_predicate_columns.end());
+ }
+}
+
+TEST(TableReaderTest, OpenReaderPushesMultiColumnConjunctToParquetReader) {
+ const auto test_dir =
+ std::filesystem::temp_directory_path() /
"doris_table_reader_multi_conjunct_test";
+ std::filesystem::remove_all(test_dir);
+ std::filesystem::create_directories(test_dir);
+
+ const auto file_path = (test_dir / "split.parquet").string();
+ write_int_pair_parquet_file(file_path, {1, 2, 3}, {1, 5, 8}, {"one",
"two", "three"});
+
+ std::vector<TableColumn> projected_columns;
+ projected_columns.push_back(make_table_column(2, "value",
std::make_shared<DataTypeString>()));
+ projected_columns.push_back(make_table_column(0, "id",
std::make_shared<DataTypeInt32>()));
+ projected_columns.push_back(make_table_column(1, "score",
std::make_shared<DataTypeInt32>()));
+
+ RuntimeState state {TQueryOptions(), TQueryGlobals()};
+ TableReader reader;
+ ASSERT_TRUE(reader
+ .init({
+ .projected_columns = projected_columns,
+ .column_predicates = {},
+ .conjuncts = VExprContext(
+
std::make_shared<TableInt32SumGreaterThanExpr>(
+ 0, 0, 1, 1, 8)),
+ .format = FileFormat::PARQUET,
+ .scan_params = nullptr,
+ .io_ctx = nullptr,
+ .runtime_state = &state,
+ .scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .profile = nullptr,
+ })
+ .ok());
+
+ ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
+
+ // The conjunct references both id and score, so ColumnMapper must put both file columns into
+ // predicate_columns and rewrite both slot refs to ParquetReader's file-local block positions.
+ // ParquetReader then evaluates the expression after all predicate columns have been read.
+ Block block = build_table_block(projected_columns);
+ bool eos = false;
+ ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+ ASSERT_FALSE(eos);
+
+ const auto& value_column = assert_cast<const
ColumnString&>(*block.get_by_position(0).column);
+ const auto& id_column = assert_cast<const
ColumnInt32&>(*block.get_by_position(1).column);
+ const auto& score_column = assert_cast<const
ColumnInt32&>(*block.get_by_position(2).column);
+ ASSERT_EQ(id_column.size(), 1);
+ ASSERT_EQ(score_column.size(), 1);
+ ASSERT_EQ(value_column.size(), 1);
+ EXPECT_EQ(id_column.get_element(0), 3);
+ EXPECT_EQ(score_column.get_element(0), 8);
+ EXPECT_EQ(value_column.get_data_at(0).to_string(), "three");
+
+ ASSERT_TRUE(reader.close().ok());
+ std::filesystem::remove_all(test_dir);
+}
+
TEST(TableReaderTest, ProjectedColumnsFillDefaultForParquetSchemaMismatch) {
const auto test_dir =
std::filesystem::temp_directory_path() /
"doris_table_reader_schema_mismatch_test";
@@ -303,6 +603,7 @@ TEST(TableReaderTest, ProjectedColumnsFillDefaultForParquetSchemaMismatch) {
ASSERT_TRUE(reader
.init({
.projected_columns = projected_columns,
+ .column_predicates = {},
.conjuncts = VExprContext(nullptr),
.format = FileFormat::PARQUET,
.scan_params = nullptr,
@@ -347,6 +648,7 @@ TEST(TableReaderTest, ProjectedColumnsRejectParquetSchemaMismatchWhenMissingColu
ASSERT_TRUE(reader
.init({
.projected_columns = projected_columns,
+ .column_predicates = {},
.conjuncts = VExprContext(nullptr),
.format = FileFormat::PARQUET,
.scan_params = nullptr,
@@ -389,6 +691,7 @@ TEST(TableReaderTest, ProjectedColumnsUseMapperExpressionForSameNameDifferentIdP
ASSERT_TRUE(reader
.init({
.projected_columns = projected_columns,
+ .column_predicates = {},
.conjuncts = VExprContext(nullptr),
.format = FileFormat::PARQUET,
.scan_params = nullptr,
@@ -438,6 +741,7 @@ TEST(TableReaderTest, ProjectedColumnsUseMapperExpressionsForParquetSchemaMismat
ASSERT_TRUE(reader
.init({
.projected_columns = projected_columns,
+ .column_predicates = {},
.conjuncts = VExprContext(nullptr),
.format = FileFormat::PARQUET,
.scan_params = nullptr,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]