This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ed7f7dead9 [Refactor](push-down predicate) Derive push-down predicate from vconjuncts (#11468)
ed7f7dead9 is described below
commit ed7f7dead95a361422666da4a589ba1fbc807e2c
Author: Gabriel <[email protected]>
AuthorDate: Mon Aug 8 19:19:26 2022 +0800
[Refactor](push-down predicate) Derive push-down predicate from vconjuncts (#11468)
* [Refactor](push-down predicate) Derive push-down predicate from vconjuncts
---
be/src/exec/olap_common.h | 5 +
be/src/exec/olap_scan_node.h | 1 -
be/src/exec/olap_utils.h | 22 +
be/src/exprs/bloomfilter_predicate.cpp | 5 +-
be/src/exprs/bloomfilter_predicate.h | 2 +-
be/src/exprs/runtime_filter.cpp | 18 +-
be/src/exprs/runtime_filter.h | 2 +
be/src/vec/exec/volap_scan_node.cpp | 1470 +++++++++++++----------------
be/src/vec/exec/volap_scan_node.h | 95 +-
be/src/vec/exec/volap_scanner.cpp | 4 -
be/src/vec/exec/volap_scanner.h | 5 -
be/src/vec/exprs/vbloom_predicate.cpp | 4 +-
be/src/vec/exprs/vbloom_predicate.h | 4 +-
be/src/vec/exprs/vexpr.cpp | 2 +-
be/src/vec/exprs/vexpr.h | 20 +
be/src/vec/exprs/vexpr_context.cpp | 9 +-
be/src/vec/exprs/vexpr_context.h | 9 +
be/src/vec/exprs/vin_predicate.h | 4 +
be/src/vec/exprs/vruntimefilter_wrapper.h | 2 +
be/src/vec/exprs/vslot_ref.h | 2 +
be/src/vec/functions/in.cpp | 154 +--
be/src/vec/functions/{in.cpp => in.h} | 8 +-
22 files changed, 821 insertions(+), 1026 deletions(-)
diff --git a/be/src/exec/olap_common.h b/be/src/exec/olap_common.h
index fb303de5f6..2f37b05633 100644
--- a/be/src/exec/olap_common.h
+++ b/be/src/exec/olap_common.h
@@ -256,6 +256,11 @@ public:
range.remove_fixed_value(*value);
}
+ // Applies the comparison `op` with the bound `*value` to `range` by
+ // forwarding to ColumnValueRange::add_range(). `value` is dereferenced
+ // unconditionally, so callers must pass a non-null pointer.
+ static void add_value_range(ColumnValueRange<primitive_type>& range,
SQLFilterOp op,
+ CppType* value) {
+ range.add_range(op, *value);
+ }
+
static ColumnValueRange<primitive_type> create_empty_column_value_range() {
return
ColumnValueRange<primitive_type>::create_empty_column_value_range("");
}
diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h
index ea2b0dc70d..4097c710fe 100644
--- a/be/src/exec/olap_scan_node.h
+++ b/be/src/exec/olap_scan_node.h
@@ -158,7 +158,6 @@ protected:
int conj_idx, int
child_idx);
friend class OlapScanner;
- friend class vectorized::VOlapScanner;
// Tuple id resolved in prepare() to set _tuple_desc;
TupleId _tuple_id;
diff --git a/be/src/exec/olap_utils.h b/be/src/exec/olap_utils.h
index eb0cf52e98..1fe0e21f71 100644
--- a/be/src/exec/olap_utils.h
+++ b/be/src/exec/olap_utils.h
@@ -184,6 +184,9 @@ inline int get_olap_size(PrimitiveType type) {
return 0;
}
+// Dependent "false" constant for `static_assert(always_false_v<T>)` in the final
+// `else` of an `if constexpr` chain: because the value depends on the template
+// argument, the assertion only fires if that branch is actually instantiated.
+// `inline` (C++17) gives one definition shared by every translation unit that
+// includes this header, instead of a separate internal-linkage copy per TU.
+template <PrimitiveType>
+inline constexpr bool always_false_v = false;
+
inline SQLFilterOp to_olap_filter_type(TExprOpcode::type type, bool opposite) {
switch (type) {
case TExprOpcode::LT:
@@ -215,4 +218,23 @@ inline SQLFilterOp to_olap_filter_type(TExprOpcode::type
type, bool opposite) {
return FILTER_IN;
}
+// Maps a comparison function name ("lt", "gt", "le", "ge", "eq", "ne") to the
+// SQLFilterOp used when pushing a predicate down to the storage layer.
+// `opposite` is interpreted differently per operator:
+//   - lt/gt/le/ge: the comparison is mirrored, as if the operands were swapped
+//     ("lt" -> FILTER_LARGER, "le" -> FILTER_LARGER_OR_EQUAL, ...);
+//   - eq/ne: the result is negated ("eq" -> FILTER_NOT_IN, "ne" -> FILTER_IN).
+// NOTE(review): change_value_range passes the slot-ref child index as
+// `opposite`, i.e. the "operands swapped" meaning. For eq/ne that would be
+// wrong, so confirm eq/ne never reach this function with opposite == true.
+// An unrecognised name trips the DCHECK and otherwise falls back to FILTER_IN.
+inline SQLFilterOp to_olap_filter_type(const std::string& function_name, bool
opposite) {
+ if (function_name == "lt") {
+ return opposite ? FILTER_LARGER : FILTER_LESS;
+ } else if (function_name == "gt") {
+ return opposite ? FILTER_LESS : FILTER_LARGER;
+ } else if (function_name == "le") {
+ return opposite ? FILTER_LARGER_OR_EQUAL : FILTER_LESS_OR_EQUAL;
+ } else if (function_name == "ge") {
+ return opposite ? FILTER_LESS_OR_EQUAL : FILTER_LARGER_OR_EQUAL;
+ } else if (function_name == "eq") {
+ return opposite ? FILTER_NOT_IN : FILTER_IN;
+ } else if (function_name == "ne") {
+ return opposite ? FILTER_IN : FILTER_NOT_IN;
+ } else {
+ DCHECK(false) << "Function Name: " << function_name;
+ return FILTER_IN;
+ }
+}
+
} // namespace doris
diff --git a/be/src/exprs/bloomfilter_predicate.cpp
b/be/src/exprs/bloomfilter_predicate.cpp
index 26cd7a209b..fcf8589ac6 100644
--- a/be/src/exprs/bloomfilter_predicate.cpp
+++ b/be/src/exprs/bloomfilter_predicate.cpp
@@ -43,12 +43,13 @@ BloomFilterPredicate::BloomFilterPredicate(const
BloomFilterPredicate& other)
_filtered_rows(),
_scan_rows() {}
-Status BloomFilterPredicate::prepare(RuntimeState* state,
IBloomFilterFuncBase* filter) {
+Status BloomFilterPredicate::prepare(RuntimeState* state,
+ std::shared_ptr<IBloomFilterFuncBase>
filter) {
// DCHECK(filter != nullptr);
if (_is_prepare) {
return Status::OK();
}
- _filter.reset(filter);
+ _filter = filter;
if (nullptr == _filter.get()) {
return Status::InternalError("Unknown column type.");
}
diff --git a/be/src/exprs/bloomfilter_predicate.h
b/be/src/exprs/bloomfilter_predicate.h
index 7f420a2d8e..5fcf604518 100644
--- a/be/src/exprs/bloomfilter_predicate.h
+++ b/be/src/exprs/bloomfilter_predicate.h
@@ -365,7 +365,7 @@ public:
return pool->add(new BloomFilterPredicate(*this));
}
using Predicate::prepare;
- Status prepare(RuntimeState* state, IBloomFilterFuncBase* bloomfilterfunc);
+ Status prepare(RuntimeState* state, std::shared_ptr<IBloomFilterFuncBase>
bloomfilterfunc);
std::shared_ptr<IBloomFilterFuncBase> get_bloom_filter_func() { return
_filter; }
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 5983b11fcc..89e832f3d0 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1000,7 +1000,7 @@ private:
int32_t _max_in_num = -1;
std::unique_ptr<MinMaxFuncBase> _minmax_func;
std::unique_ptr<HybridSetBase> _hybrid_set;
- std::unique_ptr<IBloomFilterFuncBase> _bloomfilter_func;
+ std::shared_ptr<IBloomFilterFuncBase> _bloomfilter_func;
bool _is_bloomfilter = false;
bool _is_ignored_in_filter = false;
std::string* _ignored_in_filter_msg = nullptr;
@@ -1061,6 +1061,14 @@ Status
IRuntimeFilter::get_push_expr_ctxs(std::list<ExprContext*>* push_expr_ctx
return Status::OK();
}
+// Consumer-only overload for the vectorized engine: hands `push_vexprs` to the
+// wrapper's get_push_vexprs() together with this filter's state and probe
+// context. An ignored filter contributes nothing and simply reports OK.
+Status IRuntimeFilter::get_push_expr_ctxs(std::vector<vectorized::VExpr*>* push_vexprs) {
+    DCHECK(is_consumer());
+    if (_is_ignored) {
+        return Status::OK();
+    }
+    return _wrapper->get_push_vexprs(push_vexprs, _state, _vprobe_ctx);
+}
+
Status IRuntimeFilter::get_push_expr_ctxs(std::list<ExprContext*>*
push_expr_ctxs,
ExprContext* probe_ctx) {
DCHECK(is_producer());
@@ -1649,7 +1657,7 @@ Status RuntimePredicateWrapper::get_push_context(T*
container, RuntimeState* sta
node.__isset.vector_opcode = true;
node.__set_vector_opcode(to_in_opcode(_column_return_type));
auto bloom_pred = _pool->add(new BloomFilterPredicate(node));
- RETURN_IF_ERROR(bloom_pred->prepare(state,
_bloomfilter_func.release()));
+ RETURN_IF_ERROR(bloom_pred->prepare(state, _bloomfilter_func));
bloom_pred->add_child(Expr::copy(_pool, prob_expr->root()));
ExprContext* ctx = _pool->add(new ExprContext(bloom_pred));
container->push_back(ctx);
@@ -1677,6 +1685,8 @@ Status
RuntimePredicateWrapper::get_push_vexprs(std::vector<doris::vectorized::V
case RuntimeFilterType::IN_FILTER: {
if (!_is_ignored_in_filter) {
TTypeDesc type_desc =
create_type_desc(PrimitiveType::TYPE_BOOLEAN);
+ type_desc.__set_is_nullable(
+ _hybrid_set->size() > 0 ? true :
vprob_expr->root()->is_nullable());
TExprNode node;
node.__set_type(type_desc);
node.__set_node_type(TExprNodeType::IN_PRED);
@@ -1684,6 +1694,8 @@ Status
RuntimePredicateWrapper::get_push_vexprs(std::vector<doris::vectorized::V
node.__set_opcode(TExprOpcode::FILTER_IN);
node.__isset.vector_opcode = true;
node.__set_vector_opcode(to_in_opcode(_column_return_type));
+ node.__set_is_nullable(_hybrid_set->size() > 0 ? true
+ :
vprob_expr->root()->is_nullable());
// VInPredicate
doris::vectorized::VExpr* expr = nullptr;
@@ -1732,12 +1744,14 @@ Status
RuntimePredicateWrapper::get_push_vexprs(std::vector<doris::vectorized::V
case RuntimeFilterType::BLOOM_FILTER: {
// create a bloom filter
TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN);
+ type_desc.__set_is_nullable(vprob_expr->root()->is_nullable());
TExprNode node;
node.__set_type(type_desc);
node.__set_node_type(TExprNodeType::BLOOM_PRED);
node.__set_opcode(TExprOpcode::RT_FILTER);
node.__isset.vector_opcode = true;
node.__set_vector_opcode(to_in_opcode(_column_return_type));
+ node.__set_is_nullable(vprob_expr->root()->is_nullable());
auto bloom_pred = _pool->add(new
doris::vectorized::VBloomPredicate(node));
bloom_pred->set_filter(_bloomfilter_func);
auto cloned_vexpr = vprob_expr->root()->clone(_pool);
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index b4382d8874..587c0aa87a 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -154,6 +154,8 @@ public:
// only consumer could call this
Status get_push_expr_ctxs(std::list<ExprContext*>* push_expr_ctxs);
+ Status get_push_expr_ctxs(std::vector<vectorized::VExpr*>* push_vexprs);
+
// This function is used by UT and producer
Status get_push_expr_ctxs(std::list<ExprContext*>* push_expr_ctxs,
ExprContext* probe_ctx);
diff --git a/be/src/vec/exec/volap_scan_node.cpp
b/be/src/vec/exec/volap_scan_node.cpp
index 4b4ddf313f..865b2d838b 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -30,12 +30,22 @@
#include "vec/core/block.h"
#include "vec/data_types/data_type_decimal.h"
#include "vec/exec/volap_scanner.h"
+#include "vec/exprs/vbloom_predicate.h"
#include "vec/exprs/vcompound_pred.h"
#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vruntimefilter_wrapper.h"
+#include "vec/functions/in.h"
namespace doris::vectorized {
using doris::operator<<;
+// Evaluates `stmt` only while `push_down` (which must be in scope at the
+// expansion site) is still false; once it is true, returns from the enclosing
+// void function or lambda. The do/while(false) wrapper makes the expansion a
+// single statement, so `RETURN_IF_PUSH_DOWN(x);` is safe inside an unbraced
+// if/else and the trailing semicolon does not create an empty statement.
+#define RETURN_IF_PUSH_DOWN(stmt) \
+    do {                          \
+        if (!push_down) {         \
+            stmt;                 \
+        } else {                  \
+            return;               \
+        }                         \
+    } while (false)
+
VOlapScanNode::VOlapScanNode(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs)
: ScanNode(pool, tnode, descs),
_tuple_id(tnode.olap_scan_node.tuple_id),
@@ -59,7 +69,6 @@ VOlapScanNode::VOlapScanNode(ObjectPool* pool, const
TPlanNode& tnode, const Des
Status VOlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::init(tnode, state));
- _direct_conjunct_size = _conjunct_ctxs.size();
const TQueryOptions& query_options = state->query_options();
if (query_options.__isset.max_scan_key_num) {
@@ -244,6 +253,7 @@ Status VOlapScanNode::open(RuntimeState* state) {
// acquire runtime filter
_runtime_filter_ctxs.resize(_runtime_filter_descs.size());
+ std::vector<VExpr*> vexprs;
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
auto& filter_desc = _runtime_filter_descs[i];
IRuntimeFilter* runtime_filter = nullptr;
@@ -257,21 +267,12 @@ Status VOlapScanNode::open(RuntimeState* state) {
ready = runtime_filter->await();
}
if (ready) {
- std::list<ExprContext*> expr_context;
- RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&expr_context));
+ RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&vexprs));
_runtime_filter_ctxs[i].apply_mark = true;
_runtime_filter_ctxs[i].runtimefilter = runtime_filter;
-
- for (auto ctx : expr_context) {
- ctx->prepare(state, row_desc());
- ctx->open(state);
- int index = _conjunct_ctxs.size();
- _conjunct_ctxs.push_back(ctx);
- // it's safe to store address from a fix-resized vector
- _conjunctid_to_runtime_filter_ctxs[index] =
&_runtime_filter_ctxs[i];
- }
}
}
+ RETURN_IF_ERROR(_append_rf_into_conjuncts(state, vexprs));
return Status::OK();
}
@@ -469,7 +470,7 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
if (_vconjunct_ctx_ptr) {
_stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr));
}
- _vconjunct_ctx_ptr.reset(new
doris::vectorized::VExprContext*);
+ _vconjunct_ctx_ptr.reset(new VExprContext*);
*(_vconjunct_ctx_ptr.get()) = new_vconjunct_ctx_ptr;
_runtime_filter_ready_flag[i] = true;
}
@@ -621,136 +622,41 @@ Status VOlapScanNode::_add_blocks(std::vector<Block*>&
block) {
return Status::OK();
}
-void VOlapScanNode::eval_const_conjuncts() {
- for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) {
- // if conjunct is constant, compute direct and set eos = true
- if (_conjunct_ctxs[conj_idx]->root()->is_constant()) {
- void* value = _conjunct_ctxs[conj_idx]->get_value(nullptr);
- if (value == nullptr || *reinterpret_cast<bool*>(value) == false) {
- _eos = true;
- break;
- }
- }
- }
-}
-
Status VOlapScanNode::normalize_conjuncts() {
std::vector<SlotDescriptor*> slots = _tuple_desc->slots();
for (int slot_idx = 0; slot_idx < slots.size(); ++slot_idx) {
switch (slots[slot_idx]->type().type) {
- case TYPE_TINYINT: {
- ColumnValueRange<TYPE_TINYINT> range(slots[slot_idx]->col_name());
- normalize_predicate(range, slots[slot_idx]);
- break;
- }
-
- case TYPE_SMALLINT: {
- ColumnValueRange<TYPE_SMALLINT> range(slots[slot_idx]->col_name());
- normalize_predicate(range, slots[slot_idx]);
- break;
- }
-
- case TYPE_INT: {
- ColumnValueRange<TYPE_INT> range(slots[slot_idx]->col_name());
- normalize_predicate(range, slots[slot_idx]);
- break;
- }
-
- case TYPE_BIGINT: {
- ColumnValueRange<TYPE_BIGINT> range(slots[slot_idx]->col_name());
- normalize_predicate(range, slots[slot_idx]);
- break;
- }
-
- case TYPE_LARGEINT: {
- ColumnValueRange<TYPE_LARGEINT> range(slots[slot_idx]->col_name());
- normalize_predicate(range, slots[slot_idx]);
- break;
- }
-
- case TYPE_CHAR: {
- ColumnValueRange<TYPE_CHAR> range(slots[slot_idx]->col_name());
- normalize_predicate(range, slots[slot_idx]);
- break;
- }
- case TYPE_VARCHAR: {
- ColumnValueRange<TYPE_VARCHAR> range(slots[slot_idx]->col_name());
- normalize_predicate(range, slots[slot_idx]);
- break;
- }
- case TYPE_HLL: {
- ColumnValueRange<TYPE_HLL> range(slots[slot_idx]->col_name());
- normalize_predicate(range, slots[slot_idx]);
- break;
- }
- case TYPE_STRING: {
- ColumnValueRange<TYPE_STRING> range(slots[slot_idx]->col_name());
- normalize_predicate(range, slots[slot_idx]);
- break;
- }
-
- case TYPE_DATE: {
- ColumnValueRange<TYPE_DATE> range(slots[slot_idx]->col_name());
- normalize_predicate(range, slots[slot_idx]);
- break;
- }
- case TYPE_DATETIME: {
- ColumnValueRange<TYPE_DATETIME> range(slots[slot_idx]->col_name());
- normalize_predicate(range, slots[slot_idx]);
- break;
- }
-
- case TYPE_DATEV2: {
- ColumnValueRange<TYPE_DATEV2> range(slots[slot_idx]->col_name());
- normalize_predicate(range, slots[slot_idx]);
- break;
- }
-
- case TYPE_DATETIMEV2: {
- ColumnValueRange<TYPE_DATETIMEV2>
range(slots[slot_idx]->col_name(),
-
slots[slot_idx]->type().precision,
-
slots[slot_idx]->type().scale);
- normalize_predicate(range, slots[slot_idx]);
- break;
- }
-
- case TYPE_DECIMALV2: {
- ColumnValueRange<TYPE_DECIMALV2>
range(slots[slot_idx]->col_name());
- normalize_predicate(range, slots[slot_idx]);
- break;
- }
-
- case TYPE_DECIMAL32: {
- ColumnValueRange<TYPE_DECIMAL32> range(slots[slot_idx]->col_name(),
-
slots[slot_idx]->type().precision,
-
slots[slot_idx]->type().scale);
- normalize_predicate(range, slots[slot_idx]);
- break;
- }
-
- case TYPE_DECIMAL64: {
- ColumnValueRange<TYPE_DECIMAL64> range(slots[slot_idx]->col_name(),
-
slots[slot_idx]->type().precision,
-
slots[slot_idx]->type().scale);
- normalize_predicate(range, slots[slot_idx]);
- break;
- }
-
- case TYPE_DECIMAL128: {
- ColumnValueRange<TYPE_DECIMAL128>
range(slots[slot_idx]->col_name(),
-
slots[slot_idx]->type().precision,
-
slots[slot_idx]->type().scale);
- normalize_predicate(range, slots[slot_idx]);
- break;
- }
-
- case TYPE_BOOLEAN: {
- ColumnValueRange<TYPE_BOOLEAN> range(slots[slot_idx]->col_name());
- normalize_predicate(range, slots[slot_idx]);
- break;
- }
-
+#define M(NAME)
\
+ case TYPE_##NAME: {
\
+ ColumnValueRange<TYPE_##NAME> range(slots[slot_idx]->col_name(),
\
+ slots[slot_idx]->type().precision,
\
+ slots[slot_idx]->type().scale);
\
+ _id_to_slot_column_value_range[slots[slot_idx]->id()] =
\
+ std::pair {slots[slot_idx], range};
\
+ break;
\
+ }
+#define APPLY_FOR_PRIMITIVE_TYPE(M) \
+ M(TINYINT) \
+ M(SMALLINT) \
+ M(INT) \
+ M(BIGINT) \
+ M(LARGEINT) \
+ M(CHAR) \
+ M(DATE) \
+ M(DATETIME) \
+ M(DATEV2) \
+ M(DATETIMEV2) \
+ M(VARCHAR) \
+ M(STRING) \
+ M(HLL) \
+ M(DECIMAL32) \
+ M(DECIMAL64) \
+ M(DECIMAL128) \
+ M(DECIMALV2) \
+ M(BOOLEAN)
+ APPLY_FOR_PRIMITIVE_TYPE(M)
+#undef M
default: {
VLOG_CRITICAL << "Unsupported Normalize Slot [ColName=" <<
slots[slot_idx]->col_name()
<< "]";
@@ -758,6 +664,28 @@ Status VOlapScanNode::normalize_conjuncts() {
}
}
}
+ if (_vconjunct_ctx_ptr) {
+ if ((*_vconjunct_ctx_ptr)->root()) {
+ VExpr* new_root = _normalize_predicate(_runtime_state,
(*_vconjunct_ctx_ptr)->root());
+ if (new_root) {
+ (*_vconjunct_ctx_ptr)->set_root(new_root);
+ } else {
+ (*(_vconjunct_ctx_ptr.get()))->mark_as_stale();
+ _stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr));
+ _vconjunct_ctx_ptr.reset(nullptr);
+ }
+ }
+ }
+ for (auto& it : _id_to_slot_column_value_range) {
+ std::visit(
+ [&](auto&& range) {
+ if (range.is_empty_value_range()) {
+ _eos = true;
+ }
+ },
+ it.second.second);
+ _column_value_ranges[it.second.first->col_name()] = it.second.second;
+ }
return Status::OK();
}
@@ -786,67 +714,6 @@ static std::string olap_filters_to_string(const
std::vector<doris::TCondition>&
return filters_string;
}
-Status VOlapScanNode::build_function_filters() {
- for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) {
- ExprContext* ex_ctx = _conjunct_ctxs[conj_idx];
- Expr* fn_expr = ex_ctx->root();
- bool opposite = false;
-
- if (TExprNodeType::COMPOUND_PRED == fn_expr->node_type() &&
- TExprOpcode::COMPOUND_NOT == fn_expr->op()) {
- fn_expr = fn_expr->get_child(0);
- opposite = true;
- }
-
- // currently only support like / not like
- if (TExprNodeType::FUNCTION_CALL == fn_expr->node_type() &&
- "like" == fn_expr->fn().name.function_name) {
- doris_udf::FunctionContext* func_cxt =
- ex_ctx->fn_context(fn_expr->get_fn_context_index());
-
- if (!func_cxt) {
- continue;
- }
- if (fn_expr->children().size() != 2) {
- continue;
- }
- SlotRef* slot_ref = nullptr;
- Expr* literal_expr = nullptr;
-
- if (TExprNodeType::SLOT_REF == fn_expr->get_child(0)->node_type())
{
- literal_expr = fn_expr->get_child(1);
- slot_ref = (SlotRef*)(fn_expr->get_child(0));
- } else if (TExprNodeType::SLOT_REF ==
fn_expr->get_child(1)->node_type()) {
- literal_expr = fn_expr->get_child(0);
- slot_ref = (SlotRef*)(fn_expr->get_child(1));
- } else {
- continue;
- }
-
- if (TExprNodeType::STRING_LITERAL != literal_expr->node_type())
continue;
-
- const SlotDescriptor* slot_desc = nullptr;
- std::vector<SlotId> slot_ids;
- slot_ref->get_slot_ids(&slot_ids);
- for (SlotDescriptor* slot : _tuple_desc->slots()) {
- if (slot->id() == slot_ids[0]) {
- slot_desc = slot;
- break;
- }
- }
-
- if (!slot_desc) {
- continue;
- }
- std::string col = slot_desc->col_name();
- StringVal val = literal_expr->get_string_val(ex_ctx, nullptr);
- _push_down_functions.emplace_back(opposite, col, func_cxt, val);
- _pushed_func_conjuncts_index.insert(conj_idx);
- }
- }
- return Status::OK();
-}
-
Status VOlapScanNode::build_key_ranges_and_filters() {
const std::vector<std::string>& column_names =
_olap_scan_node.key_column_name;
const std::vector<TPrimitiveType::type>& column_types =
_olap_scan_node.key_column_type;
@@ -898,62 +765,23 @@ Status VOlapScanNode::build_key_ranges_and_filters() {
Status VOlapScanNode::start_scan(RuntimeState* state) {
RETURN_IF_CANCELLED(state);
- VLOG_CRITICAL << "Eval Const Conjuncts";
- // 1. Eval const conjuncts to find whether eos = true
- eval_const_conjuncts();
-
VLOG_CRITICAL << "NormalizeConjuncts";
- // 2. Convert conjuncts to ColumnValueRange in each column, some conjuncts
may
- // set eos = true
RETURN_IF_ERROR(normalize_conjuncts());
- // 1 and 2 step dispose find conjuncts set eos = true, return directly
if (_eos) {
return Status::OK();
}
VLOG_CRITICAL << "BuildKeyRangesAndFilters";
- // 3.1 Using `Key Column`'s ColumnValueRange to split ScanRange to several
`Sub ScanRange`
RETURN_IF_ERROR(build_key_ranges_and_filters());
- // 3.2 Function pushdown
- if (config::enable_function_pushdown)
RETURN_IF_ERROR(build_function_filters());
-
- VLOG_CRITICAL << "Filter idle conjuncts";
- // 4.1 Filter idle conjunct which already trans to olap filters
- // this must be after build_scan_key, it will free the StringValue memory
- remove_pushed_conjuncts(state);
VLOG_CRITICAL << "StartScanThread";
- // 5. Start multi thread to read several `Sub Sub ScanRange`
RETURN_IF_ERROR(start_scan_thread(state));
return Status::OK();
}
-template <PrimitiveType T>
-Status VOlapScanNode::normalize_predicate(ColumnValueRange<T>& range,
SlotDescriptor* slot) {
- // 1. Normalize InPredicate, add to ColumnValueRange
- RETURN_IF_ERROR(normalize_in_and_eq_predicate(slot, &range));
-
- // 2. Normalize NotInPredicate, add to ColumnValueRange
- RETURN_IF_ERROR(normalize_not_in_and_not_eq_predicate(slot, &range));
-
- // 3. Normalize BinaryPredicate , add to ColumnValueRange
- RETURN_IF_ERROR(normalize_noneq_binary_predicate(slot, &range));
-
- // 3. Normalize BloomFilterPredicate, push down by hash join node
- RETURN_IF_ERROR(normalize_bloom_filter_predicate(slot));
-
- // 4. Check whether range is empty, set _eos
- if (range.is_empty_value_range()) _eos = true;
-
- // 5. Add range to Column->ColumnValueRange map
- _column_value_ranges[slot->col_name()] = range;
-
- return Status::OK();
-}
-
-static bool ignore_cast(SlotDescriptor* slot, Expr* expr) {
+static bool ignore_cast(SlotDescriptor* slot, VExpr* expr) {
if ((slot->type().is_date_type() || slot->type().is_date_v2_type() ||
slot->type().is_datetime_v2_type()) &&
(expr->type().is_date_type() || expr->type().is_date_v2_type() ||
@@ -966,152 +794,83 @@ static bool ignore_cast(SlotDescriptor* slot, Expr*
expr) {
return false;
}
-bool VOlapScanNode::should_push_down_in_predicate(doris::SlotDescriptor* slot,
- doris::InPredicate* pred) {
- if (Expr::type_without_cast(pred->get_child(0)) !=
TExprNodeType::SLOT_REF) {
- // not a slot ref(column)
- return false;
- }
-
- std::vector<SlotId> slot_ids;
- if (pred->get_child(0)->get_slot_ids(&slot_ids) != 1) {
- // not a single column predicate
- return false;
- }
-
- if (slot_ids[0] != slot->id()) {
- // predicate not related to current column
- return false;
- }
-
- if (pred->get_child(0)->type().type != slot->type().type) {
- if (!ignore_cast(slot, pred->get_child(0))) {
- // the type of predicate not match the slot's type
- return false;
- }
- }
-
- VLOG_CRITICAL << slot->col_name() << " fixed_values add num: " <<
pred->hybrid_set()->size();
-
- // if there are too many elements in InPredicate, exceed the limit,
- // we will not push any condition of this column to storage engine.
- // because too many conditions pushed down to storage engine may even
- // slow down the query process.
- // ATTN: This is just an experience value. You may need to try
- // different thresholds to improve performance.
- if (pred->hybrid_set()->size() > _max_pushdown_conditions_per_column) {
- VLOG_NOTICE << "Predicate value num " << pred->hybrid_set()->size() <<
" exceed limit "
- << _max_pushdown_conditions_per_column;
- return false;
- }
-
- return true;
-}
-
-std::pair<bool, void*>
VOlapScanNode::should_push_down_eq_predicate(doris::SlotDescriptor* slot,
-
doris::Expr* pred, int conj_idx,
- int
child_idx) {
- auto result_pair = std::make_pair<bool, void*>(false, nullptr);
-
- // Do not get slot_ref of column, should not push_down to Storage Engine
- if (Expr::type_without_cast(pred->get_child(child_idx)) !=
TExprNodeType::SLOT_REF) {
- return result_pair;
- }
-
- std::vector<SlotId> slot_ids;
- if (pred->get_child(child_idx)->get_slot_ids(&slot_ids) != 1) {
- // not a single column predicate
- return result_pair;
- }
-
- if (slot_ids[0] != slot->id()) {
- // predicate not related to current column
- return result_pair;
- }
-
- if (pred->get_child(child_idx)->type().type != slot->type().type) {
- if (!ignore_cast(slot, pred->get_child(child_idx))) {
- // the type of predicate not match the slot's type
- return result_pair;
+template <bool IsFixed, PrimitiveType PrimitiveType, typename
ChangeFixedValueRangeFunc>
+Status VOlapScanNode::change_value_range(ColumnValueRange<PrimitiveType>&
temp_range, void* value,
+ const ChangeFixedValueRangeFunc& func,
+ const std::string& fn_name, int
slot_ref_child) {
+ if constexpr (PrimitiveType == TYPE_DATE) {
+ DateTimeValue date_value;
+
reinterpret_cast<VecDateTimeValue*>(value)->convert_vec_dt_to_dt(&date_value);
+ if constexpr (IsFixed) {
+ if (!date_value.check_loss_accuracy_cast_to_date()) {
+ func(temp_range,
+ reinterpret_cast<typename
PrimitiveTypeTraits<PrimitiveType>::CppType*>(
+ &date_value));
+ }
+ } else {
+ if (date_value.check_loss_accuracy_cast_to_date()) {
+ if (fn_name == "lt" || fn_name == "ge") {
+ ++date_value;
+ }
+ }
+ func(temp_range, to_olap_filter_type(fn_name, slot_ref_child),
+ reinterpret_cast<typename
PrimitiveTypeTraits<PrimitiveType>::CppType*>(
+ &date_value));
}
- }
-
- Expr* expr = pred->get_child(1 - child_idx);
- if (!expr->is_constant()) {
- // only handle constant value
- return result_pair;
- }
-
- // get value in result pair
- result_pair = std::make_pair(
- true, _conjunct_ctxs[conj_idx]->get_value(expr, nullptr,
slot->type().precision,
- slot->type().scale));
-
- return result_pair;
-}
-
-template <PrimitiveType primitive_type, typename ChangeFixedValueRangeFunc>
-Status
VOlapScanNode::change_fixed_value_range(ColumnValueRange<primitive_type>&
temp_range,
- void* value, const
ChangeFixedValueRangeFunc& func) {
- switch (primitive_type) {
- case TYPE_DATE: {
- DateTimeValue date_value = *reinterpret_cast<DateTimeValue*>(value);
- // There is must return empty data in olap_scan_node,
- // Because data value loss accuracy
- if (!date_value.check_loss_accuracy_cast_to_date()) {
+ } else if constexpr (PrimitiveType == TYPE_DATETIME) {
+ DateTimeValue date_value;
+
reinterpret_cast<VecDateTimeValue*>(value)->convert_vec_dt_to_dt(&date_value);
+ if constexpr (IsFixed) {
func(temp_range,
- reinterpret_cast<typename
PrimitiveTypeTraits<primitive_type>::CppType*>(
+ reinterpret_cast<typename
PrimitiveTypeTraits<PrimitiveType>::CppType*>(
&date_value));
+ } else {
+ func(temp_range, to_olap_filter_type(fn_name, slot_ref_child),
+ reinterpret_cast<typename
PrimitiveTypeTraits<PrimitiveType>::CppType*>(
+ reinterpret_cast<char*>(&date_value)));
}
- break;
- }
- case TYPE_DECIMALV2:
- case TYPE_CHAR:
- case TYPE_VARCHAR:
- case TYPE_HLL:
- case TYPE_DATETIME:
- case TYPE_DATETIMEV2:
- case TYPE_TINYINT:
- case TYPE_SMALLINT:
- case TYPE_INT:
- case TYPE_BIGINT:
- case TYPE_LARGEINT:
- case TYPE_DECIMAL32:
- case TYPE_DECIMAL64:
- case TYPE_DECIMAL128:
- case TYPE_STRING: {
- func(temp_range,
- reinterpret_cast<typename
PrimitiveTypeTraits<primitive_type>::CppType*>(value));
- break;
- }
- case TYPE_BOOLEAN: {
- bool v = *reinterpret_cast<bool*>(value);
- func(temp_range,
- reinterpret_cast<typename
PrimitiveTypeTraits<primitive_type>::CppType*>(&v));
- break;
- }
- case TYPE_DATEV2: {
+ } else if constexpr (PrimitiveType == TYPE_DATEV2) {
DateV2Value<DateTimeV2ValueType> datetimev2_value =
*reinterpret_cast<DateV2Value<DateTimeV2ValueType>*>(value);
- if (datetimev2_value.can_cast_to_date_without_loss_accuracy()) {
- DateV2Value<DateV2ValueType> date_v2;
+ if constexpr (IsFixed) {
+ if (datetimev2_value.can_cast_to_date_without_loss_accuracy()) {
+ DateV2Value<DateV2ValueType> date_v2;
+ date_v2.set_date_uint32(
+ binary_cast<DateV2Value<DateTimeV2ValueType>,
uint64_t>(datetimev2_value) >>
+ TIME_PART_LENGTH);
+ func(temp_range, &date_v2);
+ }
+ } else {
+ doris::vectorized::DateV2Value<DateV2ValueType> date_v2;
date_v2.set_date_uint32(
binary_cast<DateV2Value<DateTimeV2ValueType>,
uint64_t>(datetimev2_value) >>
TIME_PART_LENGTH);
- if constexpr (primitive_type == PrimitiveType::TYPE_DATEV2) {
- func(temp_range, &date_v2);
- } else {
- __builtin_unreachable();
+ if (!datetimev2_value.can_cast_to_date_without_loss_accuracy()) {
+ if (fn_name == "lt" || fn_name == "ge") {
+ ++date_v2;
+ }
}
+ func(temp_range, to_olap_filter_type(fn_name, slot_ref_child),
&date_v2);
+ }
+ } else if constexpr ((PrimitiveType == TYPE_DECIMALV2) || (PrimitiveType
== TYPE_CHAR) ||
+ (PrimitiveType == TYPE_VARCHAR) || (PrimitiveType ==
TYPE_HLL) ||
+ (PrimitiveType == TYPE_DATETIMEV2) || (PrimitiveType
== TYPE_TINYINT) ||
+ (PrimitiveType == TYPE_SMALLINT) || (PrimitiveType ==
TYPE_INT) ||
+ (PrimitiveType == TYPE_BIGINT) || (PrimitiveType ==
TYPE_LARGEINT) ||
+ (PrimitiveType == TYPE_DECIMAL32) || (PrimitiveType
== TYPE_DECIMAL64) ||
+ (PrimitiveType == TYPE_DECIMAL128) || (PrimitiveType
== TYPE_STRING) ||
+ (PrimitiveType == TYPE_BOOLEAN)) {
+ if constexpr (IsFixed) {
+ func(temp_range,
+ reinterpret_cast<typename
PrimitiveTypeTraits<PrimitiveType>::CppType*>(value));
+ } else {
+ func(temp_range, to_olap_filter_type(fn_name, slot_ref_child),
+ reinterpret_cast<typename
PrimitiveTypeTraits<PrimitiveType>::CppType*>(value));
}
- break;
- }
- default: {
- LOG(WARNING) << "Normalize filter fail, Unsupported Primitive type.
[type="
- << primitive_type << "]";
- return Status::InternalError("Normalize filter fail, Unsupported
Primitive type");
- }
+ } else {
+ static_assert(always_false_v<PrimitiveType>);
}
+
return Status::OK();
}
@@ -1127,440 +886,6 @@ bool VOlapScanNode::is_key_column(const std::string&
key_name) {
return res != _olap_scan_node.key_column_name.end();
}
-void VOlapScanNode::remove_pushed_conjuncts(RuntimeState* state) {
- if (_pushed_conjuncts_index.empty() &&
_pushed_func_conjuncts_index.empty()) {
- return;
- }
-
- // dispose direct conjunct first
- std::vector<ExprContext*> new_conjunct_ctxs;
- for (int i = 0; i < _direct_conjunct_size; ++i) {
- if (!_pushed_conjuncts_index.empty() &&
_pushed_conjuncts_index.count(i)) {
- _conjunct_ctxs[i]->close(state); // pushed condition, just close
- } else if (!_pushed_func_conjuncts_index.empty() &&
_pushed_func_conjuncts_index.count(i)) {
- _pushed_func_conjunct_ctxs.emplace_back(
- _conjunct_ctxs[i]); // pushed functions, need keep ctxs
- } else {
- new_conjunct_ctxs.emplace_back(_conjunct_ctxs[i]);
- }
- }
- auto new_direct_conjunct_size = new_conjunct_ctxs.size();
-
- // dispose hash join push down conjunct second
- for (int i = _direct_conjunct_size; i < _conjunct_ctxs.size(); ++i) {
- if (!_pushed_conjuncts_index.empty() &&
_pushed_conjuncts_index.count(i)) {
- _conjunct_ctxs[i]->close(state); // pushed condition, just close
- } else if (!_pushed_func_conjuncts_index.empty() &&
_pushed_func_conjuncts_index.count(i)) {
- _pushed_func_conjunct_ctxs.emplace_back(
- _conjunct_ctxs[i]); // pushed functions, need keep ctxs
- } else {
- new_conjunct_ctxs.emplace_back(_conjunct_ctxs[i]);
- }
- }
-
- _conjunct_ctxs = std::move(new_conjunct_ctxs);
- _direct_conjunct_size = new_direct_conjunct_size;
-
- // TODO: support vbloom_filter_predicate/vbinary_predicate and merge
unpushed predicate to _vconjunct_ctx
- for (auto push_down_ctx : _pushed_conjuncts_index) {
- auto iter = _conjunctid_to_runtime_filter_ctxs.find(push_down_ctx);
- if (iter != _conjunctid_to_runtime_filter_ctxs.end()) {
- iter->second->runtimefilter->set_push_down_profile();
- }
- }
-
- // set vconjunct_ctx is empty, if all conjunct
- if (_direct_conjunct_size == 0) {
- if (_vconjunct_ctx_ptr != nullptr) {
- (*_vconjunct_ctx_ptr)->close(state);
- _vconjunct_ctx_ptr = nullptr;
- }
- }
-
- // filter idle conjunct in vexpr_contexts
- auto checker = [&](int index) { return
_pushed_conjuncts_index.count(index); };
- _peel_pushed_vconjunct(state, checker);
-}
-
-// Construct the ColumnValueRange for one specified column
-// It will only handle the InPredicate and eq BinaryPredicate in conjunct_ctxs.
-// It will try to push down conditions of that column as much as possible,
-// But if the number of conditions exceeds the limit, none of conditions will
be pushed down.
-template <PrimitiveType T>
-Status VOlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot,
- ColumnValueRange<T>*
range) {
- std::vector<uint32_t> filter_conjuncts_index;
- for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) {
- // create empty range as temp range, temp range should do intersection
on range
- auto temp_range = ColumnValueRange<T>::create_empty_column_value_range(
- slot->type().precision, slot->type().scale);
-
- // 1. Normalize in conjuncts like 'where col in (v1, v2, v3)'
- if (TExprOpcode::FILTER_IN == _conjunct_ctxs[conj_idx]->root()->op()) {
- InPredicate* pred =
static_cast<InPredicate*>(_conjunct_ctxs[conj_idx]->root());
- if (!should_push_down_in_predicate(slot, pred)) {
- continue;
- }
-
- // begin to push InPredicate value into ColumnValueRange
- HybridSetBase::IteratorBase* iter = pred->hybrid_set()->begin();
- while (iter->has_next()) {
- // column in (nullptr) is always false so continue to
- // dispose next item
- if (nullptr == iter->get_value()) {
- iter->next();
- continue;
- }
- auto value = const_cast<void*>(iter->get_value());
- RETURN_IF_ERROR(change_fixed_value_range(
- temp_range, value,
ColumnValueRange<T>::add_fixed_value_range));
- iter->next();
- }
-
- if (is_key_column(slot->col_name())) {
- filter_conjuncts_index.emplace_back(conj_idx);
- }
- range->intersection(temp_range);
- } // end of handle in predicate
- // 2. Normalize eq conjuncts like 'where col = value'
- else if (TExprNodeType::BINARY_PRED ==
_conjunct_ctxs[conj_idx]->root()->node_type() &&
- FILTER_IN ==
to_olap_filter_type(_conjunct_ctxs[conj_idx]->root()->op(), false)) {
- Expr* pred = _conjunct_ctxs[conj_idx]->root();
- DCHECK(pred->get_num_children() == 2);
-
- for (int child_idx = 0; child_idx < 2; ++child_idx) {
- // TODO: should use C++17 structured bindlings to refactor
this code in the future:
- // 'auto [should_push_down, value] =
should_push_down_eq_predicate(slot, pred, conj_idx, child_idx);'
- // make code tidier and readabler
- auto result_pair = should_push_down_eq_predicate(slot, pred,
conj_idx, child_idx);
- if (!result_pair.first) {
- continue;
- }
-
- auto value = result_pair.second;
- // where A = nullptr should return empty result set
- if (value != nullptr) {
- RETURN_IF_ERROR(change_fixed_value_range(
- temp_range, value,
ColumnValueRange<T>::add_fixed_value_range));
- }
-
- if (is_key_column(slot->col_name())) {
- filter_conjuncts_index.emplace_back(conj_idx);
- }
- range->intersection(temp_range);
- } // end for each binary predicate child
- } // end of handling eq binary predicate
- }
-
- // exceed limit, no conditions will be pushed down to storage engine.
- if (range->get_fixed_value_size() > _max_pushdown_conditions_per_column) {
- range->set_whole_value_range();
- } else {
- std::copy(filter_conjuncts_index.cbegin(),
filter_conjuncts_index.cend(),
- std::inserter(_pushed_conjuncts_index,
_pushed_conjuncts_index.begin()));
- }
- return Status::OK();
-}
-
-// Construct the ColumnValueRange for one specified column
-// It will only handle the NotInPredicate and not eq BinaryPredicate in
conjunct_ctxs.
-// It will try to push down conditions of that column as much as possible,
-// But if the number of conditions exceeds the limit, none of conditions will
be pushed down.
-template <PrimitiveType T>
-Status VOlapScanNode::normalize_not_in_and_not_eq_predicate(SlotDescriptor*
slot,
-
ColumnValueRange<T>* range) {
- // If the conjunct of slot is fixed value, will change the fixed value set
of column value range
- // else add value to not in range and push down predicate directly
- bool is_fixed_range = range->is_fixed_value_range();
- auto not_in_range =
ColumnValueRange<T>::create_empty_column_value_range(range->column_name());
-
- std::vector<uint32_t> filter_conjuncts_index;
- for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) {
- // 1. Normalize in conjuncts like 'where col not in (v1, v2, v3)'
- if (TExprOpcode::FILTER_NOT_IN ==
_conjunct_ctxs[conj_idx]->root()->op()) {
- InPredicate* pred =
static_cast<InPredicate*>(_conjunct_ctxs[conj_idx]->root());
- if (!should_push_down_in_predicate(slot, pred)) {
- continue;
- }
-
- // begin to push InPredicate value into ColumnValueRange
- auto iter = pred->hybrid_set()->begin();
- while (iter->has_next()) {
- // column not in (nullptr) is always true
- if (nullptr == iter->get_value()) {
- continue;
- }
- auto value = const_cast<void*>(iter->get_value());
- if (is_fixed_range) {
- RETURN_IF_ERROR(change_fixed_value_range(
- *range, value,
ColumnValueRange<T>::remove_fixed_value_range));
- } else {
- RETURN_IF_ERROR(change_fixed_value_range(
- not_in_range, value,
ColumnValueRange<T>::add_fixed_value_range));
- }
- iter->next();
- }
-
- // only where a in ('a', 'b', nullptr) contain nullptr will
- // clear temp_range to whole range, no need do intersection
- if (is_key_column(slot->col_name())) {
- filter_conjuncts_index.emplace_back(conj_idx);
- }
- } // end of handle not in predicate
-
- // 2. Normalize eq conjuncts like 'where col != value'
- if (TExprNodeType::BINARY_PRED ==
_conjunct_ctxs[conj_idx]->root()->node_type() &&
- FILTER_NOT_IN ==
to_olap_filter_type(_conjunct_ctxs[conj_idx]->root()->op(), false)) {
- Expr* pred = _conjunct_ctxs[conj_idx]->root();
- DCHECK(pred->get_num_children() == 2);
-
- for (int child_idx = 0; child_idx < 2; ++child_idx) {
- // TODO: should use C++17 structured bindlings to refactor
this code in the future:
- // 'auto [should_push_down, value] =
should_push_down_eq_predicate(slot, pred, conj_idx, child_idx);'
- // make code tidier and readabler
- auto result_pair = should_push_down_eq_predicate(slot, pred,
conj_idx, child_idx);
- if (!result_pair.first) {
- continue;
- }
- auto value = result_pair.second;
-
- if (is_fixed_range) {
- RETURN_IF_ERROR(change_fixed_value_range(
- *range, value,
ColumnValueRange<T>::remove_fixed_value_range));
- } else {
- RETURN_IF_ERROR(change_fixed_value_range(
- not_in_range, value,
ColumnValueRange<T>::add_fixed_value_range));
- }
-
- if (is_key_column(slot->col_name())) {
- filter_conjuncts_index.emplace_back(conj_idx);
- }
- } // end for each binary predicate child
- } // end of handling eq binary predicate
- }
-
- // exceed limit, no conditions will be pushed down to storage engine.
- if (is_fixed_range ||
- not_in_range.get_fixed_value_size() <=
_max_pushdown_conditions_per_column) {
- if (!is_fixed_range) {
- // push down not in condition to storage engine
- not_in_range.to_in_condition(_olap_filter, false);
- }
- std::copy(filter_conjuncts_index.cbegin(),
filter_conjuncts_index.cend(),
- std::inserter(_pushed_conjuncts_index,
_pushed_conjuncts_index.begin()));
- }
- return Status::OK();
-}
-
-template <PrimitiveType T>
-bool VOlapScanNode::normalize_is_null_predicate(Expr* expr, SlotDescriptor*
slot,
- const std::string& is_null_str,
- ColumnValueRange<T>* range) {
- if (expr->node_type() != TExprNodeType::SLOT_REF) {
- return false;
- }
-
- std::vector<SlotId> slot_ids;
- if (1 != expr->get_slot_ids(&slot_ids)) {
- return false;
- }
-
- if (slot_ids[0] != slot->id()) {
- return false;
- }
-
- auto temp_range =
ColumnValueRange<T>::create_empty_column_value_range(slot->type().precision,
-
slot->type().scale);
- temp_range.set_contain_null(is_null_str == "null");
- range->intersection(temp_range);
-
- return true;
-}
-
-template <PrimitiveType T>
-Status VOlapScanNode::normalize_noneq_binary_predicate(SlotDescriptor* slot,
- ColumnValueRange<T>*
range) {
- std::vector<uint32_t> filter_conjuncts_index;
-
- for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) {
- Expr* root_expr = _conjunct_ctxs[conj_idx]->root();
- if (TExprNodeType::BINARY_PRED != root_expr->node_type() ||
- FILTER_IN == to_olap_filter_type(root_expr->op(), false) ||
- FILTER_NOT_IN == to_olap_filter_type(root_expr->op(), false)) {
- if (TExprNodeType::FUNCTION_CALL == root_expr->node_type()) {
- std::string is_null_str;
- // 1. dispose the where pred "A is null" and "A is not null"
- if (root_expr->is_null_scalar_function(is_null_str) &&
- normalize_is_null_predicate(root_expr->get_child(0), slot,
is_null_str,
- range)) {
- // if column is key column should push down conjunct
storage engine
- if (is_key_column(slot->col_name())) {
- filter_conjuncts_index.emplace_back(conj_idx);
- }
- }
- }
- continue;
- }
-
- // 2. dispose the where pred "A <,<=" and "A >,>="
- Expr* pred = _conjunct_ctxs[conj_idx]->root();
- DCHECK(pred->get_num_children() == 2);
-
- for (int child_idx = 0; child_idx < 2; ++child_idx) {
- if (Expr::type_without_cast(pred->get_child(child_idx)) !=
TExprNodeType::SLOT_REF) {
- continue;
- }
- if (pred->get_child(child_idx)->type().type != slot->type().type) {
- if (!ignore_cast(slot, pred->get_child(child_idx))) {
- continue;
- }
- }
-
- std::vector<SlotId> slot_ids;
-
- if (1 == pred->get_child(child_idx)->get_slot_ids(&slot_ids)) {
- if (slot_ids[0] != slot->id()) {
- continue;
- }
-
- Expr* expr = pred->get_child(1 - child_idx);
-
- // for case: where col_a > col_b
- if (!expr->is_constant()) {
- continue;
- }
-
- void* value = _conjunct_ctxs[conj_idx]->get_value(
- expr, nullptr, slot->type().precision,
slot->type().scale);
- // for case: where col > null
- if (value == nullptr) {
- continue;
- }
-
- switch (slot->type().type) {
- case TYPE_DATE: {
- DateTimeValue date_value =
*reinterpret_cast<DateTimeValue*>(value);
- // NOTE: Datetime may be truncated to a date column, so we
call ++operator for date_value
- // for example: '2010-01-01 00:00:01' will be truncate to
'2010-01-01'
- if (date_value.check_loss_accuracy_cast_to_date()) {
- if (pred->op() == TExprOpcode::LT || pred->op() ==
TExprOpcode::GE) {
- ++date_value;
- }
- }
- range->add_range(to_olap_filter_type(pred->op(),
child_idx),
- *reinterpret_cast<typename
PrimitiveTypeTraits<T>::CppType*>(
- &date_value));
- break;
- }
- case TYPE_DATEV2: {
- DateV2Value<DateTimeV2ValueType> datetimev2_value =
-
*reinterpret_cast<DateV2Value<DateTimeV2ValueType>*>(value);
- doris::vectorized::DateV2Value<DateV2ValueType> date_v2;
-
date_v2.set_date_uint32(binary_cast<DateV2Value<DateTimeV2ValueType>, uint64_t>(
- datetimev2_value) >>
- TIME_PART_LENGTH);
- if
(!datetimev2_value.can_cast_to_date_without_loss_accuracy()) {
- if (pred->op() == TExprOpcode::LT || pred->op() ==
TExprOpcode::GE) {
- ++date_v2;
- }
- }
- if constexpr (T == PrimitiveType::TYPE_DATEV2) {
- range->add_range(to_olap_filter_type(pred->op(),
child_idx), date_v2);
- break;
- } else {
- __builtin_unreachable();
- }
- }
- case TYPE_TINYINT:
- case TYPE_DECIMALV2:
- case TYPE_DECIMAL32:
- case TYPE_DECIMAL64:
- case TYPE_DECIMAL128:
- case TYPE_CHAR:
- case TYPE_VARCHAR:
- case TYPE_HLL:
- case TYPE_DATETIME:
- case TYPE_DATETIMEV2:
- case TYPE_SMALLINT:
- case TYPE_INT:
- case TYPE_BIGINT:
- case TYPE_LARGEINT:
- case TYPE_BOOLEAN:
- case TYPE_STRING: {
- range->add_range(
- to_olap_filter_type(pred->op(), child_idx),
- *reinterpret_cast<typename
PrimitiveTypeTraits<T>::CppType*>(value));
- break;
- }
-
- default: {
- LOG(WARNING) << "Normalize filter fail, Unsupported
Primitive type. [type="
- << expr->type() << "]";
- return Status::InternalError(
- "Normalize filter fail, Unsupported Primitive
type");
- }
- }
-
- if (is_key_column(slot->col_name())) {
- filter_conjuncts_index.emplace_back(conj_idx);
- }
-
- VLOG_CRITICAL << slot->col_name() << " op: "
- <<
static_cast<int>(to_olap_filter_type(pred->op(), child_idx))
- << " value: "
- << *reinterpret_cast<typename
PrimitiveTypeTraits<T>::CppType*>(
- value);
- }
- }
- }
-
- std::copy(filter_conjuncts_index.cbegin(), filter_conjuncts_index.cend(),
- std::inserter(_pushed_conjuncts_index,
_pushed_conjuncts_index.begin()));
-
- return Status::OK();
-}
-
-Status VOlapScanNode::normalize_bloom_filter_predicate(SlotDescriptor* slot) {
- std::vector<uint32_t> filter_conjuncts_index;
-
- for (int conj_idx = _direct_conjunct_size; conj_idx <
_conjunct_ctxs.size(); ++conj_idx) {
- Expr* root_expr = _conjunct_ctxs[conj_idx]->root();
- if (TExprNodeType::BLOOM_PRED != root_expr->node_type()) continue;
-
- Expr* pred = _conjunct_ctxs[conj_idx]->root();
- DCHECK(pred->get_num_children() == 1);
-
- if (Expr::type_without_cast(pred->get_child(0)) !=
TExprNodeType::SLOT_REF) {
- continue;
- }
- if (pred->get_child(0)->type().type != slot->type().type) {
- if (!ignore_cast(slot, pred->get_child(0))) {
- continue;
- }
- }
-
- std::vector<SlotId> slot_ids;
-
- if (1 == pred->get_child(0)->get_slot_ids(&slot_ids)) {
- if (slot_ids[0] != slot->id()) {
- continue;
- }
- // only key column of bloom filter will push down to storage engine
- if (is_key_column(slot->col_name())) {
- filter_conjuncts_index.emplace_back(conj_idx);
- _bloom_filters_push_down.emplace_back(
- slot->col_name(),
-
(reinterpret_cast<BloomFilterPredicate*>(pred))->get_bloom_filter_func());
- }
- }
- }
-
- std::copy(filter_conjuncts_index.cbegin(), filter_conjuncts_index.cend(),
- std::inserter(_pushed_conjuncts_index,
_pushed_conjuncts_index.begin()));
-
- return Status::OK();
-}
-
Status VOlapScanNode::start_scan_thread(RuntimeState* state) {
if (_scan_ranges.empty()) {
_transfer_done = true;
@@ -1700,8 +1025,6 @@ Status VOlapScanNode::close(RuntimeState* state) {
DCHECK(runtime_filter != nullptr);
runtime_filter->consumer_close();
}
- // pushed functions close
- Expr::close(_pushed_func_conjunct_ctxs, state);
for (auto& ctx : _stale_vexpr_ctxs) {
(*ctx)->close(state);
@@ -2019,4 +1342,533 @@ Status VOlapScanNode::get_hints(TabletSharedPtr table,
const TPaloScanRange& sca
return Status::OK();
}
+template <bool IsNotIn>
+bool VOlapScanNode::_should_push_down_in_predicate(VInPredicate* pred,
VExprContext* expr_ctx) {
+ if (pred->is_not_in() != IsNotIn) {
+ return false;
+ }
+ InState* state = reinterpret_cast<InState*>(
+ expr_ctx->fn_context(pred->fn_context_index())
+ ->get_function_state(FunctionContext::FRAGMENT_LOCAL));
+ HybridSetBase* set = state->hybrid_set.get();
+
+ // if there are too many elements in InPredicate, exceed the limit,
+ // we will not push any condition of this column to storage engine.
+ // because too many conditions pushed down to storage engine may even
+ // slow down the query process.
+ // ATTN: This is just an experience value. You may need to try
+ // different thresholds to improve performance.
+ if (set->size() > _max_pushdown_conditions_per_column) {
+ VLOG_NOTICE << "Predicate value num " << set->size() << " exceed limit
"
+ << _max_pushdown_conditions_per_column;
+ return false;
+ }
+ return true;
+}
+
+bool VOlapScanNode::_should_push_down_function_filter(VectorizedFnCall*
fn_call,
+ VExprContext* expr_ctx,
+ std::string*
constant_str,
+
doris_udf::FunctionContext** fn_ctx) {
+ // Now only `like` function filters is supported to push down
+ if (fn_call->fn().name.function_name != "like") {
+ return false;
+ }
+
+ const auto& children = fn_call->children();
+ doris_udf::FunctionContext* func_cxt =
expr_ctx->fn_context(fn_call->fn_context_index());
+ DCHECK(func_cxt != nullptr);
+ DCHECK(children.size() == 2);
+ for (size_t i = 0; i < children.size(); i++) {
+ if (VExpr::expr_without_cast(children[i])->node_type() !=
TExprNodeType::SLOT_REF) {
+ // not a slot ref(column)
+ continue;
+ }
+ if (!children[1 - i]->is_constant()) {
+ // only handle constant value
+ return false;
+ } else {
+ DCHECK(children[1 - i]->type().is_string_type());
+ if (const ColumnConst* const_column =
check_and_get_column<ColumnConst>(
+ children[1 - i]->get_const_col(expr_ctx)->column_ptr))
{
+ *constant_str = const_column->get_data_at(0).to_string();
+ } else {
+ return false;
+ }
+ }
+ }
+ *fn_ctx = func_cxt;
+ return true;
+}
+
+bool VOlapScanNode::_should_push_down_binary_predicate(
+ VectorizedFnCall* fn_call, VExprContext* expr_ctx, StringRef*
constant_val,
+ int* slot_ref_child, const std::function<bool(const std::string&)>&
fn_checker) {
+ if (!fn_checker(fn_call->fn().name.function_name)) {
+ return false;
+ }
+
+ const auto& children = fn_call->children();
+ DCHECK(children.size() == 2);
+ for (size_t i = 0; i < children.size(); i++) {
+ if (VExpr::expr_without_cast(children[i])->node_type() !=
TExprNodeType::SLOT_REF) {
+ // not a slot ref(column)
+ continue;
+ }
+ if (!children[1 - i]->is_constant()) {
+ // only handle constant value
+ return false;
+ } else {
+ if (const ColumnConst* const_column =
check_and_get_column<ColumnConst>(
+ children[1 - i]->get_const_col(expr_ctx)->column_ptr))
{
+ *slot_ref_child = i;
+ *constant_val = const_column->get_data_at(0);
+ } else {
+ return false;
+ }
+ }
+ }
+ return true;
+}
+
+bool VOlapScanNode::_is_predicate_acting_on_slot(
+ VExpr* expr,
+ const std::function<bool(const std::vector<VExpr*>&, const VSlotRef**,
VExpr**)>& checker,
+ SlotDescriptor** slot_desc, ColumnValueRangeType** range) {
+ const VSlotRef* slot_ref = nullptr;
+ VExpr* child_contains_slot = nullptr;
+ if (!checker(expr->children(), &slot_ref, &child_contains_slot)) {
+ // not a slot ref(column)
+ return false;
+ }
+
+ auto entry = _id_to_slot_column_value_range.find(slot_ref->slot_id());
+ if (_id_to_slot_column_value_range.end() == entry) {
+ return false;
+ }
+ *slot_desc = entry->second.first;
+ DCHECK(child_contains_slot != nullptr);
+ if (child_contains_slot->type().type != (*slot_desc)->type().type) {
+ if (!ignore_cast(*slot_desc, child_contains_slot)) {
+ // the type of predicate not match the slot's type
+ return false;
+ }
+ }
+ *range = &(entry->second.second);
+ return true;
+}
+
+template <PrimitiveType T>
+Status VOlapScanNode::_normalize_in_and_eq_predicate(VExpr* expr,
VExprContext* expr_ctx,
+ SlotDescriptor* slot,
+ ColumnValueRange<T>&
range, bool* push_down) {
+ auto temp_range =
ColumnValueRange<T>::create_empty_column_value_range(slot->type().precision,
+
slot->type().scale);
+ bool effect = false;
+ // 1. Normalize in conjuncts like 'where col in (v1, v2, v3)'
+ if (TExprNodeType::IN_PRED == expr->node_type()) {
+ VInPredicate* pred = static_cast<VInPredicate*>(expr);
+ if (!_should_push_down_in_predicate<false>(pred, expr_ctx)) {
+ return Status::OK();
+ }
+
+ // begin to push InPredicate value into ColumnValueRange
+ InState* state = reinterpret_cast<InState*>(
+ expr_ctx->fn_context(pred->fn_context_index())
+ ->get_function_state(FunctionContext::FRAGMENT_LOCAL));
+ HybridSetBase::IteratorBase* iter = state->hybrid_set->begin();
+ auto fn_name = std::string("");
+ while (iter->has_next()) {
+ // column in (nullptr) is always false so continue to
+ // dispose next item
+ if (nullptr == iter->get_value()) {
+ iter->next();
+ continue;
+ }
+ auto value = const_cast<void*>(iter->get_value());
+ RETURN_IF_ERROR(change_value_range<true>(
+ temp_range, value,
ColumnValueRange<T>::add_fixed_value_range, fn_name));
+ iter->next();
+ }
+
+ range.intersection(temp_range);
+ effect = true;
+ } else if (TExprNodeType::BINARY_PRED == expr->node_type()) {
+ DCHECK(expr->children().size() == 2);
+ auto eq_checker = [](const std::string& fn_name) { return fn_name ==
"eq"; };
+
+ StringRef value;
+ int slot_ref_child = -1;
+ if
(_should_push_down_binary_predicate(reinterpret_cast<VectorizedFnCall*>(expr),
expr_ctx,
+ &value, &slot_ref_child,
eq_checker)) {
+ DCHECK(slot_ref_child >= 0);
+ // where A = nullptr should return empty result set
+ auto fn_name = std::string("");
+ if (value.data != nullptr) {
+ if constexpr (T == TYPE_CHAR || T == TYPE_VARCHAR || T ==
TYPE_STRING ||
+ T == TYPE_HLL) {
+ auto val = StringValue(value.data, value.size);
+ RETURN_IF_ERROR(change_value_range<true>(
+ temp_range, reinterpret_cast<void*>(&val),
+ ColumnValueRange<T>::add_fixed_value_range,
fn_name));
+ } else {
+ RETURN_IF_ERROR(change_value_range<true>(
+ temp_range,
reinterpret_cast<void*>(const_cast<char*>(value.data)),
+ ColumnValueRange<T>::add_fixed_value_range,
fn_name));
+ }
+ range.intersection(temp_range);
+ effect = true;
+ }
+ }
+ }
+
+ // exceed limit, no conditions will be pushed down to storage engine.
+ if (range.get_fixed_value_size() > _max_pushdown_conditions_per_column) {
+ range.set_whole_value_range();
+ } else {
+ *push_down = effect;
+ }
+ return Status::OK();
+}
+
+template <PrimitiveType T>
+Status VOlapScanNode::_normalize_not_in_and_not_eq_predicate(VExpr* expr,
VExprContext* expr_ctx,
+ SlotDescriptor*
slot,
+
ColumnValueRange<T>& range,
+ bool* push_down) {
+ bool is_fixed_range = range.is_fixed_value_range();
+ auto not_in_range =
ColumnValueRange<T>::create_empty_column_value_range(range.column_name());
+ bool effect = false;
+ // 1. Normalize in conjuncts like 'where col in (v1, v2, v3)'
+ if (TExprNodeType::IN_PRED == expr->node_type()) {
+ VInPredicate* pred = static_cast<VInPredicate*>(expr);
+ if (!_should_push_down_in_predicate<true>(pred, expr_ctx)) {
+ return Status::OK();
+ }
+
+ // begin to push InPredicate value into ColumnValueRange
+ InState* state = reinterpret_cast<InState*>(
+ expr_ctx->fn_context(pred->fn_context_index())
+ ->get_function_state(FunctionContext::FRAGMENT_LOCAL));
+ HybridSetBase::IteratorBase* iter = state->hybrid_set->begin();
+ auto fn_name = std::string("");
+ while (iter->has_next()) {
+ // column not in (nullptr) is always true
+ if (nullptr == iter->get_value()) {
+ continue;
+ }
+ auto value = const_cast<void*>(iter->get_value());
+ if (is_fixed_range) {
+ RETURN_IF_ERROR(change_value_range<true>(
+ range, value,
ColumnValueRange<T>::remove_fixed_value_range, fn_name));
+ } else {
+ RETURN_IF_ERROR(change_value_range<true>(
+ not_in_range, value,
ColumnValueRange<T>::add_fixed_value_range, fn_name));
+ }
+ iter->next();
+ }
+ effect = true;
+ } else if (TExprNodeType::BINARY_PRED == expr->node_type()) {
+ DCHECK(expr->children().size() == 2);
+
+ auto ne_checker = [](const std::string& fn_name) { return fn_name ==
"ne"; };
+ StringRef value;
+ int slot_ref_child = -1;
+ if
(_should_push_down_binary_predicate(reinterpret_cast<VectorizedFnCall*>(expr),
expr_ctx,
+ &value, &slot_ref_child,
ne_checker)) {
+ DCHECK(slot_ref_child >= 0);
+ // where A = nullptr should return empty result set
+ if (value.data != nullptr) {
+ auto fn_name = std::string("");
+ if constexpr (T == TYPE_CHAR || T == TYPE_VARCHAR || T ==
TYPE_STRING ||
+ T == TYPE_HLL) {
+ auto val = StringValue(value.data, value.size);
+ if (is_fixed_range) {
+ RETURN_IF_ERROR(change_value_range<true>(
+ range, reinterpret_cast<void*>(&val),
+ ColumnValueRange<T>::remove_fixed_value_range,
fn_name));
+ } else {
+ RETURN_IF_ERROR(change_value_range<true>(
+ not_in_range, reinterpret_cast<void*>(&val),
+ ColumnValueRange<T>::add_fixed_value_range,
fn_name));
+ }
+ } else {
+ if (is_fixed_range) {
+ RETURN_IF_ERROR(change_value_range<true>(
+ range,
reinterpret_cast<void*>(const_cast<char*>(value.data)),
+ ColumnValueRange<T>::remove_fixed_value_range,
fn_name));
+ } else {
+ RETURN_IF_ERROR(change_value_range<true>(
+ not_in_range,
+
reinterpret_cast<void*>(const_cast<char*>(value.data)),
+ ColumnValueRange<T>::add_fixed_value_range,
fn_name));
+ }
+ }
+ effect = true;
+ }
+ }
+ }
+
+ if (is_fixed_range ||
+ not_in_range.get_fixed_value_size() <=
_max_pushdown_conditions_per_column) {
+ if (!is_fixed_range) {
+ // push down not in condition to storage engine
+ not_in_range.to_in_condition(_olap_filter, false);
+ }
+ *push_down = effect;
+ }
+ return Status::OK();
+}
+
+template <PrimitiveType T>
+Status VOlapScanNode::_normalize_is_null_predicate(VExpr* expr, VExprContext*
expr_ctx,
+ SlotDescriptor* slot,
ColumnValueRange<T>& range,
+ bool* push_down) {
+ if (TExprNodeType::FUNCTION_CALL == expr->node_type()) {
+ if (reinterpret_cast<VectorizedFnCall*>(expr)->fn().name.function_name
== "is_null_pred") {
+ auto temp_range =
ColumnValueRange<T>::create_empty_column_value_range(
+ slot->type().precision, slot->type().scale);
+ temp_range.set_contain_null(true);
+ range.intersection(temp_range);
+ *push_down = true;
+ } else if
(reinterpret_cast<VectorizedFnCall*>(expr)->fn().name.function_name ==
+ "is_not_null_pred") {
+ auto temp_range =
ColumnValueRange<T>::create_empty_column_value_range(
+ slot->type().precision, slot->type().scale);
+ temp_range.set_contain_null(false);
+ range.intersection(temp_range);
+ *push_down = true;
+ }
+ }
+ return Status::OK();
+}
+
+template <PrimitiveType T>
+Status VOlapScanNode::_normalize_noneq_binary_predicate(VExpr* expr,
VExprContext* expr_ctx,
+ SlotDescriptor* slot,
+ ColumnValueRange<T>&
range,
+ bool* push_down) {
+ if (TExprNodeType::BINARY_PRED == expr->node_type()) {
+ DCHECK(expr->children().size() == 2);
+
+ auto noneq_checker = [](const std::string& fn_name) {
+ return fn_name != "ne" && fn_name != "eq";
+ };
+ StringRef value;
+ int slot_ref_child = -1;
+ if
(_should_push_down_binary_predicate(reinterpret_cast<VectorizedFnCall*>(expr),
expr_ctx,
+ &value, &slot_ref_child,
noneq_checker)) {
+ DCHECK(slot_ref_child >= 0);
+ const std::string& fn_name =
+
reinterpret_cast<VectorizedFnCall*>(expr)->fn().name.function_name;
+
+ // where A = nullptr should return empty result set
+ if (value.data != nullptr) {
+ *push_down = true;
+ if constexpr (T == TYPE_CHAR || T == TYPE_VARCHAR || T ==
TYPE_STRING ||
+ T == TYPE_HLL) {
+ auto val = StringValue(value.data, value.size);
+ RETURN_IF_ERROR(change_value_range<false>(range,
reinterpret_cast<void*>(&val),
+
ColumnValueRange<T>::add_value_range,
+ fn_name,
slot_ref_child));
+ } else {
+ RETURN_IF_ERROR(change_value_range<false>(
+ range,
reinterpret_cast<void*>(const_cast<char*>(value.data)),
+ ColumnValueRange<T>::add_value_range, fn_name,
slot_ref_child));
+ }
+ }
+ }
+ }
+ return Status::OK();
+}
+
+Status VOlapScanNode::_normalize_bloom_filter(VExpr* expr, VExprContext*
expr_ctx,
+ SlotDescriptor* slot, bool*
push_down) {
+ if (TExprNodeType::BLOOM_PRED == expr->node_type()) {
+ DCHECK(expr->children().size() == 1);
+ _bloom_filters_push_down.emplace_back(slot->col_name(),
expr->get_bloom_filter_func());
+ *push_down = true;
+ }
+ return Status::OK();
+}
+
+Status VOlapScanNode::_normalize_function_filters(VExpr* expr, VExprContext*
expr_ctx,
+ SlotDescriptor* slot, bool*
push_down) {
+ if (!config::enable_function_pushdown) {
+ return Status::OK();
+ }
+ bool opposite = false;
+ VExpr* fn_expr = expr;
+ if (TExprNodeType::COMPOUND_PRED == expr->node_type() &&
+ expr->fn().name.function_name == "not") {
+ fn_expr = fn_expr->children()[0];
+ opposite = true;
+ }
+
+ if (TExprNodeType::FUNCTION_CALL == fn_expr->node_type()) {
+ doris_udf::FunctionContext* fn_ctx = nullptr;
+ std::string str;
+ if
(_should_push_down_function_filter(reinterpret_cast<VectorizedFnCall*>(fn_expr),
+ expr_ctx, &str, &fn_ctx)) {
+ std::string col = slot->col_name();
+ StringVal val(reinterpret_cast<uint8_t*>(str.data()), str.size());
+ _push_down_functions.emplace_back(opposite, col, fn_ctx, val);
+ *push_down = true;
+ }
+ }
+ return Status::OK();
+}
+
+void VOlapScanNode::eval_const_conjuncts(VExpr* vexpr, VExprContext* expr_ctx,
bool* push_down) {
+ char* constant_val = nullptr;
+ if (vexpr->is_constant()) {
+ if (const ColumnConst* const_column =
+
check_and_get_column<ColumnConst>(vexpr->get_const_col(expr_ctx)->column_ptr)) {
+ constant_val =
const_cast<char*>(const_column->get_data_at(0).data);
+ if (constant_val == nullptr ||
*reinterpret_cast<bool*>(constant_val) == false) {
+ *push_down = true;
+ _eos = true;
+ }
+ } else {
+ LOG(WARNING) << "Expr[" << vexpr->debug_string()
+ << "] is a constant but doesn't contain a const
column!";
+ }
+ }
+}
+
+VExpr* VOlapScanNode::_normalize_predicate(RuntimeState* state, VExpr*
conjunct_expr_root) {
+ static constexpr auto is_leaf = [](VExpr* expr) { return
!expr->is_and_expr(); };
+ auto in_predicate_checker = [](const std::vector<VExpr*>& children, const
VSlotRef** slot,
+ VExpr** child_contains_slot) {
+ if (children.empty() ||
+ VExpr::expr_without_cast(children[0])->node_type() !=
TExprNodeType::SLOT_REF) {
+ // not a slot ref(column)
+ return false;
+ }
+ *slot = reinterpret_cast<const
VSlotRef*>(VExpr::expr_without_cast(children[0]));
+ *child_contains_slot = children[0];
+ return true;
+ };
+ auto eq_predicate_checker = [](const std::vector<VExpr*>& children, const
VSlotRef** slot,
+ VExpr** child_contains_slot) {
+ for (const VExpr* child : children) {
+ if (VExpr::expr_without_cast(child)->node_type() !=
TExprNodeType::SLOT_REF) {
+ // not a slot ref(column)
+ continue;
+ }
+ *slot = reinterpret_cast<const
VSlotRef*>(VExpr::expr_without_cast(child));
+ *child_contains_slot = const_cast<VExpr*>(child);
+ return true;
+ }
+ return false;
+ };
+
+ if (conjunct_expr_root != nullptr) {
+ if (is_leaf(conjunct_expr_root)) {
+ auto impl = conjunct_expr_root->get_impl();
+ VExpr* cur_expr = impl ? const_cast<VExpr*>(impl) :
conjunct_expr_root;
+ SlotDescriptor* slot;
+ ColumnValueRangeType* range = nullptr;
+ bool push_down = false;
+ eval_const_conjuncts(cur_expr, *(_vconjunct_ctx_ptr.get()),
&push_down);
+ if (!push_down &&
+ (_is_predicate_acting_on_slot(cur_expr, in_predicate_checker,
&slot, &range) ||
+ _is_predicate_acting_on_slot(cur_expr, eq_predicate_checker,
&slot, &range))) {
+ std::visit(
+ [&](auto& value_range) {
+ RETURN_IF_PUSH_DOWN(_normalize_in_and_eq_predicate(
+ cur_expr, *(_vconjunct_ctx_ptr.get()),
slot, value_range,
+ &push_down));
+
RETURN_IF_PUSH_DOWN(_normalize_not_in_and_not_eq_predicate(
+ cur_expr, *(_vconjunct_ctx_ptr.get()),
slot, value_range,
+ &push_down));
+ RETURN_IF_PUSH_DOWN(_normalize_is_null_predicate(
+ cur_expr, *(_vconjunct_ctx_ptr.get()),
slot, value_range,
+ &push_down));
+
RETURN_IF_PUSH_DOWN(_normalize_noneq_binary_predicate(
+ cur_expr, *(_vconjunct_ctx_ptr.get()),
slot, value_range,
+ &push_down));
+ if (is_key_column(slot->col_name())) {
+ RETURN_IF_PUSH_DOWN(_normalize_bloom_filter(
+ cur_expr, *(_vconjunct_ctx_ptr.get()),
slot, &push_down));
+
RETURN_IF_PUSH_DOWN(_normalize_function_filters(
+ cur_expr, *(_vconjunct_ctx_ptr.get()),
slot, &push_down));
+ }
+ },
+ *range);
+ }
+ if (push_down && is_key_column(slot->col_name())) {
+ return nullptr;
+ } else {
+ return conjunct_expr_root;
+ }
+ } else {
+ VExpr* left_child = _normalize_predicate(state,
conjunct_expr_root->children()[0]);
+ VExpr* right_child = _normalize_predicate(state,
conjunct_expr_root->children()[1]);
+
+ if (left_child != nullptr && right_child != nullptr) {
+ conjunct_expr_root->set_children({left_child, right_child});
+ return conjunct_expr_root;
+ } else {
+ // here only close the and expr self, do not close the child
+ conjunct_expr_root->set_children({});
+ conjunct_expr_root->close(state, *_vconjunct_ctx_ptr,
+
(*_vconjunct_ctx_ptr)->get_function_state_scope());
+ }
+
+ // here do not close Expr* now
+ return left_child != nullptr ? left_child : right_child;
+ }
+ }
+ return conjunct_expr_root;
+}
+
+Status VOlapScanNode::_append_rf_into_conjuncts(RuntimeState* state,
std::vector<VExpr*>& vexprs) {
+ if (!vexprs.empty()) {
+ auto last_expr = _vconjunct_ctx_ptr ? (*_vconjunct_ctx_ptr)->root() :
vexprs[0];
+ for (size_t j = _vconjunct_ctx_ptr ? 0 : 1; j < vexprs.size(); j++) {
+ if (_rf_vexpr_set.find(vexprs[j]) != _rf_vexpr_set.end()) {
+ continue;
+ }
+ TFunction fn;
+ TFunctionName fn_name;
+ fn_name.__set_db_name("");
+ fn_name.__set_function_name("and");
+ fn.__set_name(fn_name);
+ fn.__set_binary_type(TFunctionBinaryType::BUILTIN);
+ std::vector<TTypeDesc> arg_types;
+ arg_types.push_back(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+ arg_types.push_back(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+ fn.__set_arg_types(arg_types);
+ fn.__set_ret_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+ fn.__set_has_var_args(false);
+
+ TExprNode texpr_node;
+
texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+ texpr_node.__set_node_type(TExprNodeType::COMPOUND_PRED);
+ texpr_node.__set_opcode(TExprOpcode::COMPOUND_AND);
+ texpr_node.__set_fn(fn);
+ texpr_node.__set_is_nullable(last_expr->is_nullable() ||
vexprs[j]->is_nullable());
+ VExpr* new_node = _pool->add(new VcompoundPred(texpr_node));
+ new_node->add_child(last_expr);
+ DCHECK((vexprs[j])->get_impl() != nullptr);
+ new_node->add_child(vexprs[j]);
+ last_expr = new_node;
+ _rf_vexpr_set.insert(vexprs[j]);
+ }
+ auto new_vconjunct_ctx_ptr = _pool->add(new VExprContext(last_expr));
+ if (_vconjunct_ctx_ptr) {
+ (*_vconjunct_ctx_ptr)->clone_fn_contexts(new_vconjunct_ctx_ptr);
+ }
+ RETURN_IF_ERROR(new_vconjunct_ctx_ptr->prepare(state, row_desc()));
+ RETURN_IF_ERROR(new_vconjunct_ctx_ptr->open(state));
+ if (_vconjunct_ctx_ptr) {
+ (*(_vconjunct_ctx_ptr.get()))->mark_as_stale();
+ _stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr));
+ }
+ _vconjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*);
+ *(_vconjunct_ctx_ptr.get()) = new_vconjunct_ctx_ptr;
+ }
+ return Status::OK();
+}
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/volap_scan_node.h
b/be/src/vec/exec/volap_scan_node.h
index 4ead17f9b3..e047f34c10 100644
--- a/be/src/vec/exec/volap_scan_node.h
+++ b/be/src/vec/exec/volap_scan_node.h
@@ -26,6 +26,9 @@
#include "gen_cpp/PlanNodes_types.h"
#include "olap/tablet.h"
#include "util/progress_updater.h"
+#include "vec/exprs/vectorized_fn_call.h"
+#include "vec/exprs/vin_predicate.h"
+#include "vec/exprs/vslot_ref.h"
namespace doris {
class ObjectPool;
@@ -63,40 +66,15 @@ private:
// In order to ensure the accuracy of the query result
// only key column conjuncts will be remove as idle conjunct
bool is_key_column(const std::string& key_name);
- void remove_pushed_conjuncts(RuntimeState* state);
Status start_scan(RuntimeState* state);
- void eval_const_conjuncts();
Status normalize_conjuncts();
Status build_key_ranges_and_filters();
- Status build_function_filters();
- template <PrimitiveType T>
- Status normalize_predicate(ColumnValueRange<T>& range, SlotDescriptor*
slot);
-
- template <PrimitiveType T>
- Status normalize_in_and_eq_predicate(SlotDescriptor* slot,
ColumnValueRange<T>* range);
-
- template <PrimitiveType T>
- Status normalize_not_in_and_not_eq_predicate(SlotDescriptor* slot,
ColumnValueRange<T>* range);
-
- template <PrimitiveType T>
- Status normalize_noneq_binary_predicate(SlotDescriptor* slot,
ColumnValueRange<T>* range);
-
- Status normalize_bloom_filter_predicate(SlotDescriptor* slot);
-
- template <PrimitiveType T>
- static bool normalize_is_null_predicate(Expr* expr, SlotDescriptor* slot,
- const std::string& is_null_str,
- ColumnValueRange<T>* range);
- bool should_push_down_in_predicate(SlotDescriptor* slot, InPredicate*
in_pred);
-
- template <PrimitiveType T, typename ChangeFixedValueRangeFunc>
- static Status change_fixed_value_range(ColumnValueRange<T>& range, void*
value,
- const ChangeFixedValueRangeFunc&
func);
-
- std::pair<bool, void*> should_push_down_eq_predicate(SlotDescriptor* slot,
Expr* pred,
- int conj_idx, int
child_idx);
+ template <bool IsFixed, PrimitiveType PrimitiveType, typename
ChangeFixedValueRangeFunc>
+ static Status change_value_range(ColumnValueRange<PrimitiveType>& range,
void* value,
+ const ChangeFixedValueRangeFunc& func,
+ const std::string& fn_name, int
slot_ref_child = -1);
void transfer_thread(RuntimeState* state);
void scanner_thread(VOlapScanner* scanner);
@@ -114,6 +92,54 @@ private:
return _runtime_filter_descs;
}
+ template <PrimitiveType T>
+ Status _normalize_in_and_eq_predicate(vectorized::VExpr* expr,
VExprContext* expr_ctx,
+ SlotDescriptor* slot,
ColumnValueRange<T>& range,
+ bool* push_down);
+ template <PrimitiveType T>
+ Status _normalize_not_in_and_not_eq_predicate(vectorized::VExpr* expr,
VExprContext* expr_ctx,
+ SlotDescriptor* slot,
ColumnValueRange<T>& range,
+ bool* push_down);
+
+ template <PrimitiveType T>
+ Status _normalize_noneq_binary_predicate(vectorized::VExpr* expr,
VExprContext* expr_ctx,
+ SlotDescriptor* slot,
ColumnValueRange<T>& range,
+ bool* push_down);
+
+ template <PrimitiveType T>
+ Status _normalize_is_null_predicate(vectorized::VExpr* expr, VExprContext*
expr_ctx,
+ SlotDescriptor* slot,
ColumnValueRange<T>& range,
+ bool* push_down);
+
+ Status _normalize_bloom_filter(vectorized::VExpr* expr, VExprContext*
expr_ctx,
+ SlotDescriptor* slot, bool* push_down);
+
+ void eval_const_conjuncts(VExpr* vexpr, VExprContext* expr_ctx, bool*
push_down);
+
+ vectorized::VExpr* _normalize_predicate(RuntimeState* state,
+ vectorized::VExpr*
conjunct_expr_root);
+
+ Status _normalize_function_filters(vectorized::VExpr* expr, VExprContext*
expr_ctx,
+ SlotDescriptor* slot, bool* push_down);
+
+ template <bool IsNotIn>
+ bool _should_push_down_in_predicate(VInPredicate* in_pred, VExprContext*
expr_ctx);
+
+ bool _is_predicate_acting_on_slot(VExpr* expr,
+ const std::function<bool(const
std::vector<VExpr*>&,
+ const
VSlotRef**, VExpr**)>& checker,
+ SlotDescriptor** slot_desc,
ColumnValueRangeType** range);
+
+ bool _should_push_down_binary_predicate(
+ VectorizedFnCall* fn_call, VExprContext* expr_ctx, StringRef*
constant_val,
+ int* slot_ref_child, const std::function<bool(const
std::string&)>& fn_checker);
+
+ bool _should_push_down_function_filter(VectorizedFnCall* fn_call,
VExprContext* expr_ctx,
+ std::string* constant_str,
+ doris_udf::FunctionContext**
fn_ctx);
+
+ Status _append_rf_into_conjuncts(RuntimeState* state, std::vector<VExpr*>&
vexprs);
+
// Tuple id resolved in prepare() to set _tuple_desc;
TupleId _tuple_id;
// doris scan node used to scan doris
@@ -124,9 +150,6 @@ private:
int _tuple_idx;
// string slots
std::vector<SlotDescriptor*> _string_slots;
- // conjunct's index which already be push down storage engine
- // should be remove in olap_scan_node, no need check this conjunct again
- std::set<uint32_t> _pushed_conjuncts_index;
// collection slots
std::vector<SlotDescriptor*> _collection_slots;
@@ -149,11 +172,6 @@ private:
// push down functions to storage engine
// only support scalar functions, now just support like / not like
std::vector<FunctionFilter> _push_down_functions;
- // functions conjunct's index which already be push down storage engine
- std::set<uint32_t> _pushed_func_conjuncts_index;
- // need keep these conjunct to the end of scan node,
- // since some memory referenced by pushed function filters
- std::vector<ExprContext*> _pushed_func_conjunct_ctxs;
// Pool for storing allocated scanner objects. We don't want to use the
// runtime pool to ensure that the scanner objects are deleted before this
@@ -183,7 +201,6 @@ private:
// Used in Scan thread to ensure thread-safe
std::atomic_bool _scanner_done;
std::atomic_bool _transfer_done;
- size_t _direct_conjunct_size;
int _total_assign_num;
int _nice;
@@ -321,6 +338,8 @@ private:
phmap::flat_hash_set<VExpr*> _rf_vexpr_set;
std::vector<std::unique_ptr<VExprContext*>> _stale_vexpr_ctxs;
+ phmap::flat_hash_map<int, std::pair<SlotDescriptor*, ColumnValueRangeType>>
+ _id_to_slot_column_value_range;
};
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/exec/volap_scanner.cpp b/be/src/vec/exec/volap_scanner.cpp
index c032aee301..6ce3182c1c 100644
--- a/be/src/vec/exec/volap_scanner.cpp
+++ b/be/src/vec/exec/volap_scanner.cpp
@@ -122,10 +122,6 @@ Status VOlapScanner::open() {
SCOPED_TIMER(_parent->_reader_init_timer);
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
- if (_conjunct_ctxs.size() > _parent->_direct_conjunct_size) {
- _use_pushdown_conjuncts = true;
- }
-
_runtime_filter_marks.resize(_parent->runtime_filter_descs().size(),
false);
auto res = _tablet_reader->init(_tablet_reader_params);
diff --git a/be/src/vec/exec/volap_scanner.h b/be/src/vec/exec/volap_scanner.h
index 1ea8639c6c..7584eb47c9 100644
--- a/be/src/vec/exec/volap_scanner.h
+++ b/be/src/vec/exec/volap_scanner.h
@@ -83,10 +83,6 @@ public:
int64_t update_wait_worker_timer() const { return _watcher.elapsed_time();
}
- void set_use_pushdown_conjuncts(bool has_pushdown_conjuncts) {
- _use_pushdown_conjuncts = has_pushdown_conjuncts;
- }
-
std::vector<bool>* mutable_runtime_filter_marks() { return
&_runtime_filter_marks; }
TabletStorageType get_storage_type();
@@ -117,7 +113,6 @@ private:
bool _aggregation;
bool _need_agg_finalize = true;
bool _has_update_counter = false;
- bool _use_pushdown_conjuncts = false;
TabletReader::ReaderParams _tablet_reader_params;
std::unique_ptr<TabletReader> _tablet_reader;
diff --git a/be/src/vec/exprs/vbloom_predicate.cpp b/be/src/vec/exprs/vbloom_predicate.cpp
index 6f73b2c69d..c708efdda0 100644
--- a/be/src/vec/exprs/vbloom_predicate.cpp
+++ b/be/src/vec/exprs/vbloom_predicate.cpp
@@ -90,7 +90,7 @@ Status VBloomPredicate::execute(VExprContext* context, Block* block, int* result
const std::string& VBloomPredicate::expr_name() const {
return _expr_name;
}
-void VBloomPredicate::set_filter(std::unique_ptr<IBloomFilterFuncBase>&
filter) {
- _filter.reset(filter.release());
+void VBloomPredicate::set_filter(std::shared_ptr<IBloomFilterFuncBase>&
filter) {
+ _filter = filter;
}
} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exprs/vbloom_predicate.h b/be/src/vec/exprs/vbloom_predicate.h
index f715b9e40e..0f00d6ca55 100644
--- a/be/src/vec/exprs/vbloom_predicate.h
+++ b/be/src/vec/exprs/vbloom_predicate.h
@@ -37,7 +37,9 @@ public:
return pool->add(new VBloomPredicate(*this));
}
const std::string& expr_name() const override;
- void set_filter(std::unique_ptr<IBloomFilterFuncBase>& filter);
+ void set_filter(std::shared_ptr<IBloomFilterFuncBase>& filter);
+
+ std::shared_ptr<IBloomFilterFuncBase> get_bloom_filter_func() const
override { return _filter; }
private:
std::shared_ptr<IBloomFilterFuncBase> _filter;
diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp
index 909feb324f..d493a87a65 100644
--- a/be/src/vec/exprs/vexpr.cpp
+++ b/be/src/vec/exprs/vexpr.cpp
@@ -354,7 +354,7 @@ Status VExpr::init_function_context(VExprContext* context,
void VExpr::close_function_context(VExprContext* context,
FunctionContext::FunctionStateScope scope,
const FunctionBasePtr& function) const {
- if (_fn_context_index != -1) {
+ if (_fn_context_index != -1 && !context->_stale) {
FunctionContext* fn_ctx = context->fn_context(_fn_context_index);
function->close(fn_ctx, FunctionContext::THREAD_LOCAL);
if (scope == FunctionContext::FRAGMENT_LOCAL) {
diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h
index b6921753e8..9d2d4ce64b 100644
--- a/be/src/vec/exprs/vexpr.h
+++ b/be/src/vec/exprs/vexpr.h
@@ -21,6 +21,7 @@
#include <vector>
#include "common/status.h"
+#include "exprs/bloomfilter_predicate.h"
#include "gen_cpp/Exprs_types.h"
#include "runtime/types.h"
#include "udf/udf_internal.h"
@@ -137,6 +138,25 @@ public:
/// expr.
virtual ColumnPtrWrapper* get_const_col(VExprContext* context);
+ int fn_context_index() { return _fn_context_index; };
+
+ static const VExpr* expr_without_cast(const VExpr* expr) {
+ if (expr->node_type() == doris::TExprNodeType::CAST_EXPR) {
+ return expr_without_cast(expr->_children[0]);
+ }
+ return expr;
+ }
+
+ // If this expr is a RuntimeFilterWrapper, this method will return an
underlying rf expression
+ virtual const VExpr* get_impl() const { return nullptr; }
+
+ // If this expr is a BloomPredicate, this method will return a
BloomFilterFunc
+ virtual std::shared_ptr<IBloomFilterFuncBase> get_bloom_filter_func()
const {
+ LOG(FATAL) << "Method 'get_bloom_filter_func()' is not supported in
expression: "
+ << this->debug_string();
+ return nullptr;
+ }
+
protected:
/// Simple debug string that provides no expr subclass-specific information
std::string debug_string(const std::string& expr_name) const {
diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp
index dfb4223741..da4a3a96eb 100644
--- a/be/src/vec/exprs/vexpr_context.cpp
+++ b/be/src/vec/exprs/vexpr_context.cpp
@@ -28,7 +28,8 @@ VExprContext::VExprContext(VExpr* expr)
_prepared(false),
_opened(false),
_closed(false),
- _last_result_column_id(-1) {}
+ _last_result_column_id(-1),
+ _stale(false) {}
VExprContext::~VExprContext() {
DCHECK(!_prepared || _closed);
@@ -98,6 +99,12 @@ doris::Status VExprContext::clone(RuntimeState* state, VExprContext** new_ctx) {
return _root->open(state, *new_ctx, FunctionContext::THREAD_LOCAL);
}
+void VExprContext::clone_fn_contexts(VExprContext* other) {
+ for (auto& _fn_context : _fn_contexts) {
+
other->_fn_contexts.push_back(_fn_context->impl()->clone(other->_pool.get()));
+ }
+}
+
int VExprContext::register_func(RuntimeState* state, const
FunctionContext::TypeDesc& return_type,
const std::vector<FunctionContext::TypeDesc>&
arg_types,
int varargs_buffer_size) {
diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h
index fb31925136..b454833ed3 100644
--- a/be/src/vec/exprs/vexpr_context.h
+++ b/be/src/vec/exprs/vexpr_context.h
@@ -69,6 +69,13 @@ public:
return _is_clone ? FunctionContext::THREAD_LOCAL :
FunctionContext::FRAGMENT_LOCAL;
}
+ void clone_fn_contexts(VExprContext* other);
+
+ void mark_as_stale() {
+ DCHECK(!_stale);
+ _stale = true;
+ }
+
private:
friend class VExpr;
@@ -91,5 +98,7 @@ private:
std::unique_ptr<MemPool> _pool;
int _last_result_column_id;
+
+ bool _stale;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vin_predicate.h b/be/src/vec/exprs/vin_predicate.h
index 64cc9acefa..6b2264d40a 100644
--- a/be/src/vec/exprs/vin_predicate.h
+++ b/be/src/vec/exprs/vin_predicate.h
@@ -41,6 +41,10 @@ public:
virtual std::string debug_string() const override;
+ const FunctionBasePtr function() { return _function; };
+
+ const bool is_not_in() { return _is_not_in; };
+
private:
FunctionBasePtr _function;
std::string _expr_name;
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.h b/be/src/vec/exprs/vruntimefilter_wrapper.h
index 755d243fd8..91a6bdbcac 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.h
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.h
@@ -44,6 +44,8 @@ public:
return _impl->get_const_col(context);
}
+ const VExpr* get_impl() const override { return _impl; }
+
private:
VExpr* _impl;
diff --git a/be/src/vec/exprs/vslot_ref.h b/be/src/vec/exprs/vslot_ref.h
index 1bc78a4c5c..0f8540fc9e 100644
--- a/be/src/vec/exprs/vslot_ref.h
+++ b/be/src/vec/exprs/vslot_ref.h
@@ -41,6 +41,8 @@ public:
const int column_id() const { return _column_id; }
+ const int slot_id() const { return _slot_id; }
+
private:
FunctionPtr _function;
int _slot_id;
diff --git a/be/src/vec/functions/in.cpp b/be/src/vec/functions/in.cpp
index 4e5877a6ec..b0bc1d137f 100644
--- a/be/src/vec/functions/in.cpp
+++ b/be/src/vec/functions/in.cpp
@@ -16,164 +16,12 @@
// under the License.
// This file is copied from
-#include <fmt/format.h>
+#include <vec/functions/in.h>
-#include "exprs/create_predicate_function.h"
-#include "vec/columns/column_const.h"
-#include "vec/columns/column_nullable.h"
-#include "vec/columns/column_set.h"
-#include "vec/columns/columns_number.h"
-#include "vec/data_types/data_type_nullable.h"
-#include "vec/data_types/data_type_number.h"
-#include "vec/functions/function.h"
#include "vec/functions/simple_function_factory.h"
namespace doris::vectorized {
-struct InState {
- bool use_set = true;
-
- // only use in null in set
- bool null_in_set = false;
- std::unique_ptr<HybridSetBase> hybrid_set;
-};
-
-template <bool negative>
-class FunctionIn : public IFunction {
-public:
- static constexpr auto name = negative ? "not_in" : "in";
-
- static FunctionPtr create() { return std::make_shared<FunctionIn>(); }
-
- String get_name() const override { return name; }
-
- bool is_variadic() const override { return true; }
-
- size_t get_number_of_arguments() const override { return 0; }
-
- DataTypePtr get_return_type_impl(const DataTypes& args) const override {
- for (const auto& arg : args) {
- if (arg->is_nullable()) return
make_nullable(std::make_shared<DataTypeUInt8>());
- }
- return std::make_shared<DataTypeUInt8>();
- }
-
- bool use_default_implementation_for_nulls() const override { return false;
}
-
- Status prepare(FunctionContext* context,
FunctionContext::FunctionStateScope scope) override {
- if (scope == FunctionContext::THREAD_LOCAL) {
- return Status::OK();
- }
- auto* state = new InState();
- context->set_function_state(scope, state);
- state->hybrid_set.reset(
-
vec_create_set(convert_type_to_primitive(context->get_arg_type(0)->type)));
-
- DCHECK(context->get_num_args() >= 1);
- for (int i = 1; i < context->get_num_args(); ++i) {
- const auto& const_column_ptr = context->get_constant_col(i);
- if (const_column_ptr != nullptr) {
- auto const_data = const_column_ptr->column_ptr->get_data_at(0);
- if (const_data.data == nullptr) {
- state->null_in_set = true;
- } else {
- state->hybrid_set->insert((void*)const_data.data,
const_data.size);
- }
- } else {
- state->use_set = false;
- break;
- }
- }
- return Status::OK();
- }
-
- Status execute_impl(FunctionContext* context, Block& block, const
ColumnNumbers& arguments,
- size_t result, size_t input_rows_count) override {
- auto in_state = reinterpret_cast<InState*>(
- context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
- if (!in_state) {
- return Status::RuntimeError("funciton context for function '{}'
must have Set;",
- get_name());
- }
- auto res = ColumnUInt8::create();
- ColumnUInt8::Container& vec_res = res->get_data();
- vec_res.resize(input_rows_count);
-
- ColumnUInt8::MutablePtr col_null_map_to;
- col_null_map_to = ColumnUInt8::create(input_rows_count);
- auto& vec_null_map_to = col_null_map_to->get_data();
-
- /// First argument may be a single column.
- const ColumnWithTypeAndName& left_arg =
block.get_by_position(arguments[0]);
- auto materialized_column =
left_arg.column->convert_to_full_column_if_const();
-
- if (in_state->use_set) {
- for (size_t i = 0; i < input_rows_count; ++i) {
- const auto& ref_data = materialized_column->get_data_at(i);
- if (ref_data.data) {
- vec_res[i] = negative ^
-
in_state->hybrid_set->find((void*)ref_data.data, ref_data.size);
- if (in_state->null_in_set) {
- vec_null_map_to[i] = negative == vec_res[i];
- } else {
- vec_null_map_to[i] = false;
- }
- } else {
- vec_null_map_to[i] = true;
- }
- }
- } else {
- std::vector<ColumnPtr> set_columns;
- for (int i = 1; i < arguments.size(); ++i) {
-
set_columns.emplace_back(block.get_by_position(arguments[i]).column);
- }
-
- for (size_t i = 0; i < input_rows_count; ++i) {
- const auto& ref_data = materialized_column->get_data_at(i);
- if (ref_data.data == nullptr) {
- vec_null_map_to[i] = true;
- continue;
- }
-
- std::unique_ptr<HybridSetBase> hybrid_set(
-
vec_create_set(convert_type_to_primitive(context->get_arg_type(0)->type)));
- bool null_in_set = false;
-
- for (const auto& set_column : set_columns) {
- auto set_data = set_column->get_data_at(i);
- if (set_data.data == nullptr)
- null_in_set = true;
- else
- hybrid_set->insert((void*)(set_data.data),
set_data.size);
- }
- vec_res[i] = negative ^ hybrid_set->find((void*)ref_data.data,
ref_data.size);
- if (null_in_set) {
- vec_null_map_to[i] = negative == vec_res[i];
- } else {
- vec_null_map_to[i] = false;
- }
- }
- }
-
- if (block.get_by_position(result).type->is_nullable()) {
- block.replace_by_position(
- result, ColumnNullable::create(std::move(res),
std::move(col_null_map_to)));
- } else {
- block.replace_by_position(result, std::move(res));
- }
-
- return Status::OK();
- }
-
- Status close(FunctionContext* context, FunctionContext::FunctionStateScope
scope) override {
- if (scope == FunctionContext::FRAGMENT_LOCAL) {
- delete reinterpret_cast<InState*>(
-
context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
- }
- return Status::OK();
- }
-};
-
void register_function_in(SimpleFunctionFactory& factory) {
factory.register_function<FunctionIn<false>>();
factory.register_function<FunctionIn<true>>();
diff --git a/be/src/vec/functions/in.cpp b/be/src/vec/functions/in.h
similarity index 96%
copy from be/src/vec/functions/in.cpp
copy to be/src/vec/functions/in.h
index 4e5877a6ec..d10661fa48 100644
--- a/be/src/vec/functions/in.cpp
+++ b/be/src/vec/functions/in.h
@@ -16,6 +16,8 @@
// under the License.
// This file is copied from
+#pragma once
+
#include <fmt/format.h>
#include "exprs/create_predicate_function.h"
@@ -26,7 +28,6 @@
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_number.h"
#include "vec/functions/function.h"
-#include "vec/functions/simple_function_factory.h"
namespace doris::vectorized {
@@ -174,9 +175,4 @@ public:
}
};
-void register_function_in(SimpleFunctionFactory& factory) {
- factory.register_function<FunctionIn<false>>();
- factory.register_function<FunctionIn<true>>();
-}
-
} // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]