This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new b1e0703a339 [Pick](move-memtable) pick 2 prs about move memtable
(#33257)
b1e0703a339 is described below
commit b1e0703a339d3d0e3776efed272bbd8221f357f6
Author: Xin Liao <[email protected]>
AuthorDate: Thu Apr 4 20:48:55 2024 +0800
[Pick](move-memtable) pick 2 prs about move memtable (#33257)
Co-authored-by: Kaijie Chen <[email protected]>
---
be/src/runtime/exec_env.cpp | 2 +-
be/src/runtime/exec_env.h | 6 +-
be/src/runtime/exec_env_init.cpp | 6 +-
be/src/vec/sink/load_stream_map_pool.cpp | 126 +++++++++++++++
...d_stream_stub_pool.h => load_stream_map_pool.h} | 63 +++++---
be/src/vec/sink/load_stream_stub.cpp | 35 ++---
be/src/vec/sink/load_stream_stub.h | 14 +-
be/src/vec/sink/load_stream_stub_pool.cpp | 72 ---------
be/src/vec/sink/volap_table_sink_v2.cpp | 2 -
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 175 +++++++++++----------
be/src/vec/sink/writer/vtablet_writer_v2.h | 10 +-
be/test/io/fs/stream_sink_file_writer_test.cpp | 4 +-
be/test/vec/exec/load_stream_stub_map_test.cpp | 59 +++++++
be/test/vec/exec/load_stream_stub_pool_test.cpp | 51 ------
14 files changed, 346 insertions(+), 279 deletions(-)
diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp
index 71aee1c5b47..6ecd359c108 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/exec_env.cpp
@@ -34,7 +34,7 @@
#include "util/debug_util.h"
#include "util/time.h"
#include "vec/sink/delta_writer_v2_pool.h"
-#include "vec/sink/load_stream_stub_pool.h"
+#include "vec/sink/load_stream_map_pool.h"
namespace doris {
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 9767b9bfc6c..ea665f3f5a8 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -80,7 +80,7 @@ class RuntimeQueryStatiticsMgr;
class TMasterInfo;
class LoadChannelMgr;
class LoadStreamMgr;
-class LoadStreamStubPool;
+class LoadStreamMapPool;
class StreamLoadExecutor;
class RoutineLoadTaskExecutor;
class SmallFileMgr;
@@ -242,7 +242,7 @@ public:
}
#endif
- LoadStreamStubPool* load_stream_stub_pool() { return
_load_stream_stub_pool.get(); }
+ LoadStreamMapPool* load_stream_map_pool() { return
_load_stream_map_pool.get(); }
vectorized::DeltaWriterV2Pool* delta_writer_v2_pool() { return
_delta_writer_v2_pool.get(); }
@@ -360,7 +360,7 @@ private:
// To save meta info of external file, such as parquet footer.
FileMetaCache* _file_meta_cache = nullptr;
std::unique_ptr<MemTableMemoryLimiter> _memtable_memory_limiter;
- std::unique_ptr<LoadStreamStubPool> _load_stream_stub_pool;
+ std::unique_ptr<LoadStreamMapPool> _load_stream_map_pool;
std::unique_ptr<vectorized::DeltaWriterV2Pool> _delta_writer_v2_pool;
std::shared_ptr<WalManager> _wal_manager;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 9f7cd1ff033..1da172716da 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -98,7 +98,7 @@
#include "vec/exec/scan/scanner_scheduler.h"
#include "vec/runtime/vdata_stream_mgr.h"
#include "vec/sink/delta_writer_v2_pool.h"
-#include "vec/sink/load_stream_stub_pool.h"
+#include "vec/sink/load_stream_map_pool.h"
#include "vec/spill/spill_stream_manager.h"
#if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) &&
!defined(LEAK_SANITIZER) && \
@@ -229,7 +229,7 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths,
_block_spill_mgr = new BlockSpillManager(store_paths);
_group_commit_mgr = new GroupCommitMgr(this);
_memtable_memory_limiter = std::make_unique<MemTableMemoryLimiter>();
- _load_stream_stub_pool = std::make_unique<LoadStreamStubPool>();
+ _load_stream_map_pool = std::make_unique<LoadStreamMapPool>();
_delta_writer_v2_pool = std::make_unique<vectorized::DeltaWriterV2Pool>();
_wal_manager = WalManager::create_shared(this,
config::group_commit_wal_path);
_spill_stream_mgr = new vectorized::SpillStreamManager(spill_store_paths);
@@ -552,7 +552,7 @@ void ExecEnv::destroy() {
_stream_load_executor.reset();
_memtable_memory_limiter.reset();
_delta_writer_v2_pool.reset();
- _load_stream_stub_pool.reset();
+ _load_stream_map_pool.reset();
SAFE_STOP(_storage_engine);
SAFE_STOP(_spill_stream_mgr);
SAFE_SHUTDOWN(_buffered_reader_prefetch_thread_pool);
diff --git a/be/src/vec/sink/load_stream_map_pool.cpp
b/be/src/vec/sink/load_stream_map_pool.cpp
new file mode 100644
index 00000000000..f335f05e162
--- /dev/null
+++ b/be/src/vec/sink/load_stream_map_pool.cpp
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/load_stream_map_pool.h"
+
+#include "util/debug_points.h"
+
+namespace doris {
+class TExpr;
+
+// Per-load map from destination backend id to its group of LoadStreamStubs; the pool
+// keys these maps by load_id so all local sinks of one load share a single instance.
+// num_streams: number of stubs get_or_create() builds for each destination backend.
+// num_use: initial use count; each user is expected to call release() exactly once.
+// The schema and MoW maps allocated here are handed to every stub this map creates.
+LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t src_id, int
num_streams, int num_use,
+ LoadStreamMapPool* pool)
+ : _load_id(load_id),
+ _src_id(src_id),
+ _num_streams(num_streams),
+ _use_cnt(num_use),
+ _pool(pool),
+ _tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()),
+ _enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()) {
+ DCHECK(num_streams > 0) << "stream num should be greater than 0";
+ DCHECK(num_use > 0) << "use num should be greater than 0";
+}
+
+// Returns the stubs for dst_id, creating _num_streams new stubs on the first call for
+// that backend. Stubs are only constructed here, not opened. Every stub shares this
+// map's tablet-schema and MoW maps. Thread-safe (guarded by _mutex).
+std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id) {
+    std::lock_guard<std::mutex> lock(_mutex);
+    if (auto it = _streams_for_node.find(dst_id); it != _streams_for_node.end()) {
+        return it->second;
+    }
+    // Build the whole group before publishing it, so a failure part-way (e.g. bad_alloc)
+    // cannot leave a null or partially filled entry that contains()/at() would expose.
+    auto streams = std::make_shared<Streams>();
+    streams->reserve(_num_streams);
+    for (int i = 0; i < _num_streams; i++) {
+        streams->emplace_back(std::make_shared<LoadStreamStub>(
+                _load_id, _src_id, _tablet_schema_for_index, _enable_unique_mow_for_index));
+    }
+    _streams_for_node.emplace(dst_id, streams);
+    return streams;
+}
+
+// Returns the stubs for dst_id. Throws std::out_of_range (from unordered_map::at) when
+// no entry exists for dst_id, i.e. get_or_create(dst_id) was never called.
+std::shared_ptr<Streams> LoadStreamMap::at(int64_t dst_id) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ return _streams_for_node.at(dst_id);
+}
+
+// Returns true if an entry for dst_id exists (get_or_create(dst_id) has been called).
+bool LoadStreamMap::contains(int64_t dst_id) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ return _streams_for_node.contains(dst_id);
+}
+
+// Calls fn(dst_id, streams) for every backend, in unspecified (hash-map) order, while
+// holding _mutex. fn must not call LoadStreamMap methods that also lock _mutex:
+// std::mutex is not recursive, so that would self-deadlock.
+void LoadStreamMap::for_each(std::function<void(int64_t, const Streams&)> fn) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ for (auto& [dst_id, streams] : _streams_for_node) {
+ fn(dst_id, *streams);
+ }
+}
+
+// Like for_each(), but stops at and returns the first non-OK status produced by fn.
+// fn runs while _mutex is held, so it must not re-enter methods that lock _mutex.
+Status LoadStreamMap::for_each_st(std::function<Status(int64_t, const
Streams&)> fn) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ for (auto& [dst_id, streams] : _streams_for_node) {
+ RETURN_IF_ERROR(fn(dst_id, *streams));
+ }
+ return Status::OK();
+}
+
+// Appends tablets_to_commit to the list kept for dst_id. Repeated calls accumulate
+// (no de-duplication). Guarded by _tablets_to_commit_mutex, not by _mutex.
+void LoadStreamMap::save_tablets_to_commit(int64_t dst_id,
+ const std::vector<PTabletID>&
tablets_to_commit) {
+ std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
+ auto& tablets = _tablets_to_commit[dst_id];
+ tablets.insert(tablets.end(), tablets_to_commit.begin(),
tablets_to_commit.end());
+}
+
+// Drops one use of this map. Returns true only for the call that drops the count to
+// zero; that call also removes the map from the pool. Callers are expected to still hold
+// their own shared_ptr to this map (as VTabletWriterV2 does with _load_stream_map), so
+// erasing it from the pool does not destroy *this while this method runs.
+bool LoadStreamMap::release() {
+    int num_use = --_use_cnt;
+    if (num_use == 0) {
+        LOG(INFO) << "releasing streams, load_id=" << _load_id;
+        _pool->erase(_load_id);
+        return true;
+    }
+    if (num_use < 0) {
+        // Some user released more than once; previously this was indistinguishable
+        // from a normal "keeping streams" message.
+        LOG(WARNING) << "streams released more times than acquired, load_id=" << _load_id
+                     << ", use_cnt=" << num_use;
+        return false;
+    }
+    LOG(INFO) << "keeping streams, load_id=" << _load_id << ", use_cnt=" << num_use;
+    return false;
+}
+
+// Sends CLOSE_LOAD on every stream of every backend, passing the tablets accumulated
+// for that backend via save_tablets_to_commit(). Returns the first error encountered.
+// Only call this after release() has returned true.
+Status LoadStreamMap::close_load() {
+    // Read _tablets_to_commit under the same mutex save_tablets_to_commit() takes, and
+    // look entries up with find() instead of operator[], which would insert empty lists.
+    // Lock order here is _tablets_to_commit_mutex -> _mutex (taken by for_each_st).
+    std::lock_guard<std::mutex> tablets_lock(_tablets_to_commit_mutex);
+    static const std::vector<PTabletID> kNoTablets;
+    return for_each_st([this](int64_t dst_id, const Streams& streams) -> Status {
+        auto it = _tablets_to_commit.find(dst_id);
+        const auto& tablets = it != _tablets_to_commit.end() ? it->second : kNoTablets;
+        for (auto& stream : streams) {
+            RETURN_IF_ERROR(stream->close_load(tablets));
+        }
+        return Status::OK();
+    });
+}
+
+LoadStreamMapPool::LoadStreamMapPool() = default;
+
+LoadStreamMapPool::~LoadStreamMapPool() = default;
+
+// Returns the LoadStreamMap for load_id, creating it on first use. src_id, num_streams
+// and num_use only take effect for the call that creates the map; later callers with the
+// same load_id get the existing map unchanged.
+std::shared_ptr<LoadStreamMap> LoadStreamMapPool::get_or_create(UniqueId load_id, int64_t src_id,
+                                                                int num_streams, int num_use) {
+    std::lock_guard<std::mutex> lock(_mutex);
+    if (auto it = _pool.find(load_id); it != _pool.end()) {
+        return it->second;
+    }
+    // Construct before inserting so a failed construction leaves no null entry behind
+    // (which size() would otherwise count).
+    auto streams = std::make_shared<LoadStreamMap>(load_id, src_id, num_streams, num_use, this);
+    _pool.emplace(load_id, streams);
+    return streams;
+}
+
+// Removes the map for load_id from the pool. Existing shared_ptr holders keep the
+// LoadStreamMap alive; a later get_or_create(load_id) would build a fresh one.
+void LoadStreamMapPool::erase(UniqueId load_id) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ _pool.erase(load_id);
+}
+
+} // namespace doris
diff --git a/be/src/vec/sink/load_stream_stub_pool.h
b/be/src/vec/sink/load_stream_map_pool.h
similarity index 59%
rename from be/src/vec/sink/load_stream_stub_pool.h
rename to be/src/vec/sink/load_stream_map_pool.h
index 662fc5bc1a1..aad12dba2aa 100644
--- a/be/src/vec/sink/load_stream_stub_pool.h
+++ b/be/src/vec/sink/load_stream_map_pool.h
@@ -63,57 +63,74 @@
#include "vec/core/block.h"
#include "vec/data_types/data_type.h"
#include "vec/exprs/vexpr_fwd.h"
+#include "vec/sink/load_stream_stub.h"
namespace doris {
class LoadStreamStub;
-class LoadStreamStubPool;
+class LoadStreamMapPool;
using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;
-class LoadStreams {
+class LoadStreamMap {
public:
- LoadStreams(UniqueId load_id, int64_t dst_id, int num_use,
LoadStreamStubPool* pool);
+ LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int
num_use,
+ LoadStreamMapPool* pool);
- void release();
+ std::shared_ptr<Streams> get_or_create(int64_t dst_id);
- Streams& streams() { return _streams; }
+ std::shared_ptr<Streams> at(int64_t dst_id);
+
+ bool contains(int64_t dst_id);
+
+ void for_each(std::function<void(int64_t, const Streams&)> fn);
+
+ Status for_each_st(std::function<Status(int64_t, const Streams&)> fn);
+
+ void save_tablets_to_commit(int64_t dst_id, const std::vector<PTabletID>&
tablets_to_commit);
+
+ // Return true if the last instance is just released.
+ bool release();
+
+ // send CLOSE_LOAD to all streams, return ERROR if any.
+ // only call this method after release() returns true.
+ Status close_load();
private:
- Streams _streams;
- UniqueId _load_id;
- int64_t _dst_id;
+ const UniqueId _load_id;
+ const int64_t _src_id;
+ const int _num_streams;
std::atomic<int> _use_cnt;
- LoadStreamStubPool* _pool = nullptr;
+ std::mutex _mutex;
+ std::unordered_map<int64_t, std::shared_ptr<Streams>> _streams_for_node;
+ LoadStreamMapPool* _pool = nullptr;
+ std::shared_ptr<IndexToTabletSchema> _tablet_schema_for_index;
+ std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index;
+
+ std::mutex _tablets_to_commit_mutex;
+ std::unordered_map<int64_t, std::vector<PTabletID>> _tablets_to_commit;
};
-class LoadStreamStubPool {
+class LoadStreamMapPool {
public:
- LoadStreamStubPool();
+ LoadStreamMapPool();
- ~LoadStreamStubPool();
+ ~LoadStreamMapPool();
- std::shared_ptr<LoadStreams> get_or_create(PUniqueId load_id, int64_t
src_id, int64_t dst_id,
- int num_streams, int num_sink);
+ std::shared_ptr<LoadStreamMap> get_or_create(UniqueId load_id, int64_t
src_id, int num_streams,
+ int num_use);
- void erase(UniqueId load_id, int64_t dst_id);
+ void erase(UniqueId load_id);
size_t size() {
std::lock_guard<std::mutex> lock(_mutex);
return _pool.size();
}
- // for UT only
- size_t templates_size() {
- std::lock_guard<std::mutex> lock(_mutex);
- return _template_stubs.size();
- }
-
private:
std::mutex _mutex;
- std::unordered_map<UniqueId, std::unique_ptr<LoadStreamStub>>
_template_stubs;
- std::unordered_map<std::pair<UniqueId, int64_t>,
std::shared_ptr<LoadStreams>> _pool;
+ std::unordered_map<UniqueId, std::shared_ptr<LoadStreamMap>> _pool;
};
} // namespace doris
diff --git a/be/src/vec/sink/load_stream_stub.cpp
b/be/src/vec/sink/load_stream_stub.cpp
index 6eb91e46853..55a5eba77e7 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -125,19 +125,13 @@ inline std::ostream& operator<<(std::ostream& ostr, const
LoadStreamReplyHandler
return ostr;
}
-LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use)
- : _use_cnt(num_use),
- _load_id(load_id),
+LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id,
+ std::shared_ptr<IndexToTabletSchema> schema_map,
+ std::shared_ptr<IndexToEnableMoW> mow_map)
+ : _load_id(load_id),
_src_id(src_id),
- _tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()),
- _enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>())
{};
-
-LoadStreamStub::LoadStreamStub(LoadStreamStub& stub)
- : _use_cnt(stub._use_cnt.load()),
- _load_id(stub._load_id),
- _src_id(stub._src_id),
- _tablet_schema_for_index(stub._tablet_schema_for_index),
- _enable_unique_mow_for_index(stub._enable_unique_mow_for_index) {};
+ _tablet_schema_for_index(schema_map),
+ _enable_unique_mow_for_index(mow_map) {};
LoadStreamStub::~LoadStreamStub() {
if (_is_init.load() && !_is_closed.load()) {
@@ -241,23 +235,12 @@ Status LoadStreamStub::add_segment(int64_t partition_id,
int64_t index_id, int64
// CLOSE_LOAD
Status LoadStreamStub::close_load(const std::vector<PTabletID>&
tablets_to_commit) {
- {
- std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
- _tablets_to_commit.insert(_tablets_to_commit.end(),
tablets_to_commit.begin(),
- tablets_to_commit.end());
- }
- if (--_use_cnt > 0) {
- return Status::OK();
- }
PStreamHeader header;
*header.mutable_load_id() = _load_id;
header.set_src_id(_src_id);
header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
- {
- std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
- for (const auto& tablet : _tablets_to_commit) {
- *header.add_tablets() = tablet;
- }
+ for (const auto& tablet : tablets_to_commit) {
+ *header.add_tablets() = tablet;
}
return _encode_and_send(header);
}
@@ -267,7 +250,7 @@ Status LoadStreamStub::get_schema(const
std::vector<PTabletID>& tablets) {
PStreamHeader header;
*header.mutable_load_id() = _load_id;
header.set_src_id(_src_id);
- header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
+ header.set_opcode(doris::PStreamHeader::GET_SCHEMA);
std::ostringstream oss;
oss << "fetching tablet schema from stream " << _stream_id
<< ", load id: " << print_id(_load_id) << ", tablet id:";
diff --git a/be/src/vec/sink/load_stream_stub.h
b/be/src/vec/sink/load_stream_stub.h
index f20b0e6ea3d..aa8b850760e 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -109,10 +109,14 @@ class LoadStreamStub {
public:
// construct new stub
- LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use);
+ LoadStreamStub(PUniqueId load_id, int64_t src_id,
+ std::shared_ptr<IndexToTabletSchema> schema_map,
+ std::shared_ptr<IndexToEnableMoW> mow_map);
- // copy constructor, shared_ptr members are shared
- LoadStreamStub(LoadStreamStub& stub);
+ LoadStreamStub(UniqueId load_id, int64_t src_id,
+ std::shared_ptr<IndexToTabletSchema> schema_map,
+ std::shared_ptr<IndexToEnableMoW> mow_map)
+ : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map)
{};
// for mock this class in UT
#ifdef BE_TEST
@@ -213,7 +217,6 @@ protected:
std::atomic<bool> _is_closed;
std::atomic<bool> _is_cancelled;
std::atomic<bool> _is_eos;
- std::atomic<int> _use_cnt;
PUniqueId _load_id;
brpc::StreamId _stream_id;
@@ -226,9 +229,6 @@ protected:
bthread::Mutex _cancel_mutex;
bthread::ConditionVariable _close_cv;
- std::mutex _tablets_to_commit_mutex;
- std::vector<PTabletID> _tablets_to_commit;
-
std::mutex _buffer_mutex;
std::mutex _send_mutex;
butil::IOBuf _buffer;
diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp
b/be/src/vec/sink/load_stream_stub_pool.cpp
deleted file mode 100644
index d76402b57d5..00000000000
--- a/be/src/vec/sink/load_stream_stub_pool.cpp
+++ /dev/null
@@ -1,72 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vec/sink/load_stream_stub_pool.h"
-
-#include "util/debug_points.h"
-#include "vec/sink/load_stream_stub.h"
-
-namespace doris {
-class TExpr;
-
-LoadStreams::LoadStreams(UniqueId load_id, int64_t dst_id, int num_use,
LoadStreamStubPool* pool)
- : _load_id(load_id), _dst_id(dst_id), _use_cnt(num_use), _pool(pool) {}
-
-void LoadStreams::release() {
- int num_use = --_use_cnt;
- DBUG_EXECUTE_IF("LoadStreams.release.keeping_streams", { num_use = 1; });
- if (num_use == 0) {
- LOG(INFO) << "releasing streams, load_id=" << _load_id << ", dst_id="
<< _dst_id;
- _pool->erase(_load_id, _dst_id);
- } else {
- LOG(INFO) << "keeping streams, load_id=" << _load_id << ", dst_id=" <<
_dst_id
- << ", use_cnt=" << num_use;
- }
-}
-
-LoadStreamStubPool::LoadStreamStubPool() = default;
-
-LoadStreamStubPool::~LoadStreamStubPool() = default;
-
-std::shared_ptr<LoadStreams> LoadStreamStubPool::get_or_create(PUniqueId
load_id, int64_t src_id,
- int64_t dst_id,
int num_streams,
- int num_sink) {
- auto key = std::make_pair(UniqueId(load_id), dst_id);
- std::lock_guard<std::mutex> lock(_mutex);
- std::shared_ptr<LoadStreams> streams = _pool[key];
- if (streams) {
- return streams;
- }
- DCHECK(num_streams > 0) << "stream num should be greater than 0";
- DCHECK(num_sink > 0) << "sink num should be greater than 0";
- auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub
{load_id, src_id, num_sink});
- streams = std::make_shared<LoadStreams>(load_id, dst_id, num_sink, this);
- for (int32_t i = 0; i < num_streams; i++) {
- // copy construct, internal tablet schema map will be shared among all
stubs
- streams->streams().emplace_back(new LoadStreamStub {*it->second});
- }
- _pool[key] = streams;
- return streams;
-}
-
-void LoadStreamStubPool::erase(UniqueId load_id, int64_t dst_id) {
- std::lock_guard<std::mutex> lock(_mutex);
- _pool.erase(std::make_pair(load_id, dst_id));
- _template_stubs.erase(load_id);
-}
-
-} // namespace doris
diff --git a/be/src/vec/sink/volap_table_sink_v2.cpp
b/be/src/vec/sink/volap_table_sink_v2.cpp
index a41b77ace53..a73ee483bd3 100644
--- a/be/src/vec/sink/volap_table_sink_v2.cpp
+++ b/be/src/vec/sink/volap_table_sink_v2.cpp
@@ -31,8 +31,6 @@
#include "runtime/runtime_state.h"
#include "util/doris_metrics.h"
#include "vec/sink/delta_writer_v2_pool.h"
-#include "vec/sink/load_stream_stub.h"
-#include "vec/sink/load_stream_stub_pool.h"
namespace doris {
class TExpr;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 68506ca161e..a9ac012bf71 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -50,8 +50,8 @@
#include "util/uid_util.h"
#include "vec/core/block.h"
#include "vec/sink/delta_writer_v2_pool.h"
+#include "vec/sink/load_stream_map_pool.h"
#include "vec/sink/load_stream_stub.h"
-#include "vec/sink/load_stream_stub_pool.h"
#include "vec/sink/vtablet_block_convertor.h"
#include "vec/sink/vtablet_finder.h"
@@ -98,7 +98,7 @@ Status VTabletWriterV2::_incremental_open_streams(
tablet.set_partition_id(partition->id);
tablet.set_index_id(index.index_id);
tablet.set_tablet_id(tablet_id);
- if (!_streams_for_node.contains(node)) {
+ if (!_load_stream_map->contains(node)) {
new_backends.insert(node);
}
_tablets_for_node[node].emplace(tablet_id, tablet);
@@ -111,11 +111,9 @@ Status VTabletWriterV2::_incremental_open_streams(
}
}
}
- for (int64_t node_id : new_backends) {
- auto load_streams =
ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
- _load_id, _backend_id, node_id, _stream_per_node,
_num_local_sink);
- RETURN_IF_ERROR(_open_streams_to_backend(node_id, *load_streams));
- _streams_for_node[node_id] = load_streams;
+ for (int64_t dst_id : new_backends) {
+ auto streams = _load_stream_map->get_or_create(dst_id);
+ RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
}
return Status::OK();
}
@@ -242,6 +240,8 @@ Status VTabletWriterV2::_init(RuntimeState* state,
RuntimeProfile* profile) {
} else {
_delta_writer_for_tablet =
std::make_shared<DeltaWriterV2Map>(_load_id);
}
+ _load_stream_map =
ExecEnv::GetInstance()->load_stream_map_pool()->get_or_create(
+ _load_id, _backend_id, _stream_per_node, _num_local_sink);
return Status::OK();
}
@@ -253,23 +253,21 @@ Status VTabletWriterV2::open(RuntimeState* state,
RuntimeProfile* profile) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
RETURN_IF_ERROR(_build_tablet_node_mapping());
- RETURN_IF_ERROR(_open_streams(_backend_id));
+ RETURN_IF_ERROR(_open_streams());
RETURN_IF_ERROR(_init_row_distribution());
return Status::OK();
}
-Status VTabletWriterV2::_open_streams(int64_t src_id) {
+Status VTabletWriterV2::_open_streams() {
for (auto& [dst_id, _] : _tablets_for_node) {
- auto streams =
ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
- _load_id, src_id, dst_id, _stream_per_node, _num_local_sink);
+ auto streams = _load_stream_map->get_or_create(dst_id);
RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
- _streams_for_node[dst_id] = streams;
}
return Status::OK();
}
-Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, LoadStreams&
streams) {
+Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, Streams&
streams) {
const auto* node_info = _nodes_info->find_node(dst_id);
DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.node_info_null",
{ node_info = nullptr; });
@@ -278,14 +276,14 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t
dst_id, LoadStreams& st
}
auto idle_timeout_ms = _state->execution_timeout() * 1000;
// get tablet schema from each backend only in the 1st stream
- for (auto& stream : streams.streams() | std::ranges::views::take(1)) {
+ for (auto& stream : streams | std::ranges::views::take(1)) {
const std::vector<PTabletID>& tablets_for_schema =
_indexes_from_node[node_info->id];
RETURN_IF_ERROR(stream->open(stream,
_state->exec_env()->brpc_internal_client_cache(),
*node_info, _txn_id, *_schema,
tablets_for_schema,
_total_streams, idle_timeout_ms,
_state->enable_profile()));
}
// for the rest streams, open without getting tablet schema
- for (auto& stream : streams.streams() | std::ranges::views::drop(1)) {
+ for (auto& stream : streams | std::ranges::views::drop(1)) {
RETURN_IF_ERROR(stream->open(stream,
_state->exec_env()->brpc_internal_client_cache(),
*node_info, _txn_id, *_schema, {},
_total_streams,
idle_timeout_ms,
_state->enable_profile()));
@@ -360,7 +358,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id,
int64_t partition_id,
tablet.set_index_id(index_id);
tablet.set_tablet_id(tablet_id);
_tablets_for_node[node_id].emplace(tablet_id, tablet);
-
streams.emplace_back(_streams_for_node.at(node_id)->streams().at(_stream_index));
+ streams.emplace_back(_load_stream_map->at(node_id)->at(_stream_index));
RETURN_IF_ERROR(streams[0]->wait_for_schema(partition_id, index_id,
tablet_id));
}
_stream_index = (_stream_index + 1) % _stream_per_node;
@@ -469,11 +467,13 @@ Status VTabletWriterV2::_cancel(Status status) {
_delta_writer_for_tablet->cancel(status);
_delta_writer_for_tablet.reset();
}
- for (const auto& [_, streams] : _streams_for_node) {
- for (const auto& stream : streams->streams()) {
- stream->cancel(status);
- }
- streams->release();
+ if (_load_stream_map) {
+ _load_stream_map->for_each([status](int64_t dst_id, const Streams&
streams) {
+ for (auto& stream : streams) {
+ stream->cancel(status);
+ }
+ });
+ _load_stream_map->release();
}
return Status::OK();
}
@@ -527,32 +527,29 @@ Status VTabletWriterV2::close(Status exec_status) {
COUNTER_SET(_row_distribution_timer,
(int64_t)_row_distribution_watch.elapsed_time());
COUNTER_SET(_validate_data_timer,
_block_convertor->validate_data_ns());
- // defer stream release to prevent memory leak
- Defer defer([&] {
- for (const auto& [_, streams] : _streams_for_node) {
- streams->release();
- }
- _streams_for_node.clear();
- });
-
+ // close DeltaWriters
{
SCOPED_TIMER(_close_writer_timer);
// close all delta writers if this is the last user
- RETURN_IF_ERROR(_delta_writer_for_tablet->close(_profile));
+ auto st = _delta_writer_for_tablet->close(_profile);
_delta_writer_for_tablet.reset();
- }
-
- {
- // send CLOSE_LOAD to all streams, return ERROR if any
- for (const auto& [_, streams] : _streams_for_node) {
- RETURN_IF_ERROR(_close_load(streams->streams()));
+        if (!st.ok()) {
+            // _cancel() releases this sink's use of _load_stream_map and returns OK, so the
+            // close error must be returned explicitly; falling through would call release()
+            // a second time below and continue closing as if the writers had succeeded.
+            RETURN_IF_ERROR(_cancel(st));
+            return st;
}
}
- {
+ _calc_tablets_to_commit();
+ const bool is_last_sink = _load_stream_map->release();
+ LOG(INFO) << "sink " << _sender_id << " released streams, is_last=" <<
is_last_sink
+ << ", load_id=" << print_id(_load_id);
+
+ // send CLOSE_LOAD and close_wait on all streams
+ if (is_last_sink) {
+ RETURN_IF_ERROR(_load_stream_map->close_load());
SCOPED_TIMER(_close_load_timer);
- for (const auto& [_, streams] : _streams_for_node) {
- for (const auto& stream : streams->streams()) {
+ RETURN_IF_ERROR(_load_stream_map->for_each_st([this](int64_t
dst_id,
+ const
Streams& streams) -> Status {
+ for (auto& stream : streams) {
int64_t remain_ms =
static_cast<int64_t>(_state->execution_timeout()) * 1000 -
_timeout_watch.elapsed_time() / 1000 /
1000;
if (remain_ms <= 0) {
@@ -562,52 +559,50 @@ Status VTabletWriterV2::close(Status exec_status) {
}
RETURN_IF_ERROR(stream->close_wait(_state, remain_ms));
}
- }
+ return Status::OK();
+ }));
}
- std::unordered_map<int64_t, int> failed_tablets;
+ // calculate and submit commit info
+ if (is_last_sink) {
+ std::unordered_map<int64_t, int> failed_tablets;
+ std::unordered_map<int64_t, Status> failed_reason;
+ std::vector<TTabletCommitInfo> tablet_commit_infos;
- std::vector<TTabletCommitInfo> tablet_commit_infos;
- for (const auto& [node_id, streams] : _streams_for_node) {
- for (const auto& stream : streams->streams()) {
+ _load_stream_map->for_each([&](int64_t dst_id, const Streams&
streams) {
std::unordered_set<int64_t> known_tablets;
- for (auto [tablet_id, _] : stream->failed_tablets()) {
- if (known_tablets.contains(tablet_id)) {
- continue;
+ for (const auto& stream : streams) {
+ for (auto [tablet_id, reason] : stream->failed_tablets()) {
+ if (known_tablets.contains(tablet_id)) {
+ continue;
+ }
+ known_tablets.insert(tablet_id);
+ failed_tablets[tablet_id]++;
+ failed_reason[tablet_id] = reason;
}
- known_tablets.insert(tablet_id);
- failed_tablets[tablet_id]++;
- }
- for (auto tablet_id : stream->success_tablets()) {
- if (known_tablets.contains(tablet_id)) {
- continue;
+ for (auto tablet_id : stream->success_tablets()) {
+ if (known_tablets.contains(tablet_id)) {
+ continue;
+ }
+ known_tablets.insert(tablet_id);
+ TTabletCommitInfo commit_info;
+ commit_info.tabletId = tablet_id;
+ commit_info.backendId = dst_id;
+
tablet_commit_infos.emplace_back(std::move(commit_info));
}
- known_tablets.insert(tablet_id);
- TTabletCommitInfo commit_info;
- commit_info.tabletId = tablet_id;
- commit_info.backendId = node_id;
- tablet_commit_infos.emplace_back(std::move(commit_info));
}
- }
- }
- for (auto [tablet_id, replicas] : failed_tablets) {
- if (replicas <= (_num_replicas - 1) / 2) {
- continue;
- }
- auto backends = _location->find_tablet(tablet_id)->node_ids;
- for (auto& backend_id : backends) {
- for (const auto& stream :
_streams_for_node[backend_id]->streams()) {
- const auto& failed_tablets = stream->failed_tablets();
- if (failed_tablets.contains(tablet_id)) {
- return failed_tablets.at(tablet_id);
- }
+ });
+
+ for (auto [tablet_id, replicas] : failed_tablets) {
+ if (replicas > (_num_replicas - 1) / 2) {
+ return failed_reason.at(tablet_id);
}
}
- DCHECK(false) << "failed tablet " << tablet_id << " should have
failed reason";
+ _state->tablet_commit_infos().insert(
+ _state->tablet_commit_infos().end(),
+ std::make_move_iterator(tablet_commit_infos.begin()),
+ std::make_move_iterator(tablet_commit_infos.end()));
}
-
_state->tablet_commit_infos().insert(_state->tablet_commit_infos().end(),
-
std::make_move_iterator(tablet_commit_infos.begin()),
-
std::make_move_iterator(tablet_commit_infos.end()));
// _number_input_rows don't contain num_rows_load_filtered and
num_rows_load_unselected in scan node
int64_t num_rows_load_total = _number_input_rows +
_state->num_rows_load_filtered() +
@@ -629,18 +624,28 @@ Status VTabletWriterV2::close(Status exec_status) {
return status;
}
-Status VTabletWriterV2::_close_load(const Streams& streams) {
- auto node_id = streams[0]->dst_id();
- std::vector<PTabletID> tablets_to_commit;
- for (auto [tablet_id, tablet] : _tablets_for_node[node_id]) {
- if (_tablet_finder->partition_ids().contains(tablet.partition_id())) {
- tablets_to_commit.push_back(tablet);
+void VTabletWriterV2::_calc_tablets_to_commit() {
+ for (const auto& [dst_id, tablets] : _tablets_for_node) {
+ std::vector<PTabletID> tablets_to_commit;
+ std::vector<int64_t> partition_ids;
+ for (const auto& [tablet_id, tablet] : tablets) {
+ if
(_tablet_finder->partition_ids().contains(tablet.partition_id())) {
+ if (VLOG_DEBUG_IS_ON) {
+ partition_ids.push_back(tablet.partition_id());
+ }
+ tablets_to_commit.push_back(tablet);
+ }
}
+        if (VLOG_DEBUG_IS_ON) {
+            // Debug-only diagnostics: emit them at the verbosity that gates collecting them
+            // rather than at WARNING severity, and say which backend/load they belong to.
+            std::string msg("close load partitions: ");
+            msg.reserve(msg.size() + partition_ids.size() * 7);
+            for (auto v : partition_ids) {
+                msg.append(std::to_string(v)).append(", ");
+            }
+            VLOG_DEBUG << msg << " dst_id=" << dst_id << ", load_id=" << print_id(_load_id);
+        }
+ _load_stream_map->save_tablets_to_commit(dst_id, tablets_to_commit);
}
- for (const auto& stream : streams) {
- RETURN_IF_ERROR(stream->close_load(tablets_to_commit));
- }
- return Status::OK();
}
} // namespace doris::vectorized
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h
b/be/src/vec/sink/writer/vtablet_writer_v2.h
index 460b3acc33f..c04cff15cf4 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -69,7 +69,7 @@
namespace doris {
class DeltaWriterV2;
class LoadStreamStub;
-class LoadStreams;
+class LoadStreamMap;
class ObjectPool;
class RowDescriptor;
class RuntimeState;
@@ -121,9 +121,9 @@ private:
Status _init(RuntimeState* state, RuntimeProfile* profile);
- Status _open_streams(int64_t src_id);
+ Status _open_streams();
- Status _open_streams_to_backend(int64_t dst_id, LoadStreams& streams);
+ Status _open_streams_to_backend(int64_t dst_id, Streams& streams);
Status _incremental_open_streams(const std::vector<TOlapTablePartition>&
partitions);
@@ -140,7 +140,7 @@ private:
Status _select_streams(int64_t tablet_id, int64_t partition_id, int64_t
index_id,
Streams& streams);
- Status _close_load(const Streams& streams);
+ void _calc_tablets_to_commit();
Status _cancel(Status status);
@@ -217,7 +217,7 @@ private:
std::unordered_map<int64_t, std::unordered_map<int64_t, PTabletID>>
_tablets_for_node;
std::unordered_map<int64_t, std::vector<PTabletID>> _indexes_from_node;
- std::unordered_map<int64_t, std::shared_ptr<LoadStreams>>
_streams_for_node;
+ std::shared_ptr<LoadStreamMap> _load_stream_map;
size_t _stream_index = 0;
std::shared_ptr<DeltaWriterV2Map> _delta_writer_for_tablet;
diff --git a/be/test/io/fs/stream_sink_file_writer_test.cpp
b/be/test/io/fs/stream_sink_file_writer_test.cpp
index 7e5bdd350f5..ad6e496c56f 100644
--- a/be/test/io/fs/stream_sink_file_writer_test.cpp
+++ b/be/test/io/fs/stream_sink_file_writer_test.cpp
@@ -51,7 +51,9 @@ static std::atomic<int64_t> g_num_request;
class StreamSinkFileWriterTest : public testing::Test {
class MockStreamStub : public LoadStreamStub {
public:
- MockStreamStub(PUniqueId load_id, int64_t src_id) :
LoadStreamStub(load_id, src_id, 1) {};
+ MockStreamStub(PUniqueId load_id, int64_t src_id)
+ : LoadStreamStub(load_id, src_id,
std::make_shared<IndexToTabletSchema>(),
+ std::make_shared<IndexToEnableMoW>()) {};
virtual ~MockStreamStub() = default;
diff --git a/be/test/vec/exec/load_stream_stub_map_test.cpp b/be/test/vec/exec/load_stream_stub_map_test.cpp
new file mode 100644
index 00000000000..5f8743340ac
--- /dev/null
+++ b/be/test/vec/exec/load_stream_stub_map_test.cpp
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <gtest/gtest.h>
+
+#include "vec/sink/load_stream_map_pool.h"
+#include "vec/sink/load_stream_stub.h"
+
+namespace doris {
+
+class LoadStreamMapPoolTest : public testing::Test { // NOTE(review): fixture name differs from file name load_stream_stub_map_test.cpp
+public:
+ LoadStreamMapPoolTest() = default;
+ virtual ~LoadStreamMapPoolTest() = default;
+};
+
+TEST_F(LoadStreamMapPoolTest, test) { // checks map sharing per load_id, per-backend stream count, and release-driven removal
+ LoadStreamMapPool pool;
+ int64_t src_id = 100;
+ PUniqueId load_id;
+ load_id.set_lo(1);
+ load_id.set_hi(2);
+ PUniqueId load_id2;
+ load_id2.set_lo(2);
+ load_id2.set_hi(1);
+ auto streams_for_node1 = pool.get_or_create(load_id, src_id, 5, 2); // 3rd arg: streams per backend (see size checks); 4th arg appears to be the expected number of release() calls
+ auto streams_for_node2 = pool.get_or_create(load_id, src_id, 5, 2); // same load_id: expected to return the same map (checked below)
+ EXPECT_EQ(1, pool.size());
+ auto streams_for_node3 = pool.get_or_create(load_id2, src_id, 8, 1); // different load_id -> a separate pool entry
+ EXPECT_EQ(2, pool.size());
+ EXPECT_EQ(streams_for_node1, streams_for_node2);
+ EXPECT_NE(streams_for_node1, streams_for_node3);
+
+ EXPECT_EQ(5, streams_for_node1->get_or_create(101)->size()); // per-backend stream set is sized by the 3rd get_or_create arg
+ EXPECT_EQ(5, streams_for_node2->get_or_create(102)->size());
+ EXPECT_EQ(8, streams_for_node3->get_or_create(101)->size());
+
+ EXPECT_TRUE(streams_for_node3->release()); // load_id2 was created with 4th arg 1: first release() returns true and removes it
+ EXPECT_EQ(1, pool.size());
+ EXPECT_FALSE(streams_for_node1->release()); // load_id was created with 4th arg 2: first release() is not the last one
+ EXPECT_EQ(1, pool.size());
+ EXPECT_TRUE(streams_for_node2->release()); // second release() for load_id returns true and empties the pool
+ EXPECT_EQ(0, pool.size());
+}
+
+} // namespace doris
diff --git a/be/test/vec/exec/load_stream_stub_pool_test.cpp b/be/test/vec/exec/load_stream_stub_pool_test.cpp
deleted file mode 100644
index 24da3bb6999..00000000000
--- a/be/test/vec/exec/load_stream_stub_pool_test.cpp
+++ /dev/null
@@ -1,51 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#include "vec/sink/load_stream_stub_pool.h"
-
-#include <gtest/gtest.h>
-
-#include "vec/sink/load_stream_stub.h"
-
-namespace doris {
-
-class LoadStreamStubPoolTest : public testing::Test {
-public:
- LoadStreamStubPoolTest() = default;
- virtual ~LoadStreamStubPoolTest() = default;
-};
-
-TEST_F(LoadStreamStubPoolTest, test) {
- LoadStreamStubPool pool;
- int64_t src_id = 100;
- PUniqueId load_id;
- load_id.set_hi(1);
- load_id.set_hi(2);
- auto streams1 = pool.get_or_create(load_id, src_id, 101, 5, 1);
- auto streams2 = pool.get_or_create(load_id, src_id, 102, 5, 1);
- auto streams3 = pool.get_or_create(load_id, src_id, 101, 5, 1);
- EXPECT_EQ(2, pool.size());
- EXPECT_EQ(1, pool.templates_size());
- EXPECT_EQ(streams1, streams3);
- EXPECT_NE(streams1, streams2);
- streams1->release();
- streams2->release();
- streams3->release();
- EXPECT_EQ(0, pool.size());
- EXPECT_EQ(0, pool.templates_size());
-}
-
-} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]