This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 2809ab24dd2cfe3bc377f8e1920c49808b7a36fa Author: Kaijie Chen <[email protected]> AuthorDate: Mon May 27 15:29:38 2024 +0800 [fix](move-memtable) clear load streams before shutdown SegmentFileWriterThreadPool (#35217) --- be/src/runtime/load_stream.h | 2 +- be/src/runtime/load_stream_mgr.cpp | 14 ++++++++------ be/src/runtime/load_stream_mgr.h | 5 ++--- be/src/service/internal_service.cpp | 4 ++-- be/test/runtime/load_stream_test.cpp | 4 ++-- 5 files changed, 15 insertions(+), 14 deletions(-) diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h index be1cb7756a1..c61a2d163de 100644 --- a/be/src/runtime/load_stream.h +++ b/be/src/runtime/load_stream.h @@ -169,6 +169,6 @@ private: QueryThreadContext _query_thread_context; }; -using LoadStreamSharedPtr = std::shared_ptr<LoadStream>; +using LoadStreamPtr = std::unique_ptr<LoadStream>; } // namespace doris diff --git a/be/src/runtime/load_stream_mgr.cpp b/be/src/runtime/load_stream_mgr.cpp index 1964cf257e3..4eee9ba64a9 100644 --- a/be/src/runtime/load_stream_mgr.cpp +++ b/be/src/runtime/load_stream_mgr.cpp @@ -48,23 +48,25 @@ LoadStreamMgr::LoadStreamMgr(uint32_t segment_file_writer_thread_num, } LoadStreamMgr::~LoadStreamMgr() { + _load_streams_map.clear(); _file_writer_thread_pool->shutdown(); } Status LoadStreamMgr::open_load_stream(const POpenLoadStreamRequest* request, - LoadStreamSharedPtr& load_stream) { + LoadStream*& load_stream) { UniqueId load_id(request->load_id()); { std::lock_guard l(_lock); auto it = _load_streams_map.find(load_id); if (it != _load_streams_map.end()) { - load_stream = it->second; + load_stream = it->second.get(); } else { - load_stream = std::make_shared<LoadStream>(request->load_id(), this, - request->enable_profile()); - RETURN_IF_ERROR(load_stream->init(request)); - _load_streams_map[load_id] = load_stream; + auto p = std::make_unique<LoadStream>(request->load_id(), this, + request->enable_profile()); + RETURN_IF_ERROR(p->init(request)); + load_stream = p.get(); + _load_streams_map[load_id] = std::move(p); } load_stream->add_source(request->src_id()); } diff --git a/be/src/runtime/load_stream_mgr.h b/be/src/runtime/load_stream_mgr.h index ff742012774..9e875f3b829 100644 --- a/be/src/runtime/load_stream_mgr.h +++ b/be/src/runtime/load_stream_mgr.h @@ -38,8 +38,7 @@ public: FifoThreadPool* light_work_pool); ~LoadStreamMgr(); - Status open_load_stream(const POpenLoadStreamRequest* request, - LoadStreamSharedPtr& load_stream); + Status open_load_stream(const POpenLoadStreamRequest* request, LoadStream*& load_stream); void clear_load(UniqueId loadid); void create_tokens(std::vector<std::unique_ptr<ThreadPoolToken>>& tokens) { for (int i = 0; i < _num_threads * 2; i++) { @@ -56,7 +55,7 @@ public: private: std::mutex _lock; - std::unordered_map<UniqueId, LoadStreamSharedPtr> _load_streams_map; + std::unordered_map<UniqueId, LoadStreamPtr> _load_streams_map; std::unique_ptr<ThreadPool> _file_writer_thread_pool; uint32_t _num_threads = 0; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 478a9117a29..338b8b5fb9c 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -376,14 +376,14 @@ void PInternalService::open_load_stream(google::protobuf::RpcController* control tablet->tablet_schema()->to_schema_pb(resp->mutable_tablet_schema()); } - LoadStreamSharedPtr load_stream; + LoadStream* load_stream = nullptr; auto st = _load_stream_mgr->open_load_stream(request, load_stream); if (!st.ok()) { st.to_protobuf(response->mutable_status()); return; } - stream_options.handler = load_stream.get(); + stream_options.handler = load_stream; stream_options.idle_timeout_ms = request->idle_timeout_ms(); DBUG_EXECUTE_IF("PInternalServiceImpl.open_load_stream.set_idle_timeout", { stream_options.idle_timeout_ms = 1; }); diff --git a/be/test/runtime/load_stream_test.cpp b/be/test/runtime/load_stream_test.cpp index 1d9ff6b347c..2f9b8e803f6 100644 --- a/be/test/runtime/load_stream_test.cpp +++ b/be/test/runtime/load_stream_test.cpp @@ -376,12 +376,12 @@ public: tablet->tablet_schema()->to_schema_pb(resp->mutable_tablet_schema()); } - LoadStreamSharedPtr load_stream; + LoadStream* load_stream; LOG(INFO) << "total streams: " << request->total_streams(); EXPECT_GT(request->total_streams(), 0); auto st = _load_stream_mgr->open_load_stream(request, load_stream); - stream_options.handler = load_stream.get(); + stream_options.handler = load_stream; StreamId streamid; if (brpc::StreamAccept(&streamid, *cntl, &stream_options) != 0) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
