This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 34e53acaea1 [pipelineX](fix) Fix local exchange on pipelineX engine 
(#27763)
34e53acaea1 is described below

commit 34e53acaea1d7c103e810b3034893fe88ad77678
Author: Gabriel <[email protected]>
AuthorDate: Thu Nov 30 11:16:20 2023 +0800

    [pipelineX](fix) Fix local exchange on pipelineX engine (#27763)
---
 be/src/pipeline/exec/hashjoin_probe_operator.cpp   |  9 +++--
 be/src/pipeline/exec/join_probe_operator.h         |  6 ++--
 be/src/pipeline/pipeline.h                         |  2 +-
 be/src/pipeline/pipeline_x/dependency.h            |  9 +++--
 .../local_exchange_sink_operator.cpp               | 10 ++++--
 .../local_exchange/local_exchange_sink_operator.h  | 10 +++---
 .../local_exchange_source_operator.cpp             | 38 ++++++++++++++--------
 .../local_exchange_source_operator.h               | 27 ++++++++++++++-
 be/src/pipeline/pipeline_x/operator.h              | 10 +-----
 .../pipeline_x/pipeline_x_fragment_context.cpp     | 35 +++++++++++++-------
 .../pipeline_x/pipeline_x_fragment_context.h       |  5 +--
 be/src/runtime/runtime_state.cpp                   |  5 +--
 12 files changed, 106 insertions(+), 60 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp 
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 412c358037d..174d102993d 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -577,7 +577,11 @@ Status HashJoinProbeOperatorX::init(const TPlanNode& 
tnode, RuntimeState* state)
         DCHECK(!_build_unique);
         DCHECK(_have_other_join_conjunct);
     }
+    return Status::OK();
+}
 
+Status HashJoinProbeOperatorX::prepare(RuntimeState* state) {
+    
RETURN_IF_ERROR(JoinProbeOperatorX<HashJoinProbeLocalState>::prepare(state));
     // init left/right output slots flags, only column of slot_id in 
_hash_output_slot_ids need
     // insert to output block of hash join.
     // _left_output_slots_flags : column of left table need to output set flag 
= true
@@ -596,11 +600,6 @@ Status HashJoinProbeOperatorX::init(const TPlanNode& 
tnode, RuntimeState* state)
     init_output_slots_flags(_child_x->row_desc().tuple_descriptors(), 
_left_output_slot_flags);
     init_output_slots_flags(_build_side_child->row_desc().tuple_descriptors(),
                             _right_output_slot_flags);
-    return Status::OK();
-}
-
-Status HashJoinProbeOperatorX::prepare(RuntimeState* state) {
-    
RETURN_IF_ERROR(JoinProbeOperatorX<HashJoinProbeLocalState>::prepare(state));
     RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_ctxs, state, 
*_intermediate_row_desc));
     // _other_join_conjuncts are evaluated in the context of the rows produced 
by this node
     for (auto& conjunct : _other_join_conjuncts) {
diff --git a/be/src/pipeline/exec/join_probe_operator.h 
b/be/src/pipeline/exec/join_probe_operator.h
index 67537e65cac..12d89c1049e 100644
--- a/be/src/pipeline/exec/join_probe_operator.h
+++ b/be/src/pipeline/exec/join_probe_operator.h
@@ -50,15 +50,15 @@ protected:
     // output expr
     vectorized::VExprContextSPtrs _output_expr_ctxs;
     vectorized::Block _join_block;
-    vectorized::MutableColumnPtr _tuple_is_null_left_flag_column;
-    vectorized::MutableColumnPtr _tuple_is_null_right_flag_column;
+    vectorized::MutableColumnPtr _tuple_is_null_left_flag_column = nullptr;
+    vectorized::MutableColumnPtr _tuple_is_null_right_flag_column = nullptr;
 
     RuntimeProfile::Counter* _probe_timer = nullptr;
     RuntimeProfile::Counter* _probe_rows_counter = nullptr;
     RuntimeProfile::Counter* _join_filter_timer = nullptr;
     RuntimeProfile::Counter* _build_output_block_timer = nullptr;
 
-    std::unique_ptr<vectorized::Block> _child_block;
+    std::unique_ptr<vectorized::Block> _child_block = nullptr;
     SourceState _child_source_state;
 };
 
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 7dcffb410a2..f4b7928887c 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -145,7 +145,7 @@ private:
     // Operators for pipelineX. All pipeline tasks share operators from this.
     // [SourceOperator -> ... -> SinkOperator]
     OperatorXs operatorXs;
