This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 770cdabda3459e197b31291d3a39c56d131e1213 Author: huanghaibin <[email protected]> AuthorDate: Wed Feb 28 21:31:17 2024 +0800 [fix](group_commit) GroupCommitBlockSink shoud not use load_block_queue when creating load task fail (#31416) --- be/src/olap/wal/wal_writer.cpp | 9 +++++++++ be/src/runtime/group_commit_mgr.cpp | 11 +++++++---- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/be/src/olap/wal/wal_writer.cpp b/be/src/olap/wal/wal_writer.cpp index 1e869e94048..43f021c9d31 100644 --- a/be/src/olap/wal/wal_writer.cpp +++ b/be/src/olap/wal/wal_writer.cpp @@ -48,6 +48,9 @@ Status WalWriter::init() { } Status WalWriter::finalize() { + if (!_file_writer) { + return Status::InternalError("wal writer is null,fail to close file={}", _file_name); + } auto st = _file_writer->close(); if (!st.ok()) { LOG(WARNING) << "fail to close wal " << _file_name; @@ -56,6 +59,9 @@ Status WalWriter::finalize() { } Status WalWriter::append_blocks(const PBlockArray& blocks) { + if (!_file_writer) { + return Status::InternalError("wal writer is null,fail to write file={}", _file_name); + } size_t total_size = 0; size_t offset = 0; for (const auto& block : blocks) { @@ -85,6 +91,9 @@ Status WalWriter::append_blocks(const PBlockArray& blocks) { } Status WalWriter::append_header(uint32_t version, std::string col_ids) { + if (!_file_writer) { + return Status::InternalError("wal writer is null,fail to write file={}", _file_name); + } size_t total_size = 0; uint64_t length = col_ids.size(); total_size += k_wal_magic_length; diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 032c5bc525f..1e231055cff 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -212,8 +212,11 @@ Status GroupCommitTable::get_first_block_load_queue( if (!_need_plan_fragment) { _need_plan_fragment = true; RETURN_IF_ERROR(_thread_pool->submit_func([&] { - [[maybe_unused]] auto st = - _create_group_commit_load(load_block_queue, be_exe_version); + auto st = _create_group_commit_load(load_block_queue, be_exe_version); + if (!st.ok()) { + LOG(WARNING) << "fail to create block queue,st=" << st.to_string(); + load_block_queue.reset(); + } })); } _cv.wait_for(l, std::chrono::seconds(4)); @@ -311,8 +314,6 @@ Status GroupCommitTable::_create_group_commit_load( result.wait_internal_group_commit_finish, result.group_commit_interval_ms, result.group_commit_data_bytes); std::unique_lock l(_lock); - _load_block_queues.emplace(instance_id, load_block_queue); - _need_plan_fragment = false; //create wal if (!is_pipeline) { RETURN_IF_ERROR(load_block_queue->create_wal( @@ -324,6 +325,8 @@ Status GroupCommitTable::_create_group_commit_load( pipeline_params.fragment.output_sink.olap_table_sink.schema.slot_descs, be_exe_version)); } + _load_block_queues.emplace(instance_id, load_block_queue); + _need_plan_fragment = false; _cv.notify_all(); } st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, params, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
