This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b38b8b4494 [pipelineX](fix) Fix BE crash caused by join and constant 
expr (#24862)
b38b8b4494 is described below

commit b38b8b4494de38b43c1c6c19a39de23538e12b22
Author: Gabriel <[email protected]>
AuthorDate: Mon Sep 25 21:01:09 2023 +0800

    [pipelineX](fix) Fix BE crash caused by join and constant expr (#24862)
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp       | 10 ++++-
 be/src/pipeline/exec/hashjoin_probe_operator.cpp   | 48 ++++++++++++++++++----
 be/src/pipeline/exec/hashjoin_probe_operator.h     |  1 -
 .../exec/nested_loop_join_probe_operator.cpp       |  5 ++-
 .../exec/nested_loop_join_probe_operator.h         |  2 +
 be/src/pipeline/exec/scan_operator.h               |  2 +-
 be/src/pipeline/pipeline_x/dependency.h            |  1 +
 be/src/vec/exprs/vcase_expr.cpp                    |  7 +++-
 be/src/vec/exprs/vcast_expr.cpp                    |  7 +++-
 be/src/vec/exprs/vectorized_fn_call.cpp            |  7 +++-
 be/src/vec/exprs/vexpr.cpp                         |  9 +++-
 be/src/vec/exprs/vin_predicate.cpp                 |  7 +++-
 be/src/vec/exprs/vmatch_predicate.cpp              |  7 +++-
 be/src/vec/functions/function_fake.h               |  1 +
 14 files changed, 96 insertions(+), 18 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index db6bae55b2..92763f9f27 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -410,7 +410,6 @@ Status HashJoinBuildSinkOperatorX::init(const TPlanNode& 
tnode, RuntimeState* st
         _build_expr_ctxs.push_back(ctx);
 
         const auto vexpr = _build_expr_ctxs.back()->root();
-        const auto& data_type = vexpr->data_type();
 
         bool null_aware = eq_join_conjunct.__isset.opcode &&
                           eq_join_conjunct.opcode == TExprOpcode::EQ_FOR_NULL;
@@ -421,7 +420,10 @@ Status HashJoinBuildSinkOperatorX::init(const TPlanNode& 
tnode, RuntimeState* st
         _store_null_in_hash_table.emplace_back(
                 null_aware ||
                 (_build_expr_ctxs.back()->root()->is_nullable() && 
build_stores_null));
+    }
 
+    for (const auto& expr : _build_expr_ctxs) {
+        const auto& data_type = expr->root()->data_type();
         if (!data_type->have_maximum_size_of_value()) {
             break;
         }
@@ -589,6 +591,12 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
 
     local_state.init_short_circuit_for_probe();
     if (source_state == SourceState::FINISHED) {
+        // Since the comparison of null values is meaningless, null aware left 
anti join should not output null
+        // when the build side is not empty.
+        if (!local_state._shared_state->build_blocks->empty() &&
+            _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+            local_state._shared_state->probe_ignore_null = true;
+        }
         local_state._dependency->set_ready_for_read();
     }
 
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp 
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 34f8e6102a..0a0438720a 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -34,7 +34,7 @@ Status HashJoinProbeLocalState::init(RuntimeState* state, 
LocalStateInfo& info)
     SCOPED_TIMER(profile()->total_time_counter());
     SCOPED_TIMER(_open_timer);
     auto& p = _parent->cast<HashJoinProbeOperatorX>();
-    _probe_ignore_null = p._probe_ignore_null;
+    _shared_state->probe_ignore_null = p._probe_ignore_null;
     _probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
     for (size_t i = 0; i < _probe_expr_ctxs.size(); i++) {
         RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state, 
_probe_expr_ctxs[i]));
@@ -43,11 +43,6 @@ Status HashJoinProbeLocalState::init(RuntimeState* state, 
LocalStateInfo& info)
     for (size_t i = 0; i < _other_join_conjuncts.size(); i++) {
         RETURN_IF_ERROR(p._other_join_conjuncts[i]->clone(state, 
_other_join_conjuncts[i]));
     }
-    // Since the comparison of null values is meaningless, null aware left 
anti join should not output null
-    // when the build side is not empty.
-    if (!_shared_state->build_blocks->empty() && p._join_op == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-        _probe_ignore_null = true;
-    }
     _construct_mutable_join_block();
     _probe_column_disguise_null.reserve(_probe_expr_ctxs.size());
     _probe_arena_memory_usage =
@@ -189,6 +184,42 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* 
state, vectorized::Bloc
     local_state.init_for_probe(state);
     SCOPED_TIMER(local_state._probe_timer);
     if (local_state._shared_state->short_circuit_for_probe) {
+        /// If `_short_circuit_for_probe` is true, this indicates no rows
+        /// match the join condition, and this is 'mark join', so we need to 
create a column as mark
+        /// with all rows set to 0.
+        if (_is_mark_join) {
+            auto block_rows = local_state._probe_block.rows();
+            if (block_rows == 0) {
+                if (local_state._probe_eos) {
+                    source_state = SourceState::FINISHED;
+                }
+                return Status::OK();
+            }
+
+            vectorized::Block temp_block;
+            //get probe side output column
+            for (int i = 0; i < _left_output_slot_flags.size(); ++i) {
+                if (_left_output_slot_flags[i]) {
+                    
temp_block.insert(local_state._probe_block.get_by_position(i));
+                }
+            }
+            auto mark_column = vectorized::ColumnUInt8::create(block_rows, 0);
+            temp_block.insert(
+                    {std::move(mark_column), 
std::make_shared<vectorized::DataTypeUInt8>(), ""});
+
+            {
+                SCOPED_TIMER(local_state._join_filter_timer);
+                RETURN_IF_ERROR(vectorized::VExprContext::filter_block(
+                        local_state._conjuncts, &temp_block, 
temp_block.columns()));
+            }
+
+            RETURN_IF_ERROR(local_state._build_output_block(&temp_block, 
output_block, false));
+            temp_block.clear();
+            local_state._probe_block.clear_column_data(
+                    _child_x->row_desc().num_materialized_slots());
+            local_state.reached_limit(output_block, source_state);
+            return Status::OK();
+        }
         // If we use a short-circuit strategy, should return empty block 
directly.
         source_state = SourceState::FINISHED;
         return Status::OK();
@@ -241,7 +272,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* 
state, vectorized::Bloc
                     *local_state._shared_state->hash_table_variants,
                     *local_state._process_hashtable_ctx_variants,
                     
vectorized::make_bool_variant(local_state._need_null_map_for_probe),
-                    
vectorized::make_bool_variant(local_state._probe_ignore_null));
+                    
vectorized::make_bool_variant(local_state._shared_state->probe_ignore_null));
         });
     } else if (local_state._probe_eos) {
         if (_is_right_semi_anti || (_is_outer_join && _join_op != 
TJoinOp::LEFT_OUTER_JOIN)) {
@@ -299,7 +330,8 @@ bool 
HashJoinProbeOperatorX::need_more_input_data(RuntimeState* state) const {
     auto& local_state = 
state->get_local_state(id())->cast<HashJoinProbeLocalState>();
     return (local_state._probe_block.rows() == 0 ||
             local_state._probe_index == local_state._probe_block.rows()) &&
-           !local_state._probe_eos && 
!local_state._shared_state->short_circuit_for_probe;
+           !local_state._probe_eos &&
+           (!local_state._shared_state->short_circuit_for_probe || 
_is_mark_join);
 }
 
 Status HashJoinProbeOperatorX::_do_evaluate(vectorized::Block& block,
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h 
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index bac12a004a..5c451b9fde 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -85,7 +85,6 @@ private:
 
     bool _need_null_map_for_probe = false;
     bool _has_set_need_null_map_for_probe = false;
-    bool _probe_ignore_null = false;
     std::unique_ptr<vectorized::HashJoinProbeContext> _probe_context;
     vectorized::ColumnUInt8::MutablePtr _null_map_column;
     // for cases when a probe row matches more than batch size build rows.
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp 
b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
index 5f8bccea2b..ecac7c94dd 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
@@ -59,6 +59,8 @@ Status NestedLoopJoinProbeLocalState::init(RuntimeState* 
state, LocalStateInfo&
         RETURN_IF_ERROR(p._join_conjuncts[i]->clone(state, 
_join_conjuncts[i]));
     }
     _construct_mutable_join_block();
+
+    _loop_join_timer = ADD_TIMER(profile(), "LoopGenerateJoin");
     return Status::OK();
 }
 
@@ -349,7 +351,7 @@ void 
NestedLoopJoinProbeLocalState::_finalize_current_phase(vectorized::MutableB
             DCHECK_LE(_left_block_start_pos + _left_side_process_count, 
_child_block->rows());
             for (int j = _left_block_start_pos;
                  j < _left_block_start_pos + _left_side_process_count; ++j) {
-                mark_data.emplace_back(IsSemi != 
_cur_probe_row_visited_flags[j]);
+                mark_data.emplace_back(IsSemi == 
_cur_probe_row_visited_flags[j]);
             }
             for (size_t i = 0; i < p._num_probe_side_columns; ++i) {
                 const vectorized::ColumnWithTypeAndName src_column =
@@ -562,6 +564,7 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* 
state, vectorized::Block
                                                   set_build_side_flag, 
set_probe_side_flag>(
                                 state, join_op_variants);
             };
+            SCOPED_TIMER(local_state._loop_join_timer);
             RETURN_IF_ERROR(std::visit(
                     func, local_state._shared_state->join_op_variants,
                     vectorized::make_bool_variant(_match_all_build || 
_is_right_semi_anti),
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h 
b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
index e754ae585f..8ad39451b0 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
@@ -198,6 +198,8 @@ private:
     std::stack<uint16_t> _probe_offset_stack;
     uint64_t _output_null_idx_build_side = 0;
     vectorized::VExprContextSPtrs _join_conjuncts;
+
+    RuntimeProfile::Counter* _loop_join_timer;
 };
 
 class NestedLoopJoinProbeOperatorX final
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index 928f05c272..ef9f54bff7 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -194,7 +194,7 @@ template <typename Derived>
 class ScanLocalState : public ScanLocalStateBase {
     ENABLE_FACTORY_CREATOR(ScanLocalState);
     ScanLocalState(RuntimeState* state, OperatorXBase* parent);
-    virtual ~ScanLocalState() = default;
+    ~ScanLocalState() override = default;
 
     Status init(RuntimeState* state, LocalStateInfo& info) override;
     Status open(RuntimeState* state) override;
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index f374e43b00..9c720c3ddc 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -530,6 +530,7 @@ struct HashJoinSharedState : public JoinSharedState {
     size_t build_exprs_size = 0;
     std::shared_ptr<std::vector<vectorized::Block>> build_blocks =
             std::make_shared<std::vector<vectorized::Block>>();
+    bool probe_ignore_null = false;
 };
 
 class HashJoinDependency final : public WriteDependency {
diff --git a/be/src/vec/exprs/vcase_expr.cpp b/be/src/vec/exprs/vcase_expr.cpp
index 5c9cdd3c15..bd93cb8226 100644
--- a/be/src/vec/exprs/vcase_expr.cpp
+++ b/be/src/vec/exprs/vcase_expr.cpp
@@ -79,8 +79,13 @@ Status VCaseExpr::prepare(RuntimeState* state, const 
RowDescriptor& desc, VExprC
 
 Status VCaseExpr::open(RuntimeState* state, VExprContext* context,
                        FunctionContext::FunctionStateScope scope) {
-    RETURN_IF_ERROR(VExpr::open(state, context, scope));
+    for (int i = 0; i < _children.size(); ++i) {
+        RETURN_IF_ERROR(_children[i]->open(state, context, scope));
+    }
     RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function));
+    if (scope == FunctionContext::FRAGMENT_LOCAL) {
+        RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
+    }
     return Status::OK();
 }
 
diff --git a/be/src/vec/exprs/vcast_expr.cpp b/be/src/vec/exprs/vcast_expr.cpp
index ef4dd08115..361833120b 100644
--- a/be/src/vec/exprs/vcast_expr.cpp
+++ b/be/src/vec/exprs/vcast_expr.cpp
@@ -81,8 +81,13 @@ doris::Status VCastExpr::prepare(doris::RuntimeState* state, 
const doris::RowDes
 
 doris::Status VCastExpr::open(doris::RuntimeState* state, VExprContext* 
context,
                               FunctionContext::FunctionStateScope scope) {
-    RETURN_IF_ERROR(VExpr::open(state, context, scope));
+    for (int i = 0; i < _children.size(); ++i) {
+        RETURN_IF_ERROR(_children[i]->open(state, context, scope));
+    }
     RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function));
+    if (scope == FunctionContext::FRAGMENT_LOCAL) {
+        RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
+    }
     return Status::OK();
 }
 
diff --git a/be/src/vec/exprs/vectorized_fn_call.cpp 
b/be/src/vec/exprs/vectorized_fn_call.cpp
index 1843d732a2..0d6a0d9d10 100644
--- a/be/src/vec/exprs/vectorized_fn_call.cpp
+++ b/be/src/vec/exprs/vectorized_fn_call.cpp
@@ -121,8 +121,13 @@ Status VectorizedFnCall::prepare(RuntimeState* state, 
const RowDescriptor& desc,
 
 Status VectorizedFnCall::open(RuntimeState* state, VExprContext* context,
                               FunctionContext::FunctionStateScope scope) {
-    RETURN_IF_ERROR(VExpr::open(state, context, scope));
+    for (int i = 0; i < _children.size(); ++i) {
+        RETURN_IF_ERROR(_children[i]->open(state, context, scope));
+    }
     RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function));
+    if (scope == FunctionContext::FRAGMENT_LOCAL) {
+        RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
+    }
     return Status::OK();
 }
 
diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp
index a19dafe439..b2e3b39c5a 100644
--- a/be/src/vec/exprs/vexpr.cpp
+++ b/be/src/vec/exprs/vexpr.cpp
@@ -201,6 +201,9 @@ Status VExpr::open(RuntimeState* state, VExprContext* 
context,
     for (int i = 0; i < _children.size(); ++i) {
         RETURN_IF_ERROR(_children[i]->open(state, context, scope));
     }
+    if (scope == FunctionContext::FRAGMENT_LOCAL) {
+        RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
+    }
     return Status::OK();
 }
 
@@ -466,6 +469,7 @@ Status VExpr::get_const_col(VExprContext* context,
     }
 
     if (_constant_col != nullptr) {
+        DCHECK(column_wrapper != nullptr);
         *column_wrapper = _constant_col;
         return Status::OK();
     }
@@ -479,7 +483,10 @@ Status VExpr::get_const_col(VExprContext* context,
     DCHECK(result != -1);
     const auto& column = block.get_by_position(result).column;
     _constant_col = std::make_shared<ColumnPtrWrapper>(column);
-    *column_wrapper = _constant_col;
+    if (column_wrapper != nullptr) {
+        *column_wrapper = _constant_col;
+    }
+
     return Status::OK();
 }
 
diff --git a/be/src/vec/exprs/vin_predicate.cpp 
b/be/src/vec/exprs/vin_predicate.cpp
index 83e6bd6320..55e999af47 100644
--- a/be/src/vec/exprs/vin_predicate.cpp
+++ b/be/src/vec/exprs/vin_predicate.cpp
@@ -78,8 +78,13 @@ Status VInPredicate::prepare(RuntimeState* state, const 
RowDescriptor& desc,
 
 Status VInPredicate::open(RuntimeState* state, VExprContext* context,
                           FunctionContext::FunctionStateScope scope) {
-    RETURN_IF_ERROR(VExpr::open(state, context, scope));
+    for (int i = 0; i < _children.size(); ++i) {
+        RETURN_IF_ERROR(_children[i]->open(state, context, scope));
+    }
     RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function));
+    if (scope == FunctionContext::FRAGMENT_LOCAL) {
+        RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
+    }
     return Status::OK();
 }
 
diff --git a/be/src/vec/exprs/vmatch_predicate.cpp 
b/be/src/vec/exprs/vmatch_predicate.cpp
index 2a21aba578..cca6389fff 100644
--- a/be/src/vec/exprs/vmatch_predicate.cpp
+++ b/be/src/vec/exprs/vmatch_predicate.cpp
@@ -92,11 +92,16 @@ Status VMatchPredicate::prepare(RuntimeState* state, const 
RowDescriptor& desc,
 
 Status VMatchPredicate::open(RuntimeState* state, VExprContext* context,
                              FunctionContext::FunctionStateScope scope) {
-    RETURN_IF_ERROR(VExpr::open(state, context, scope));
+    for (int i = 0; i < _children.size(); ++i) {
+        RETURN_IF_ERROR(_children[i]->open(state, context, scope));
+    }
     RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function));
     if (scope == FunctionContext::THREAD_LOCAL || scope == 
FunctionContext::FRAGMENT_LOCAL) {
         context->fn_context(_fn_context_index)->set_function_state(scope, 
_inverted_index_ctx);
     }
+    if (scope == FunctionContext::FRAGMENT_LOCAL) {
+        RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
+    }
     return Status::OK();
 }
 
diff --git a/be/src/vec/functions/function_fake.h 
b/be/src/vec/functions/function_fake.h
index 456672e7a2..0efca39c9a 100644
--- a/be/src/vec/functions/function_fake.h
+++ b/be/src/vec/functions/function_fake.h
@@ -54,6 +54,7 @@ public:
     }
 
     bool use_default_implementation_for_nulls() const override { return true; }
+    bool use_default_implementation_for_constants() const override { return 
false; }
 
     Status execute_impl(FunctionContext* context, Block& block, const 
ColumnNumbers& arguments,
                         size_t result, size_t input_rows_count) override {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to