This is an automated email from the ASF dual-hosted git repository.
jianliangqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d545eb3865c [fix](inverted index) fixed in_list condition not indexed on pipelinex (#36565)
d545eb3865c is described below
commit d545eb3865c77d16304c7a0d56003f487d33a5f3
Author: zzzxl <[email protected]>
AuthorDate: Fri Jun 21 10:14:00 2024 +0800
[fix](inverted index) fixed in_list condition not indexed on pipelinex (#36565)
---
be/src/exec/olap_utils.h | 4 +-
be/src/olap/rowset/segment_v2/segment_iterator.cpp | 9 +++
be/src/pipeline/exec/scan_operator.cpp | 93 +++++++++++++++-------
be/src/pipeline/exec/scan_operator.h | 23 ++++--
.../test_index_inlist_fault_injection.out | 19 +++++
.../test_index_inlist_fault_injection.groovy | 93 ++++++++++++++++++++++
6 files changed, 203 insertions(+), 38 deletions(-)
diff --git a/be/src/exec/olap_utils.h b/be/src/exec/olap_utils.h
index d1a1be81f5d..ddf8562fea1 100644
--- a/be/src/exec/olap_utils.h
+++ b/be/src/exec/olap_utils.h
@@ -117,9 +117,9 @@ inline SQLFilterOp to_olap_filter_type(const std::string& function_name, bool op
return opposite ? FILTER_NOT_IN : FILTER_IN;
} else if (function_name == "ne") {
return opposite ? FILTER_IN : FILTER_NOT_IN;
- } else if (function_name == "in_list") {
+ } else if (function_name == "in") {
return opposite ? FILTER_NOT_IN : FILTER_IN;
- } else if (function_name == "not_in_list") {
+ } else if (function_name == "not_in") {
return opposite ? FILTER_IN : FILTER_NOT_IN;
} else {
DCHECK(false) << "Function Name: " << function_name;
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 37df15d6939..f0c3f8f4920 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -2403,6 +2403,15 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
return Status::EndOfFile("no more data in segment");
}
+ DBUG_EXECUTE_IF("segment_iterator._rowid_result_for_index", {
+ for (auto& iter : _rowid_result_for_index) {
+ if (iter.second.first) {
+ return Status::Error<ErrorCode::INTERNAL_ERROR>(
+ "_rowid_result_for_index exists true");
+ }
+ }
+ })
+
if (!_is_need_vec_eval && !_is_need_short_eval && !_is_need_expr_eval) {
if (_non_predicate_columns.empty()) {
return Status::InternalError("_non_predicate_columns is empty");
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index 161a79fb7c1..21f87c68d5d 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -994,8 +994,10 @@ void ScanLocalState<Derived>::_normalize_compound_predicate(
auto compound_fn_name = expr->fn().name.function_name;
auto children_num = expr->children().size();
for (auto i = 0; i < children_num; ++i) {
- auto child_expr = expr->children()[i].get();
- if (TExprNodeType::BINARY_PRED == child_expr->node_type()) {
+ auto* child_expr = expr->children()[i].get();
+ if (TExprNodeType::BINARY_PRED == child_expr->node_type() ||
+ TExprNodeType::IN_PRED == child_expr->node_type() ||
+ TExprNodeType::MATCH_PRED == child_expr->node_type()) {
SlotDescriptor* slot = nullptr;
ColumnValueRangeType* range_on_slot = nullptr;
if (_is_predicate_acting_on_slot(child_expr,
in_predicate_checker, &slot,
@@ -1010,30 +1012,16 @@ void ScanLocalState<Derived>::_normalize_compound_predicate(
value_range.mark_runtime_filter_predicate(
_is_runtime_filter_predicate);
}};
-
static_cast<void>(_normalize_binary_in_compound_predicate(
- child_expr, expr_ctx, slot,
value_range, pdt));
- },
- active_range);
-
- _compound_value_ranges.emplace_back(active_range);
- }
- } else if (TExprNodeType::MATCH_PRED == child_expr->node_type()) {
- SlotDescriptor* slot = nullptr;
- ColumnValueRangeType* range_on_slot = nullptr;
- if (_is_predicate_acting_on_slot(child_expr,
in_predicate_checker, &slot,
- &range_on_slot) ||
- _is_predicate_acting_on_slot(child_expr,
eq_predicate_checker, &slot,
- &range_on_slot)) {
- ColumnValueRangeType active_range =
- *range_on_slot; // copy, in order not to affect
the range in the _colname_to_value_range
- std::visit(
- [&](auto& value_range) {
- Defer mark_runtime_filter_flag {[&]() {
- value_range.mark_runtime_filter_predicate(
- _is_runtime_filter_predicate);
- }};
-
static_cast<void>(_normalize_match_in_compound_predicate(
- child_expr, expr_ctx, slot,
value_range, pdt));
+ if (TExprNodeType::BINARY_PRED ==
child_expr->node_type()) {
+
static_cast<void>(_normalize_binary_compound_predicate(
+ child_expr, expr_ctx, slot,
value_range, pdt));
+ } else if (TExprNodeType::IN_PRED ==
child_expr->node_type()) {
+
static_cast<void>(_normalize_in_and_not_in_compound_predicate(
+ child_expr, expr_ctx, slot,
value_range, pdt));
+ } else {
+
static_cast<void>(_normalize_match_compound_predicate(
+ child_expr, expr_ctx, slot,
value_range, pdt));
+ }
},
active_range);
@@ -1050,7 +1038,7 @@ void ScanLocalState<Derived>::_normalize_compound_predicate(
template <typename Derived>
template <PrimitiveType T>
-Status ScanLocalState<Derived>::_normalize_binary_in_compound_predicate(
+Status ScanLocalState<Derived>::_normalize_binary_compound_predicate(
vectorized::VExpr* expr, vectorized::VExprContext* expr_ctx,
SlotDescriptor* slot,
ColumnValueRange<T>& range, PushDownType* pdt) {
DCHECK(expr->children().size() == 2);
@@ -1107,7 +1095,56 @@ Status ScanLocalState<Derived>::_normalize_binary_in_compound_predicate(
template <typename Derived>
template <PrimitiveType T>
-Status ScanLocalState<Derived>::_normalize_match_in_compound_predicate(
+Status ScanLocalState<Derived>::_normalize_in_and_not_in_compound_predicate(
+ vectorized::VExpr* expr, vectorized::VExprContext* expr_ctx,
SlotDescriptor* slot,
+ ColumnValueRange<T>& range, PushDownType* pdt) {
+ if (TExprNodeType::IN_PRED == expr->node_type()) {
+ std::string fn_name = expr->op() == TExprOpcode::type::FILTER_IN ?
"in" : "not_in";
+
+ HybridSetBase::IteratorBase* iter = nullptr;
+ auto hybrid_set = expr->get_set_func();
+
+ if (hybrid_set != nullptr) {
+ if (hybrid_set->size() <=
+ _parent->cast<typename
Derived::Parent>()._max_pushdown_conditions_per_column) {
+ iter = hybrid_set->begin();
+ } else {
+ _filter_predicates.in_filters.emplace_back(slot->col_name(),
expr->get_set_func());
+ *pdt = PushDownType::ACCEPTABLE;
+ return Status::OK();
+ }
+ } else {
+ vectorized::VInPredicate* pred =
static_cast<vectorized::VInPredicate*>(expr);
+
+ vectorized::InState* state =
reinterpret_cast<vectorized::InState*>(
+ expr_ctx->fn_context(pred->fn_context_index())
+
->get_function_state(FunctionContext::FRAGMENT_LOCAL));
+
+ if (!state->use_set) {
+ return Status::OK();
+ }
+
+ iter = state->hybrid_set->begin();
+ }
+
+ while (iter->has_next()) {
+ if (nullptr == iter->get_value()) {
+ iter->next();
+ continue;
+ }
+ auto* value = const_cast<void*>(iter->get_value());
+ RETURN_IF_ERROR(_change_value_range<false>(
+ range, value,
ColumnValueRange<T>::add_compound_value_range, fn_name, 0));
+ iter->next();
+ }
+ *pdt = PushDownType::ACCEPTABLE;
+ }
+ return Status::OK();
+}
+
+template <typename Derived>
+template <PrimitiveType T>
+Status ScanLocalState<Derived>::_normalize_match_compound_predicate(
vectorized::VExpr* expr, vectorized::VExprContext* expr_ctx,
SlotDescriptor* slot,
ColumnValueRange<T>& range, PushDownType* pdt) {
DCHECK(expr->children().size() == 2);
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index 84db26da051..6c2c3e80346 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -300,16 +300,23 @@ protected:
vectorized::VExprSPtr&)>&
eq_predicate_checker);
template <PrimitiveType T>
- Status _normalize_binary_in_compound_predicate(vectorized::VExpr* expr,
- vectorized::VExprContext*
expr_ctx,
- SlotDescriptor* slot,
ColumnValueRange<T>& range,
- PushDownType* pdt);
+ Status _normalize_binary_compound_predicate(vectorized::VExpr* expr,
+ vectorized::VExprContext*
expr_ctx,
+ SlotDescriptor* slot,
ColumnValueRange<T>& range,
+ PushDownType* pdt);
template <PrimitiveType T>
- Status _normalize_match_in_compound_predicate(vectorized::VExpr* expr,
- vectorized::VExprContext*
expr_ctx,
- SlotDescriptor* slot,
ColumnValueRange<T>& range,
- PushDownType* pdt);
+ Status _normalize_in_and_not_in_compound_predicate(vectorized::VExpr* expr,
+
vectorized::VExprContext* expr_ctx,
+ SlotDescriptor* slot,
+ ColumnValueRange<T>&
range,
+ PushDownType* pdt);
+
+ template <PrimitiveType T>
+ Status _normalize_match_compound_predicate(vectorized::VExpr* expr,
+ vectorized::VExprContext*
expr_ctx,
+ SlotDescriptor* slot,
ColumnValueRange<T>& range,
+ PushDownType* pdt);
template <PrimitiveType T>
Status _normalize_is_null_predicate(vectorized::VExpr* expr,
vectorized::VExprContext* expr_ctx,
diff --git a/regression-test/data/fault_injection_p0/test_index_inlist_fault_injection.out b/regression-test/data/fault_injection_p0/test_index_inlist_fault_injection.out
new file mode 100644
index 00000000000..9fbd1c8e252
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/test_index_inlist_fault_injection.out
@@ -0,0 +1,19 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+8
+
+-- !sql --
+996
+
+-- !sql --
+210
+
+-- !sql --
+8
+
+-- !sql --
+998
+
+-- !sql --
+208
+
diff --git a/regression-test/suites/fault_injection_p0/test_index_inlist_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_index_inlist_fault_injection.groovy
new file mode 100644
index 00000000000..e0c340c0aa9
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_index_inlist_fault_injection.groovy
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_index_inlist_fault_injection", "nonConcurrent") {
+ // define a sql table
+ def indexTbName = "test_index_inlist_fault_injection"
+
+ sql "DROP TABLE IF EXISTS ${indexTbName}"
+ sql """
+ CREATE TABLE ${indexTbName} (
+ `@timestamp` int(11) NULL COMMENT "",
+ `clientip` varchar(20) NULL COMMENT "",
+ `request` text NULL COMMENT "",
+ `status` int(11) NULL COMMENT "",
+ `size` int(11) NULL COMMENT "",
+ INDEX clientip_idx (`clientip`) USING INVERTED COMMENT '',
+ INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" =
"english", "support_phrase" = "true") COMMENT '',
+ INDEX status_idx (`status`) USING INVERTED COMMENT ''
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`@timestamp`)
+ COMMENT "OLAP"
+ DISTRIBUTED BY RANDOM BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true"
+ );
+ """
+
+ def load_httplogs_data = {table_name, label, read_flag, format_flag,
file_name, ignore_failure=false,
+ expected_succ_rows = -1, load_to_single_tablet =
'true' ->
+
+ // load the json data
+ streamLoad {
+ table "${table_name}"
+
+ // set http request header params
+ set 'label', label + "_" + UUID.randomUUID().toString()
+ set 'read_json_by_line', read_flag
+ set 'format', format_flag
+ file file_name // import json file
+ time 10000 // limit inflight 10s
+ if (expected_succ_rows >= 0) {
+ set 'max_filter_ratio', '1'
+ }
+
+ // if declared a check callback, the default check condition will ignore.
+ // So you must check all condition
+ check { result, exception, startTime, endTime ->
+ if (ignore_failure && expected_succ_rows < 0) { return }
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ }
+ }
+ }
+
+ try {
+ load_httplogs_data.call(indexTbName,
'test_index_inlist_fault_injection', 'true', 'json', 'documents-1000.json')
+
+ sql "sync"
+
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("segment_iterator._rowid_result_for_index")
+
+ qt_sql """ select /*+ SET_VAR(inverted_index_skip_threshold = 0) */
count() from ${indexTbName} where clientip in ('40.135.0.0', '232.0.0.0',
'26.1.0.0'); """
+ qt_sql """ select /*+ SET_VAR(inverted_index_skip_threshold = 0) */
count() from ${indexTbName} where status in (1, 304, 200); """
+ qt_sql """ select /*+ SET_VAR(inverted_index_skip_threshold = 0) */
count() from ${indexTbName} where (request match 'hm' or clientip in
('40.135.0.0', '232.0.0.0', '26.1.0.0')); """
+ qt_sql """ select /*+ SET_VAR(inverted_index_skip_threshold = 0) */
count() from ${indexTbName} where (request match 'hm' and clientip in
('40.135.0.0', '232.0.0.0', '26.1.0.0')); """
+ qt_sql """ select /*+ SET_VAR(inverted_index_skip_threshold = 0) */
count() from ${indexTbName} where (request match 'hm' or status in (1, 304,
200)); """
+ qt_sql """ select /*+ SET_VAR(inverted_index_skip_threshold = 0) */
count() from ${indexTbName} where (request match 'hm' and status in (1, 304,
200)); """
+
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("segment_iterator._rowid_result_for_index")
+ }
+ } finally {
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]