This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0590731cf06 [fix](group commit) Group commit core for agg_state (#36220)
0590731cf06 is described below

commit 0590731cf06fbb12fb24143fb476daacf7f726e5
Author: meiyi <[email protected]>
AuthorDate: Thu Jun 13 16:00:32 2024 +0800

    [fix](group commit) Group commit core for agg_state (#36220)
    
    When running the `test_agg_state_max_by` case in group commit mode, we get the following crash:
    ```
    F20240612 17:38:15.212311 2096700 block.h:130] Check failed: data.size() > position , data.size()=4, position=4
    *** Check failure stack trace: ***
        @     0x55f41c129496  google::LogMessage::SendToLog()
        @     0x55f41c125ee0  google::LogMessage::Flush()
        @     0x55f41c129cd9  google::LogMessageFatal::~LogMessageFatal()
        @     0x55f3ed988343  doris::vectorized::Block::get_by_position()
        @     0x55f4188def92  doris::vectorized::OlapTableBlockConvertor::_validate_data()
        @     0x55f4188dc1de  doris::vectorized::OlapTableBlockConvertor::validate_and_convert_block()
        @     0x55f419090d04  doris::pipeline::GroupCommitBlockSinkOperatorX::sink()
        @     0x55f41be3d950  doris::pipeline::PipelineTask::execute()::$_2::operator()()
        @     0x55f41be3a985  doris::pipeline::PipelineTask::execute()
    ```
---
 be/src/pipeline/exec/group_commit_block_sink_operator.cpp | 2 ++
 be/src/pipeline/exec/group_commit_block_sink_operator.h   | 6 ++++--
 be/src/pipeline/pipeline_fragment_context.cpp             | 3 ++-
 be/src/runtime/group_commit_mgr.cpp                       | 6 +++---
 4 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp 
b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
index 1ab87120cb2..50b314a7dc1 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
@@ -245,6 +245,8 @@ Status GroupCommitBlockSinkOperatorX::init(const TDataSink& 
t_sink) {
     _group_commit_mode = table_sink.group_commit_mode;
     _load_id = table_sink.load_id;
     _max_filter_ratio = table_sink.max_filter_ratio;
+    // From the thrift expressions create the real exprs.
+    RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, 
_output_vexpr_ctxs));
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.h 
b/be/src/pipeline/exec/group_commit_block_sink_operator.h
index f8c9867bf5c..b65326725b6 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.h
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.h
@@ -73,8 +73,9 @@ class GroupCommitBlockSinkOperatorX final
     using Base = DataSinkOperatorX<GroupCommitBlockSinkLocalState>;
 
 public:
-    GroupCommitBlockSinkOperatorX(int operator_id, const RowDescriptor& 
row_desc)
-            : Base(operator_id, 0), _row_desc(row_desc) {}
+    GroupCommitBlockSinkOperatorX(int operator_id, const RowDescriptor& 
row_desc,
+                                  const std::vector<TExpr>& t_output_expr)
+            : Base(operator_id, 0), _row_desc(row_desc), 
_t_output_expr(t_output_expr) {}
 
     ~GroupCommitBlockSinkOperatorX() override = default;
 
@@ -90,6 +91,7 @@ private:
     friend class GroupCommitBlockSinkLocalState;
 
     const RowDescriptor& _row_desc;
+    const std::vector<TExpr>& _t_output_expr;
     vectorized::VExprContextSPtrs _output_vexpr_ctxs;
 
     int _tuple_desc_id = -1;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 7a78c255170..2b765afd77d 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -980,7 +980,8 @@ Status 
PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
     }
     case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
         DCHECK(thrift_sink.__isset.olap_table_sink);
-        _sink.reset(new GroupCommitBlockSinkOperatorX(next_sink_operator_id(), 
row_desc));
+        _sink.reset(
+                new GroupCommitBlockSinkOperatorX(next_sink_operator_id(), 
row_desc, output_exprs));
         break;
     }
     case TDataSinkType::HIVE_TABLE_SINK: {
diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index c7b2492c610..3faafa90b66 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -154,8 +154,7 @@ Status LoadBlockQueue::get_block(RuntimeState* 
runtime_state, vectorized::Block*
     if (runtime_state->is_cancelled()) {
         auto st = runtime_state->cancel_reason();
         _cancel_without_lock(st);
-        return Status::Cancelled("cancel group_commit, label=" + label +
-                                 ", status=" + st.to_string());
+        return status;
     }
     if (!_block_queue.empty()) {
         const BlockData block_data = _block_queue.front();
@@ -218,7 +217,8 @@ void LoadBlockQueue::cancel(const Status& st) {
 void LoadBlockQueue::_cancel_without_lock(const Status& st) {
     LOG(INFO) << "cancel group_commit, instance_id=" << load_instance_id << ", 
label=" << label
               << ", status=" << st.to_string();
-    status = st;
+    status =
+            Status::Cancelled("cancel group_commit, label=" + label + ", 
status=" + st.to_string());
     while (!_block_queue.empty()) {
         const BlockData& block_data = _block_queue.front().block;
         int before_block_queues_bytes = _all_block_queues_bytes->load();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to