-    DataSinkOperatorXPtr _sink_x;
+    DataSinkOperatorXPtr _sink_x = nullptr;
 
     std::shared_ptr<ObjectPool> _obj_pool;
 
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index 8a55efcdb7f..3fd1489103d 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -584,10 +584,10 @@ public:
     std::vector<moodycamel::ConcurrentQueue<PartitionedBlock>> data_queue;
     std::vector<Dependency*> source_dependencies;
     std::atomic<int> running_sink_operators = 0;
-    void add_running_sink_operators() { running_sink_operators++; }
+    std::mutex le_lock;
     void sub_running_sink_operators() {
-        auto val = running_sink_operators.fetch_sub(1);
-        if (val == 1) {
+        std::unique_lock<std::mutex> lc(le_lock);
+        if (running_sink_operators.fetch_sub(1) == 1) {
             _set_ready_for_read();
         }
     }
@@ -599,11 +599,10 @@ public:
     }
     void set_dep_by_channel_id(Dependency* dep, int channel_id) {
         source_dependencies[channel_id] = dep;
-        dep->block();
     }
     void set_ready_for_read(int channel_id) {
         auto* dep = source_dependencies[channel_id];
-        DCHECK(dep);
+        DCHECK(dep) << channel_id << " " << (int64_t)this;
         dep->set_ready();
     }
 };
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
index a793a22761f..12cc5e042e8 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
@@ -25,9 +25,14 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* 
state, LocalSinkStateInfo
     SCOPED_TIMER(_open_timer);
     _compute_hash_value_timer = ADD_TIMER(profile(), "ComputeHashValueTime");
     _distribute_timer = ADD_TIMER(profile(), "DistributeDataTime");
+
     auto& p = _parent->cast<LocalExchangeSinkOperatorX>();
+    _num_rows_in_queue.resize(p._num_partitions);
+    for (size_t i = 0; i < p._num_partitions; i++) {
+        _num_rows_in_queue[i] = ADD_COUNTER_WITH_LEVEL(
+                profile(), "NumRowsInQueue" + std::to_string(i), TUnit::UNIT, 
1);
+    }
     RETURN_IF_ERROR(p._partitioner->clone(state, _partitioner));
-    _shared_state->add_running_sink_operators();
     return Status::OK();
 }
 
@@ -59,8 +64,9 @@ Status LocalExchangeSinkLocalState::split_rows(RuntimeState* 
state,
         size_t size = _partition_rows_histogram[i + 1] - start;
         if (size > 0) {
             data_queue[i].enqueue({new_block, {row_idx, start, size}});
+            _shared_state->set_ready_for_read(i);
+            COUNTER_UPDATE(_num_rows_in_queue[i], size);
         }
-        _shared_state->set_ready_for_read(i);
     }
 
     return Status::OK();
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index b6ce3fbeb9e..45d61d4ff6b 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -51,8 +51,9 @@ private:
 
     RuntimeProfile::Counter* _compute_hash_value_timer = nullptr;
     RuntimeProfile::Counter* _distribute_timer = nullptr;
-    std::unique_ptr<vectorized::PartitionerBase> _partitioner;
-    std::vector<size_t> _partition_rows_histogram;
+    std::vector<RuntimeProfile::Counter*> _num_rows_in_queue {};
+    std::unique_ptr<vectorized::PartitionerBase> _partitioner = nullptr;
+    std::vector<size_t> _partition_rows_histogram {};
 };
 
 // A single 32-bit division on a recent x64 processor has a throughput of one 
