This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 770cdabda3459e197b31291d3a39c56d131e1213
Author: huanghaibin <[email protected]>
AuthorDate: Wed Feb 28 21:31:17 2024 +0800

    [fix](group_commit) GroupCommitBlockSink should not use load_block_queue 
when creating load task fails (#31416)
---
 be/src/olap/wal/wal_writer.cpp      |  9 +++++++++
 be/src/runtime/group_commit_mgr.cpp | 11 +++++++----
 2 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/be/src/olap/wal/wal_writer.cpp b/be/src/olap/wal/wal_writer.cpp
index 1e869e94048..43f021c9d31 100644
--- a/be/src/olap/wal/wal_writer.cpp
+++ b/be/src/olap/wal/wal_writer.cpp
@@ -48,6 +48,9 @@ Status WalWriter::init() {
 }
 
 Status WalWriter::finalize() {
+    if (!_file_writer) {
+        return Status::InternalError("wal writer is null,fail to close 
file={}", _file_name);
+    }
     auto st = _file_writer->close();
     if (!st.ok()) {
         LOG(WARNING) << "fail to close wal " << _file_name;
@@ -56,6 +59,9 @@ Status WalWriter::finalize() {
 }
 
 Status WalWriter::append_blocks(const PBlockArray& blocks) {
+    if (!_file_writer) {
+        return Status::InternalError("wal writer is null,fail to write 
file={}", _file_name);
+    }
     size_t total_size = 0;
     size_t offset = 0;
     for (const auto& block : blocks) {
@@ -85,6 +91,9 @@ Status WalWriter::append_blocks(const PBlockArray& blocks) {
 }
 
 Status WalWriter::append_header(uint32_t version, std::string col_ids) {
+    if (!_file_writer) {
+        return Status::InternalError("wal writer is null,fail to write 
file={}", _file_name);
+    }
     size_t total_size = 0;
     uint64_t length = col_ids.size();
     total_size += k_wal_magic_length;
diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index 032c5bc525f..1e231055cff 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -212,8 +212,11 @@ Status GroupCommitTable::get_first_block_load_queue(
             if (!_need_plan_fragment) {
                 _need_plan_fragment = true;
                 RETURN_IF_ERROR(_thread_pool->submit_func([&] {
-                    [[maybe_unused]] auto st =
-                            _create_group_commit_load(load_block_queue, 
be_exe_version);
+                    auto st = _create_group_commit_load(load_block_queue, 
be_exe_version);
+                    if (!st.ok()) {
+                        LOG(WARNING) << "fail to create block queue,st=" << 
st.to_string();
+                        load_block_queue.reset();
+                    }
                 }));
             }
             _cv.wait_for(l, std::chrono::seconds(4));
@@ -311,8 +314,6 @@ Status GroupCommitTable::_create_group_commit_load(
                 result.wait_internal_group_commit_finish, 
result.group_commit_interval_ms,
                 result.group_commit_data_bytes);
         std::unique_lock l(_lock);
-        _load_block_queues.emplace(instance_id, load_block_queue);
-        _need_plan_fragment = false;
         //create wal
         if (!is_pipeline) {
             RETURN_IF_ERROR(load_block_queue->create_wal(
@@ -324,6 +325,8 @@ Status GroupCommitTable::_create_group_commit_load(
                     
pipeline_params.fragment.output_sink.olap_table_sink.schema.slot_descs,
                     be_exe_version));
         }
+        _load_block_queues.emplace(instance_id, load_block_queue);
+        _need_plan_fragment = false;
         _cv.notify_all();
     }
     st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, 
params,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to