This is an automated email from the ASF dual-hosted git repository.
BiteTheDDDDt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2c0181b5e38 [fix](be) Restrict varbinary predicate block to file scan
(#64019)
2c0181b5e38 is described below
commit 2c0181b5e38d30080cd35eda73f6d241ef079870
Author: Pxl <[email protected]>
AuthorDate: Wed Jun 3 10:50:17 2026 +0800
[fix](be) Restrict varbinary predicate block to file scan (#64019)
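The scan operator unconditionally skipped both column predicate pushdown
and TopN runtime predicate pushdown for VARBINARY columns. The guard was
originally introduced to work around predicate limitations in the
external Parquet/file scan readers, but because it was applied in the
shared scan path it also blocked pushdown for non-file scans. This change
adds a scan-operator hook, can_push_down_column_predicate(), that reports
whether a column predicate may be pushed down for a given slot, keeps the
default permissive (returns true), and makes FileScanOperatorX override
it to reject VARBINARY column predicates. FileScanLocalState's
_push_down_topn() now consults the same hook instead of always returning
true.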
---
be/src/exec/operator/file_scan_operator.cpp | 18 +++++++++++++++++-
be/src/exec/operator/file_scan_operator.h | 11 ++++-------
be/src/exec/operator/scan_operator.cpp | 9 ++++-----
be/src/exec/operator/scan_operator.h | 8 ++++++--
4 files changed, 31 insertions(+), 15 deletions(-)
diff --git a/be/src/exec/operator/file_scan_operator.cpp
b/be/src/exec/operator/file_scan_operator.cpp
index abb74523827..2a87f413a15 100644
--- a/be/src/exec/operator/file_scan_operator.cpp
+++ b/be/src/exec/operator/file_scan_operator.cpp
@@ -53,6 +53,17 @@ PushDownType
FileScanLocalState::_should_push_down_binary_predicate(
}
}
+bool FileScanLocalState::_push_down_topn(const RuntimePredicate& predicate) {
+ if (!predicate.target_is_slot(_parent->node_id())) {
+ return false;
+ }
+ auto& p = _parent->cast<FileScanOperatorX>();
+ const auto slot_id =
predicate.get_texpr(_parent->node_id()).nodes[0].slot_ref.slot_id;
+ auto* slot = p._slot_id_to_slot_desc[slot_id];
+ DCHECK(slot != nullptr);
+ return p.can_push_down_column_predicate(slot);
+}
+
int FileScanLocalState::max_scanners_concurrency(RuntimeState* state) const {
// For select * from table limit 10; should just use one thread.
if (should_run_serial()) {
@@ -107,7 +118,7 @@ Status
FileScanLocalState::_init_scanners(std::list<ScannerSPtr>* scanners) {
std::min(ScannerScheduler::default_remote_scan_thread_num() /
p.parallelism(state()),
_max_scanners);
shard_num = std::max(shard_num, 1U);
- _kv_cache.reset(new ShardedKVCache(shard_num));
+ _kv_cache = std::make_unique<ShardedKVCache>(shard_num);
for (int i = 0; i < _max_scanners; ++i) {
std::unique_ptr<FileScanner> scanner = FileScanner::create_unique(
state(), this, p._limit, _split_source,
_scanner_profile.get(), _kv_cache.get(),
@@ -206,4 +217,9 @@ Status FileScanOperatorX::prepare(RuntimeState* state) {
return Status::OK();
}
+bool FileScanOperatorX::can_push_down_column_predicate(const SlotDescriptor*
slot) const {
+ // External readers do not fully support VARBINARY column predicates yet.
+ return slot->type()->get_primitive_type() != TYPE_VARBINARY;
+}
+
} // namespace doris
diff --git a/be/src/exec/operator/file_scan_operator.h
b/be/src/exec/operator/file_scan_operator.h
index 39b97b11665..c4cc085c778 100644
--- a/be/src/exec/operator/file_scan_operator.h
+++ b/be/src/exec/operator/file_scan_operator.h
@@ -17,8 +17,7 @@
#pragma once
-#include <stdint.h>
-
+#include <cstdint>
#include <string>
#include "common/logging.h"
@@ -63,11 +62,7 @@ private:
PushDownType _should_push_down_topn_filter() const override {
return PushDownType::PARTIAL_ACCEPTABLE;
}
- bool _push_down_topn(const RuntimePredicate& predicate) override {
- // For external table/ file scan, first try push down the predicate,
- // and then determine whether it can be pushed down within the
(parquet/orc) reader.
- return true;
- }
+ bool _push_down_topn(const RuntimePredicate& predicate) override;
PushDownType _should_push_down_bitmap_filter() const override {
return PushDownType::UNACCEPTABLE;
@@ -123,6 +118,8 @@ public:
return column_id_counter;
}
+ bool can_push_down_column_predicate(const SlotDescriptor* slot) const
override;
+
private:
friend class FileScanLocalState;
diff --git a/be/src/exec/operator/scan_operator.cpp
b/be/src/exec/operator/scan_operator.cpp
index 110e2a44097..7904413f77b 100644
--- a/be/src/exec/operator/scan_operator.cpp
+++ b/be/src/exec/operator/scan_operator.cpp
@@ -652,7 +652,7 @@ bool
ScanLocalState<Derived>::_is_predicate_acting_on_slot(const VExprSPtrs& chi
if (_slot_id_to_value_range.end() == sid_to_range) {
return false;
}
- if (remove_nullable((*slot_desc)->type())->get_primitive_type() ==
TYPE_VARBINARY) {
+ if (!_parent->cast<typename
Derived::Parent>().can_push_down_column_predicate(*slot_desc)) {
return false;
}
*range = &(sid_to_range->second);
@@ -1307,11 +1307,10 @@ Status
ScanOperatorX<LocalStateType>::prepare(RuntimeState* state) {
.nodes[0]
.slot_ref.slot_id];
DCHECK(s != nullptr);
- if (remove_nullable(s->type())->get_primitive_type() ==
TYPE_VARBINARY) {
- continue;
+ if (can_push_down_column_predicate(s)) {
+ auto col_name = s->col_name();
+ cid = get_column_id(col_name);
}
- auto col_name = s->col_name();
- cid = get_column_id(col_name);
}
RETURN_IF_ERROR(state->get_query_ctx()->get_runtime_predicate(id).init_target(
node_id(), _slot_id_to_slot_desc, cid));
diff --git a/be/src/exec/operator/scan_operator.h
b/be/src/exec/operator/scan_operator.h
index fa28b7c6505..f3d65df35ad 100644
--- a/be/src/exec/operator/scan_operator.h
+++ b/be/src/exec/operator/scan_operator.h
@@ -238,9 +238,9 @@ class ScanLocalState : public ScanLocalStateBase {
: ScanLocalStateBase(state, parent) {}
~ScanLocalState() override = default;
- virtual Status init(RuntimeState* state, LocalStateInfo& info) override;
+ Status init(RuntimeState* state, LocalStateInfo& info) override;
- virtual Status open(RuntimeState* state) override;
+ Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
std::string debug_string(int indentation_level) const final;
@@ -388,6 +388,10 @@ public:
[[nodiscard]] virtual int get_column_id(const std::string& col_name) const
{ return -1; }
+ [[nodiscard]] virtual bool can_push_down_column_predicate(const
SlotDescriptor*) const {
+ return true;
+ }
+
TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; }
DataDistribution required_data_distribution(RuntimeState* /*state*/) const
override {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]