instruction every six cycles with a latency of 26 cycles.
@@ -69,8 +70,9 @@ struct LocalExchangeChannelIds {
 class LocalExchangeSinkOperatorX final : public 
DataSinkOperatorX<LocalExchangeSinkLocalState> {
 public:
     using Base = DataSinkOperatorX<LocalExchangeSinkLocalState>;
-    LocalExchangeSinkOperatorX(int sink_id, int num_partitions, const 
std::vector<TExpr>& texprs)
-            : Base(sink_id, -1), _num_partitions(num_partitions), 
_texprs(texprs) {}
+    LocalExchangeSinkOperatorX(int sink_id, int dest_id, int num_partitions,
+                               const std::vector<TExpr>& texprs)
+            : Base(sink_id, -1, dest_id), _num_partitions(num_partitions), 
_texprs(texprs) {}
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override {
         return Status::InternalError("{} should not init with TPlanNode", 
Base::_name);
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
index 81c9cc21447..83dac5eb8f4 100644
--- 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
+++ 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
@@ -41,30 +41,40 @@ Status 
LocalExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::
     PartitionedBlock partitioned_block;
     std::unique_ptr<vectorized::MutableBlock> mutable_block = nullptr;
 
-    if 
(local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(
-                partitioned_block)) {
-        SCOPED_TIMER(local_state._copy_data_timer);
-        mutable_block =
-                
vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty());
-
+    auto get_data = [&](vectorized::Block* result_block) {
         do {
             const auto* offset_start = &((
                     
*std::get<0>(partitioned_block.second))[std::get<1>(partitioned_block.second)]);
             mutable_block->add_rows(partitioned_block.first.get(), 
offset_start,
                                     offset_start + 
std::get<2>(partitioned_block.second));
-        } while 
(local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(
-                         partitioned_block) &&
-                 mutable_block->rows() < state->batch_size());
-        *block = mutable_block->to_block();
-    } else {
-        COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
-        if (local_state._shared_state->running_sink_operators == 0) {
+        } while (mutable_block->rows() < state->batch_size() &&
+                 
local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(
+                         partitioned_block));
+        *result_block = mutable_block->to_block();
+    };
+    if (local_state._shared_state->running_sink_operators == 0) {
+        if 
(local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(
+                    partitioned_block)) {
+            SCOPED_TIMER(local_state._copy_data_timer);
+            mutable_block =
+                    
vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty());
+            get_data(block);
+        } else {
+            COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
             source_state = SourceState::FINISHED;
         }
+    } else if 
(local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(
+                       partitioned_block)) {
+        SCOPED_TIMER(local_state._copy_data_timer);
+        mutable_block =
+                
vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty());
+        get_data(block);
+    } else {
+        local_state._dependency->block();
+        COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
     }
 
     local_state.reached_limit(block, source_state);
-
     return Status::OK();
 }
 
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
index ebf18d9a249..3ccc38854f5 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
@@ -28,6 +28,17 @@ public:
     LocalExchangeSourceDependency(int id, int node_id, QueryContext* query_ctx)
             : Dependency(id, node_id, "LocalExchangeSourceDependency", 
query_ctx) {}
     ~LocalExchangeSourceDependency() override = default;
+
+    void block() override {
+        if 
(((LocalExchangeSharedState*)_shared_state.get())->running_sink_operators == 0) 
{
+            return;
+        }
+        std::unique_lock<std::mutex> 
lc(((LocalExchangeSharedState*)_shared_state.get())->le_lock);
+        if 
(((LocalExchangeSharedState*)_shared_state.get())->running_sink_operators == 0) 
{
+            return;
+        }
+        Dependency::block();
+    }
 };
 
 class LocalExchangeSourceOperatorX;
@@ -52,7 +63,8 @@ private:
 class LocalExchangeSourceOperatorX final : public 
