This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0590731cf06 [fix](group commit) Group commit core for agg_state
(#36220)
0590731cf06 is described below
commit 0590731cf06fbb12fb24143fb476daacf7f726e5
Author: meiyi <[email protected]>
AuthorDate: Thu Jun 13 16:00:32 2024 +0800
[fix](group commit) Group commit core for agg_state (#36220)
When running the `test_agg_state_max_by` case in group commit mode, the following check failure occurs:
```
F20240612 17:38:15.212311 2096700 block.h:130] Check failed: data.size() >
position , data.size()=4, position=4
*** Check failure stack trace: ***
@ 0x55f41c129496 google::LogMessage::SendToLog()
@ 0x55f41c125ee0 google::LogMessage::Flush()
@ 0x55f41c129cd9 google::LogMessageFatal::~LogMessageFatal()
@ 0x55f3ed988343 doris::vectorized::Block::get_by_position()
@ 0x55f4188def92
doris::vectorized::OlapTableBlockConvertor::_validate_data()
@ 0x55f4188dc1de
doris::vectorized::OlapTableBlockConvertor::validate_and_convert_block()
@ 0x55f419090d04
doris::pipeline::GroupCommitBlockSinkOperatorX::sink()
@ 0x55f41be3d950
doris::pipeline::PipelineTask::execute()::$_2::operator()()
@ 0x55f41be3a985 doris::pipeline::PipelineTask::execute()
```
---
be/src/pipeline/exec/group_commit_block_sink_operator.cpp | 2 ++
be/src/pipeline/exec/group_commit_block_sink_operator.h | 6 ++++--
be/src/pipeline/pipeline_fragment_context.cpp | 3 ++-
be/src/runtime/group_commit_mgr.cpp | 6 +++---
4 files changed, 11 insertions(+), 6 deletions(-)
diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
index 1ab87120cb2..50b314a7dc1 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
@@ -245,6 +245,8 @@ Status GroupCommitBlockSinkOperatorX::init(const TDataSink&
t_sink) {
_group_commit_mode = table_sink.group_commit_mode;
_load_id = table_sink.load_id;
_max_filter_ratio = table_sink.max_filter_ratio;
+ // From the thrift expressions create the real exprs.
+ RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr,
_output_vexpr_ctxs));
return Status::OK();
}
diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.h
b/be/src/pipeline/exec/group_commit_block_sink_operator.h
index f8c9867bf5c..b65326725b6 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.h
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.h
@@ -73,8 +73,9 @@ class GroupCommitBlockSinkOperatorX final
using Base = DataSinkOperatorX<GroupCommitBlockSinkLocalState>;
public:
- GroupCommitBlockSinkOperatorX(int operator_id, const RowDescriptor&
row_desc)
- : Base(operator_id, 0), _row_desc(row_desc) {}
+ GroupCommitBlockSinkOperatorX(int operator_id, const RowDescriptor&
row_desc,
+ const std::vector<TExpr>& t_output_expr)
+ : Base(operator_id, 0), _row_desc(row_desc),
_t_output_expr(t_output_expr) {}
~GroupCommitBlockSinkOperatorX() override = default;
@@ -90,6 +91,7 @@ private:
friend class GroupCommitBlockSinkLocalState;
const RowDescriptor& _row_desc;
+ const std::vector<TExpr>& _t_output_expr;
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
int _tuple_desc_id = -1;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 7a78c255170..2b765afd77d 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -980,7 +980,8 @@ Status
PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
}
case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
DCHECK(thrift_sink.__isset.olap_table_sink);
- _sink.reset(new GroupCommitBlockSinkOperatorX(next_sink_operator_id(),
row_desc));
+ _sink.reset(
+ new GroupCommitBlockSinkOperatorX(next_sink_operator_id(),
row_desc, output_exprs));
break;
}
case TDataSinkType::HIVE_TABLE_SINK: {
diff --git a/be/src/runtime/group_commit_mgr.cpp
b/be/src/runtime/group_commit_mgr.cpp
index c7b2492c610..3faafa90b66 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -154,8 +154,7 @@ Status LoadBlockQueue::get_block(RuntimeState*
runtime_state, vectorized::Block*
if (runtime_state->is_cancelled()) {
auto st = runtime_state->cancel_reason();
_cancel_without_lock(st);
- return Status::Cancelled("cancel group_commit, label=" + label +
- ", status=" + st.to_string());
+ return status;
}
if (!_block_queue.empty()) {
const BlockData block_data = _block_queue.front();
@@ -218,7 +217,8 @@ void LoadBlockQueue::cancel(const Status& st) {
void LoadBlockQueue::_cancel_without_lock(const Status& st) {
LOG(INFO) << "cancel group_commit, instance_id=" << load_instance_id << ",
label=" << label
<< ", status=" << st.to_string();
- status = st;
+ status =
+ Status::Cancelled("cancel group_commit, label=" + label + ",
status=" + st.to_string());
while (!_block_queue.empty()) {
const BlockData& block_data = _block_queue.front().block;
int before_block_queues_bytes = _all_block_queues_bytes->load();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]