OperatorX<LocalExchangeSourceLocalState> {
 public:
     using Base = OperatorX<LocalExchangeSourceLocalState>;
-    LocalExchangeSourceOperatorX(ObjectPool* pool, int id) : Base(pool, -1, 
id) {}
+    LocalExchangeSourceOperatorX(ObjectPool* pool, int id, OperatorXBase* 
parent)
+            : Base(pool, -1, id), _parent(parent) {}
     Status init(const TPlanNode& tnode, RuntimeState* state) override {
         _op_name = "LOCAL_EXCHANGE_OPERATOR";
         return Status::OK();
@@ -70,8 +82,21 @@ public:
 
     bool is_source() const override { return true; }
 
+    Status set_child(OperatorXPtr child) override {
+        if (_child_x) {
+            // Set build side child for join probe operator
+            DCHECK(_parent != nullptr);
+            RETURN_IF_ERROR(_parent->set_child(child));
+        } else {
+            _child_x = std::move(child);
+        }
+        return Status::OK();
+    }
+
 private:
     friend class LocalExchangeSourceLocalState;
+
+    OperatorXBase* _parent = nullptr;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/operator.h 
b/be/src/pipeline/pipeline_x/operator.h
index ed2dbfc3d50..5fa6785435b 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -23,14 +23,6 @@
 
 namespace doris::pipeline {
 
-#define CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state) \
-    auto _sptr = state->get_local_state(operator_id()); \
-    auto& local_state = _sptr->template cast<LocalState>();
-
-#define CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state) \
-    auto _sptr = state->get_sink_local_state(operator_id()); \
-    auto& local_state = _sptr->template cast<LocalState>();
-
 // This struct is used only for initializing local state.
 struct LocalStateInfo {
     RuntimeProfile* parent_profile = nullptr;
@@ -279,7 +271,7 @@ protected:
 
     RowDescriptor _row_descriptor;
 
-    std::unique_ptr<RowDescriptor> _output_row_descriptor;
+    std::unique_ptr<RowDescriptor> _output_row_descriptor = nullptr;
     vectorized::VExprContextSPtrs _projections;
 
     /// Resource information sent from the frontend.
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 1924dc90d58..d49e290c044 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -163,6 +163,7 @@ Status PipelineXFragmentContext::prepare(const 
doris::TPipelineFragmentParams& r
     if (_prepared) {
         return Status::InternalError("Already prepared");
     }
+    _num_instances = request.local_params.size();
     _runtime_profile.reset(new RuntimeProfile("PipelineContext"));
     _start_timer = ADD_TIMER(_runtime_profile, "StartTime");
     COUNTER_UPDATE(_start_timer, _fragment_watcher.elapsed_time());
@@ -232,7 +233,7 @@ Status PipelineXFragmentContext::prepare(const 
doris::TPipelineFragmentParams& r
 
     // 4. Initialize global states in pipelines.
     for (PipelinePtr& pipeline : _pipelines) {
-        //TODO: can we do this in set_sink?
+        DCHECK(pipeline->sink_x() != nullptr) << 
pipeline->operator_xs().size();
         
static_cast<void>(pipeline->sink_x()->set_child(pipeline->operator_xs().back()));
         RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
     }
@@ -593,15 +594,17 @@ Status 
PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool,
 }
 
 Status PipelineXFragmentContext::_add_local_exchange(ObjectPool* pool, 
OperatorXPtr& op,
-                                                     PipelinePtr& cur_pipe,
+                                                     PipelinePtr& cur_pipe, 
const TPlanNode& tnode,
                                                      const std::vector<TExpr>& 
texprs) {
-    if (!_runtime_state->enable_local_shuffle() ||
-        _runtime_state->query_parallel_instance_num() == 1) {
+    if (!_runtime_state->enable_local_shuffle() || _num_instances <= 1) {
         return Status::OK();
     }
+    auto parent = op;
+    RETURN_IF_ERROR(parent->init(tnode, _runtime_state.get()));
     auto local_exchange_id = next_operator_id();
-    op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id));
+    op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id, 
parent.get()));
     RETURN_IF_ERROR(cur_pipe->add_operator(op));
+    RETURN_IF_ERROR(parent->set_child(op));
 
     const auto downstream_pipeline_id = cur_pipe->id();
     if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -611,14 +614,15 @@ Status 
PipelineXFragmentContext::_add_local_exchange(ObjectPool* pool, OperatorX
     _dag[downstream_pipeline_id].push_back(cur_pipe->id());
 
     DataSinkOperatorXPtr sink;
-    auto num_instances = _runtime_state->query_parallel_instance_num();
-    sink.reset(new LocalExchangeSinkOperatorX(local_exchange_id, 
num_instances, texprs));
+    sink.reset(new LocalExchangeSinkOperatorX(next_sink_operator_id(), 
local_exchange_id,
+                                              _num_instances, texprs));
     RETURN_IF_ERROR(cur_pipe->set_sink(sink));
     RETURN_IF_ERROR(cur_pipe->sink_x()->init());
 
     auto shared_state = LocalExchangeSharedState::create_shared();
-    shared_state->data_queue.resize(num_instances);
-    shared_state->source_dependencies.resize(num_instances, nullptr);
+    shared_state->data_queue.resize(_num_instances);
+    shared_state->source_dependencies.resize(_num_instances, nullptr);
+    shared_state->running_sink_operators = _num_instances;
     _op_id_to_le_state.insert({local_exchange_id, shared_state});
     return Status::OK();
 }
@@ -717,9 +721,8 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, 
_runtime_state.get()));
 
             if (!tnode.agg_node.need_finalize) {
-                RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
-                RETURN_IF_ERROR(
-                        _add_local_exchange(pool, op, cur_pipe, 
tnode.agg_node.grouping_exprs));
+                RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode,
+                                                    
tnode.agg_node.grouping_exprs));
             }
         }
         break;
@@ -740,6 +743,14 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         sink->set_dests_id({op->operator_id()});
         RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
         RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, 
_runtime_state.get()));
+
+        std::vector<TExpr> probe_exprs;
+        const std::vector<TEqJoinCondition>& eq_join_conjuncts =
+                tnode.hash_join_node.eq_join_conjuncts;
+        for (const auto& eq_join_conjunct : eq_join_conjuncts) {
+            probe_exprs.push_back(eq_join_conjunct.left);
+        }
+        RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode, 
probe_exprs));
         _pipeline_parent_map.push(op->node_id(), cur_pipe);
         _pipeline_parent_map.push(op->node_id(), build_side_pipe);
         break;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index f579265ab63..7f47052296e 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -125,7 +125,7 @@ private:
     void _close_fragment_instance() override;
     Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& 
request) override;
     Status _add_local_exchange(ObjectPool* pool, OperatorXPtr& op, 
PipelinePtr& cur_pipe,
-                               const std::vector<TExpr>& texprs);
+                               const TPlanNode& tnode, const 
std::vector<TExpr>& texprs);
 
     [[nodiscard]] Status _build_pipelines(ObjectPool* pool,
                                           const 
doris::TPipelineFragmentParams& request,
@@ -170,7 +170,7 @@ private:
 #pragma clang diagnostic push
 #pragma clang diagnostic ignored "-Wshadow-field"
 #endif
-    DataSinkOperatorXPtr _sink;
+    DataSinkOperatorXPtr _sink = nullptr;
 #ifdef __clang__
 #pragma clang diagnostic pop
 #endif
@@ -210,6 +210,7 @@ private:
 
     int _operator_id = 0;
     int _sink_operator_id = 0;
+    int _num_instances = 0;
     std::map<PipelineId, std::shared_ptr<LocalExchangeSharedState>> 
_op_id_to_le_state;
 };
 
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index c6df2daff0d..43eff466019 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -458,8 +458,9 @@ Result<RuntimeState::LocalState*> 
RuntimeState::get_local_state_result(int id) {
 
 void RuntimeState::emplace_sink_local_state(
         int id, std::unique_ptr<doris::pipeline::PipelineXSinkLocalStateBase> 
state) {
-    DCHECK(id < _op_id_to_sink_local_state.size());
-    DCHECK(!_op_id_to_sink_local_state[id]);
+    DCHECK(id < _op_id_to_sink_local_state.size())
+            << " id=" << id << " state: " << state->debug_string(0);
+    DCHECK(!_op_id_to_sink_local_state[id]) << " id=" << id << " state: " << 
state->debug_string(0);
     _op_id_to_sink_local_state[id] = std::move(state);
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to