This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c34b52c02a0 [fix](move-memtable) fix initial use count of streams for auto partition (#33165)
c34b52c02a0 is described below
commit c34b52c02a0547b665496bc02d775017d67d7fb3
Author: Kaijie Chen <[email protected]>
AuthorDate: Wed Apr 3 19:44:52 2024 +0800
[fix](move-memtable) fix initial use count of streams for auto partition (#33165)
---
be/src/vec/sink/load_stream_stub.cpp | 33 +---
be/src/vec/sink/load_stream_stub.h | 14 +-
be/src/vec/sink/load_stream_stub_pool.cpp | 110 +++++++++----
be/src/vec/sink/load_stream_stub_pool.h | 50 +++---
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 200 ++++++++++++------------
be/src/vec/sink/writer/vtablet_writer_v2.h | 10 +-
be/test/io/fs/stream_sink_file_writer_test.cpp | 4 +-
be/test/vec/exec/load_stream_stub_pool_test.cpp | 31 ++--
8 files changed, 254 insertions(+), 198 deletions(-)
diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index 1b19cd1dd52..2e118dce5c1 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -125,19 +125,13 @@ inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamReplyHandler
return ostr;
}
-LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use)
- : _use_cnt(num_use),
- _load_id(load_id),
+LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id,
+ std::shared_ptr<IndexToTabletSchema> schema_map,
+ std::shared_ptr<IndexToEnableMoW> mow_map)
+ : _load_id(load_id),
_src_id(src_id),
- _tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()),
- _enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()) {};
-
-LoadStreamStub::LoadStreamStub(LoadStreamStub& stub)
- : _use_cnt(stub._use_cnt.load()),
- _load_id(stub._load_id),
- _src_id(stub._src_id),
- _tablet_schema_for_index(stub._tablet_schema_for_index),
- _enable_unique_mow_for_index(stub._enable_unique_mow_for_index) {};
+ _tablet_schema_for_index(schema_map),
+ _enable_unique_mow_for_index(mow_map) {};
LoadStreamStub::~LoadStreamStub() {
if (_is_init.load() && !_is_closed.load()) {
@@ -241,23 +235,12 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64
// CLOSE_LOAD
Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commit) {
- {
- std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
- _tablets_to_commit.insert(_tablets_to_commit.end(), tablets_to_commit.begin(),
- tablets_to_commit.end());
- }
- if (--_use_cnt > 0) {
- return Status::OK();
- }
PStreamHeader header;
*header.mutable_load_id() = _load_id;
header.set_src_id(_src_id);
header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
- {
- std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
- for (const auto& tablet : _tablets_to_commit) {
- *header.add_tablets() = tablet;
- }
+ for (const auto& tablet : tablets_to_commit) {
+ *header.add_tablets() = tablet;
}
return _encode_and_send(header);
}
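
A minimal sketch of the intent behind the reworked constructor: every stub created for one load now receives the same shared schema map and MoW map up front, instead of sharing them through the removed copy constructor, and close_load() simply sends whatever tablets it is given. The helper name make_streams_sketch is illustrative, and the Doris types (LoadStreamStub, PUniqueId, IndexToTabletSchema, IndexToEnableMoW) are assumed to come from the surrounding headers.

    // Sketch only: build a group of stubs that share one schema map and one MoW map,
    // mirroring what LoadStreamMap::get_or_create() does after this change.
    std::vector<std::shared_ptr<LoadStreamStub>> make_streams_sketch(PUniqueId load_id, int64_t src_id,
                                                                     int num_streams) {
        auto schema_map = std::make_shared<IndexToTabletSchema>();
        auto mow_map = std::make_shared<IndexToEnableMoW>();
        std::vector<std::shared_ptr<LoadStreamStub>> streams;
        for (int i = 0; i < num_streams; ++i) {
            // every stub sees the same maps, so a schema fetched by the first
            // stream is visible to the rest of the group
            streams.emplace_back(std::make_shared<LoadStreamStub>(load_id, src_id, schema_map, mow_map));
        }
        return streams;
    }
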
diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h
index f20b0e6ea3d..aa8b850760e 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -109,10 +109,14 @@ class LoadStreamStub {
public:
// construct new stub
- LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use);
+ LoadStreamStub(PUniqueId load_id, int64_t src_id,
+ std::shared_ptr<IndexToTabletSchema> schema_map,
+ std::shared_ptr<IndexToEnableMoW> mow_map);
- // copy constructor, shared_ptr members are shared
- LoadStreamStub(LoadStreamStub& stub);
+ LoadStreamStub(UniqueId load_id, int64_t src_id,
+ std::shared_ptr<IndexToTabletSchema> schema_map,
+ std::shared_ptr<IndexToEnableMoW> mow_map)
+ : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map) {};
// for mock this class in UT
#ifdef BE_TEST
@@ -213,7 +217,6 @@ protected:
std::atomic<bool> _is_closed;
std::atomic<bool> _is_cancelled;
std::atomic<bool> _is_eos;
- std::atomic<int> _use_cnt;
PUniqueId _load_id;
brpc::StreamId _stream_id;
@@ -226,9 +229,6 @@ protected:
bthread::Mutex _cancel_mutex;
bthread::ConditionVariable _close_cv;
- std::mutex _tablets_to_commit_mutex;
- std::vector<PTabletID> _tablets_to_commit;
-
std::mutex _buffer_mutex;
std::mutex _send_mutex;
butil::IOBuf _buffer;
diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp b/be/src/vec/sink/load_stream_stub_pool.cpp
index d76402b57d5..3eae49aff77 100644
--- a/be/src/vec/sink/load_stream_stub_pool.cpp
+++ b/be/src/vec/sink/load_stream_stub_pool.cpp
@@ -23,50 +23,104 @@
namespace doris {
class TExpr;
-LoadStreams::LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, LoadStreamStubPool* pool)
- : _load_id(load_id), _dst_id(dst_id), _use_cnt(num_use), _pool(pool) {}
+LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use,
+ LoadStreamStubPool* pool)
+ : _load_id(load_id),
+ _src_id(src_id),
+ _num_streams(num_streams),
+ _use_cnt(num_use),
+ _pool(pool) {
+ DCHECK(num_streams > 0) << "stream num should be greater than 0";
+ DCHECK(num_use > 0) << "use num should be greater than 0";
+}
+
+std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ std::shared_ptr<Streams> streams = _streams_for_node[dst_id];
+ if (streams != nullptr) {
+ return streams;
+ }
+ streams = std::make_shared<Streams>();
+ auto schema_map = std::make_shared<IndexToTabletSchema>();
+ auto mow_map = std::make_shared<IndexToEnableMoW>();
+ for (int i = 0; i < _num_streams; i++) {
+ streams->emplace_back(new LoadStreamStub(_load_id, _src_id, schema_map, mow_map));
+ }
+ _streams_for_node[dst_id] = streams;
+ return streams;
+}
+
+std::shared_ptr<Streams> LoadStreamMap::at(int64_t dst_id) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ return _streams_for_node.at(dst_id);
+}
+
+bool LoadStreamMap::contains(int64_t dst_id) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ return _streams_for_node.contains(dst_id);
+}
-void LoadStreams::release() {
+void LoadStreamMap::for_each(std::function<void(int64_t, const Streams&)> fn) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ for (auto& [dst_id, streams] : _streams_for_node) {
+ fn(dst_id, *streams);
+ }
+}
+
+Status LoadStreamMap::for_each_st(std::function<Status(int64_t, const Streams&)> fn) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ for (auto& [dst_id, streams] : _streams_for_node) {
+ RETURN_IF_ERROR(fn(dst_id, *streams));
+ }
+ return Status::OK();
+}
+
+void LoadStreamMap::save_tablets_to_commit(int64_t dst_id,
+ const std::vector<PTabletID>& tablets_to_commit) {
+ std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
+ auto& tablets = _tablets_to_commit[dst_id];
+ tablets.insert(tablets.end(), tablets_to_commit.begin(), tablets_to_commit.end());
+}
+
+bool LoadStreamMap::release() {
int num_use = --_use_cnt;
- DBUG_EXECUTE_IF("LoadStreams.release.keeping_streams", { num_use = 1; });
if (num_use == 0) {
- LOG(INFO) << "releasing streams, load_id=" << _load_id << ", dst_id="
<< _dst_id;
- _pool->erase(_load_id, _dst_id);
- } else {
- LOG(INFO) << "keeping streams, load_id=" << _load_id << ", dst_id=" <<
_dst_id
- << ", use_cnt=" << num_use;
+ LOG(INFO) << "releasing streams, load_id=" << _load_id;
+ _pool->erase(_load_id);
+ return true;
}
+ LOG(INFO) << "keeping streams, load_id=" << _load_id << ", use_cnt=" <<
num_use;
+ return false;
+}
+
+Status LoadStreamMap::close_load() {
+ return for_each_st([this](int64_t dst_id, const Streams& streams) -> Status {
+ const auto& tablets = _tablets_to_commit[dst_id];
+ for (auto& stream : streams) {
+ RETURN_IF_ERROR(stream->close_load(tablets));
+ }
+ return Status::OK();
+ });
}
LoadStreamStubPool::LoadStreamStubPool() = default;
LoadStreamStubPool::~LoadStreamStubPool() = default;
-
-std::shared_ptr<LoadStreams> LoadStreamStubPool::get_or_create(PUniqueId load_id, int64_t src_id,
- int64_t dst_id, int num_streams,
- int num_sink) {
- auto key = std::make_pair(UniqueId(load_id), dst_id);
+std::shared_ptr<LoadStreamMap> LoadStreamStubPool::get_or_create(UniqueId load_id, int64_t src_id,
+ int num_streams, int num_use) {
std::lock_guard<std::mutex> lock(_mutex);
- std::shared_ptr<LoadStreams> streams = _pool[key];
- if (streams) {
+ std::shared_ptr<LoadStreamMap> streams = _pool[load_id];
+ if (streams != nullptr) {
return streams;
}
- DCHECK(num_streams > 0) << "stream num should be greater than 0";
- DCHECK(num_sink > 0) << "sink num should be greater than 0";
- auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub {load_id, src_id, num_sink});
- streams = std::make_shared<LoadStreams>(load_id, dst_id, num_sink, this);
- for (int32_t i = 0; i < num_streams; i++) {
- // copy construct, internal tablet schema map will be shared among all stubs
- streams->streams().emplace_back(new LoadStreamStub {*it->second});
- }
- _pool[key] = streams;
+ streams = std::make_shared<LoadStreamMap>(load_id, src_id, num_streams, num_use, this);
+ _pool[load_id] = streams;
return streams;
}
-void LoadStreamStubPool::erase(UniqueId load_id, int64_t dst_id) {
+void LoadStreamStubPool::erase(UniqueId load_id) {
std::lock_guard<std::mutex> lock(_mutex);
- _pool.erase(std::make_pair(load_id, dst_id));
- _template_stubs.erase(load_id);
+ _pool.erase(load_id);
}
} // namespace doris
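
A rough sketch of the per-sink lifecycle the new LoadStreamStubPool / LoadStreamMap pair implies: every local sink of a load resolves the same LoadStreamMap by load_id, saves its tablets to commit per destination backend, and only the sink whose release() returns true sends CLOSE_LOAD. The helper name sink_close_sketch and the num_streams / num_use values are illustrative, not part of the patch.

    // Sketch only: one sink's view of the shared LoadStreamMap at close time.
    Status sink_close_sketch(LoadStreamStubPool& pool, UniqueId load_id, int64_t src_id,
                             int64_t dst_id, const std::vector<PTabletID>& tablets) {
        // all sinks of this load get the same map; the counts are fixed on first creation
        auto load_stream_map = pool.get_or_create(load_id, src_id, /*num_streams=*/5, /*num_use=*/2);
        load_stream_map->save_tablets_to_commit(dst_id, tablets);
        if (load_stream_map->release()) {
            // last user: CLOSE_LOAD goes out exactly once per stream, carrying the
            // tablets accumulated from every sink
            return load_stream_map->close_load();
        }
        return Status::OK();
    }
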
diff --git a/be/src/vec/sink/load_stream_stub_pool.h b/be/src/vec/sink/load_stream_stub_pool.h
index 662fc5bc1a1..65f3bb66cd2 100644
--- a/be/src/vec/sink/load_stream_stub_pool.h
+++ b/be/src/vec/sink/load_stream_stub_pool.h
@@ -72,20 +72,41 @@ class LoadStreamStubPool;
using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;
-class LoadStreams {
+class LoadStreamMap {
public:
- LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, LoadStreamStubPool* pool);
+ LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use,
+ LoadStreamStubPool* pool);
- void release();
+ std::shared_ptr<Streams> get_or_create(int64_t dst_id);
- Streams& streams() { return _streams; }
+ std::shared_ptr<Streams> at(int64_t dst_id);
+
+ bool contains(int64_t dst_id);
+
+ void for_each(std::function<void(int64_t, const Streams&)> fn);
+
+ Status for_each_st(std::function<Status(int64_t, const Streams&)> fn);
+
+ void save_tablets_to_commit(int64_t dst_id, const std::vector<PTabletID>& tablets_to_commit);
+
+ // Return true if the last instance is just released.
+ bool release();
+
+ // send CLOSE_LOAD to all streams, return ERROR if any.
+ // only call this method after release() returns true.
+ Status close_load();
private:
- Streams _streams;
- UniqueId _load_id;
- int64_t _dst_id;
+ const UniqueId _load_id;
+ const int64_t _src_id;
+ const int _num_streams;
std::atomic<int> _use_cnt;
+ std::mutex _mutex;
+ std::unordered_map<int64_t, std::shared_ptr<Streams>> _streams_for_node;
LoadStreamStubPool* _pool = nullptr;
+
+ std::mutex _tablets_to_commit_mutex;
+ std::unordered_map<int64_t, std::vector<PTabletID>> _tablets_to_commit;
};
class LoadStreamStubPool {
@@ -94,26 +115,19 @@ public:
~LoadStreamStubPool();
- std::shared_ptr<LoadStreams> get_or_create(PUniqueId load_id, int64_t src_id, int64_t dst_id,
- int num_streams, int num_sink);
+ std::shared_ptr<LoadStreamMap> get_or_create(UniqueId load_id, int64_t src_id, int num_streams,
+ int num_use);
- void erase(UniqueId load_id, int64_t dst_id);
+ void erase(UniqueId load_id);
size_t size() {
std::lock_guard<std::mutex> lock(_mutex);
return _pool.size();
}
- // for UT only
- size_t templates_size() {
- std::lock_guard<std::mutex> lock(_mutex);
- return _template_stubs.size();
- }
-
private:
std::mutex _mutex;
- std::unordered_map<UniqueId, std::unique_ptr<LoadStreamStub>> _template_stubs;
- std::unordered_map<std::pair<UniqueId, int64_t>, std::shared_ptr<LoadStreams>> _pool;
+ std::unordered_map<UniqueId, std::shared_ptr<LoadStreamMap>> _pool;
};
} // namespace doris
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 52a2f75e263..42594f13d99 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -96,7 +96,7 @@ Status VTabletWriterV2::_incremental_open_streams(
tablet.set_partition_id(partition->id);
tablet.set_index_id(index.index_id);
tablet.set_tablet_id(tablet_id);
- if (!_streams_for_node.contains(node)) {
+ if (!_streams_for_node->contains(node)) {
new_backends.insert(node);
}
_tablets_for_node[node].emplace(tablet_id, tablet);
@@ -111,11 +111,9 @@ Status VTabletWriterV2::_incremental_open_streams(
}
}
}
- for (int64_t node_id : new_backends) {
- auto load_streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
- _load_id, _backend_id, node_id, _stream_per_node, _num_local_sink);
- RETURN_IF_ERROR(_open_streams_to_backend(node_id, *load_streams));
- _streams_for_node[node_id] = load_streams;
+ for (int64_t dst_id : new_backends) {
+ auto streams = _streams_for_node->get_or_create(dst_id);
+ RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
}
return Status::OK();
}
@@ -244,6 +242,8 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) {
} else {
_delta_writer_for_tablet = std::make_shared<DeltaWriterV2Map>(_load_id);
}
+ _streams_for_node = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
+ _load_id, _backend_id, _stream_per_node, _num_local_sink);
return Status::OK();
}
@@ -255,23 +255,21 @@ Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
RETURN_IF_ERROR(_build_tablet_node_mapping());
- RETURN_IF_ERROR(_open_streams(_backend_id));
+ RETURN_IF_ERROR(_open_streams());
RETURN_IF_ERROR(_init_row_distribution());
return Status::OK();
}
-Status VTabletWriterV2::_open_streams(int64_t src_id) {
+Status VTabletWriterV2::_open_streams() {
for (auto& [dst_id, _] : _tablets_for_node) {
- auto streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
- _load_id, src_id, dst_id, _stream_per_node, _num_local_sink);
+ auto streams = _streams_for_node->get_or_create(dst_id);
RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
- _streams_for_node[dst_id] = streams;
}
return Status::OK();
}
-Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, LoadStreams& streams) {
+Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, Streams& streams) {
const auto* node_info = _nodes_info->find_node(dst_id);
DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.node_info_null",
{ node_info = nullptr; });
@@ -280,14 +278,14 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, LoadStreams& st
}
auto idle_timeout_ms = _state->execution_timeout() * 1000;
// get tablet schema from each backend only in the 1st stream
- for (auto& stream : streams.streams() | std::ranges::views::take(1)) {
+ for (auto& stream : streams | std::ranges::views::take(1)) {
const std::vector<PTabletID>& tablets_for_schema = _indexes_from_node[node_info->id];
RETURN_IF_ERROR(stream->open(stream, _state->exec_env()->brpc_internal_client_cache(),
*node_info, _txn_id, *_schema, tablets_for_schema,
_total_streams, idle_timeout_ms, _state->enable_profile()));
}
// for the rest streams, open without getting tablet schema
- for (auto& stream : streams.streams() | std::ranges::views::drop(1)) {
+ for (auto& stream : streams | std::ranges::views::drop(1)) {
RETURN_IF_ERROR(stream->open(stream, _state->exec_env()->brpc_internal_client_cache(),
*node_info, _txn_id, *_schema, {}, _total_streams,
idle_timeout_ms, _state->enable_profile()));
@@ -363,7 +361,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id,
tablet.set_tablet_id(tablet_id);
VLOG_DEBUG << fmt::format("_select_streams P{} I{} T{}", partition_id, index_id, tablet_id);
_tablets_for_node[node_id].emplace(tablet_id, tablet);
- streams.emplace_back(_streams_for_node.at(node_id)->streams().at(_stream_index));
+ streams.emplace_back(_streams_for_node->at(node_id)->at(_stream_index));
RETURN_IF_ERROR(streams[0]->wait_for_schema(partition_id, index_id, tablet_id));
}
_stream_index = (_stream_index + 1) % _stream_per_node;
@@ -472,11 +470,13 @@ Status VTabletWriterV2::_cancel(Status status) {
_delta_writer_for_tablet->cancel(status);
_delta_writer_for_tablet.reset();
}
- for (const auto& [_, streams] : _streams_for_node) {
- for (const auto& stream : streams->streams()) {
- stream->cancel(status);
- }
- streams->release();
+ if (_streams_for_node) {
+ _streams_for_node->for_each([status](int64_t dst_id, const Streams& streams) {
+ for (auto& stream : streams) {
+ stream->cancel(status);
+ }
+ });
+ _streams_for_node->release();
}
return Status::OK();
}
@@ -530,87 +530,83 @@ Status VTabletWriterV2::close(Status exec_status) {
COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time());
COUNTER_SET(_validate_data_timer, _block_convertor->validate_data_ns());
- // defer stream release to prevent memory leak
- Defer defer([&] {
- for (const auto& [_, streams] : _streams_for_node) {
- streams->release();
- }
- _streams_for_node.clear();
- });
-
+ // close DeltaWriters
{
SCOPED_TIMER(_close_writer_timer);
// close all delta writers if this is the last user
- RETURN_IF_ERROR(_delta_writer_for_tablet->close(_profile));
+ auto st = _delta_writer_for_tablet->close(_profile);
_delta_writer_for_tablet.reset();
- }
-
- {
- // send CLOSE_LOAD to all streams, return ERROR if any
- for (const auto& [_, streams] : _streams_for_node) {
- RETURN_IF_ERROR(_close_load(streams->streams()));
+ if (!st.ok()) {
+ RETURN_IF_ERROR(_cancel(st));
}
}
- {
+ _calc_tablets_to_commit();
+ const bool is_last_sink = _streams_for_node->release();
+ LOG(INFO) << "sink " << _sender_id << " released streams, is_last=" <<
is_last_sink
+ << ", load_id=" << print_id(_load_id);
+
+ // send CLOSE_LOAD and close_wait on all streams
+ if (is_last_sink) {
+ RETURN_IF_ERROR(_streams_for_node->close_load());
SCOPED_TIMER(_close_load_timer);
- for (const auto& [_, streams] : _streams_for_node) {
- for (const auto& stream : streams->streams()) {
- int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000 -
- _timeout_watch.elapsed_time() / 1000 / 1000;
- if (remain_ms <= 0) {
- LOG(WARNING) << "load timed out before close waiting,
load_id="
- << print_id(_load_id);
- return Status::TimedOut("load timed out before close waiting");
- }
- RETURN_IF_ERROR(stream->close_wait(_state, remain_ms));
- }
- }
+ RETURN_IF_ERROR(_streams_for_node->for_each_st(
+ [this](int64_t dst_id, const Streams& streams) -> Status {
+ for (auto& stream : streams) {
+ int64_t remain_ms =
+ static_cast<int64_t>(_state->execution_timeout()) * 1000 -
+ _timeout_watch.elapsed_time() / 1000 / 1000;
+ if (remain_ms <= 0) {
+ LOG(WARNING) << "load timed out before close
waiting, load_id="
+ << print_id(_load_id);
+ return Status::TimedOut("load timed out before close waiting");
+ }
+ RETURN_IF_ERROR(stream->close_wait(_state, remain_ms));
+ }
+ return Status::OK();
+ }));
}
- std::unordered_map<int64_t, int> failed_tablets;
+ // calculate and submit commit info
+ if (is_last_sink) {
+ std::unordered_map<int64_t, int> failed_tablets;
+ std::unordered_map<int64_t, Status> failed_reason;
+ std::vector<TTabletCommitInfo> tablet_commit_infos;
- std::vector<TTabletCommitInfo> tablet_commit_infos;
- for (const auto& [node_id, streams] : _streams_for_node) {
- for (const auto& stream : streams->streams()) {
+ _streams_for_node->for_each([&](int64_t dst_id, const Streams& streams) {
std::unordered_set<int64_t> known_tablets;
- for (auto [tablet_id, _] : stream->failed_tablets()) {
- if (known_tablets.contains(tablet_id)) {
- continue;
+ for (const auto& stream : streams) {
+ for (auto [tablet_id, reason] : stream->failed_tablets()) {
+ if (known_tablets.contains(tablet_id)) {
+ continue;
+ }
+ known_tablets.insert(tablet_id);
+ failed_tablets[tablet_id]++;
+ failed_reason[tablet_id] = reason;
}
- known_tablets.insert(tablet_id);
- failed_tablets[tablet_id]++;
- }
- for (auto tablet_id : stream->success_tablets()) {
- if (known_tablets.contains(tablet_id)) {
- continue;
+ for (auto tablet_id : stream->success_tablets()) {
+ if (known_tablets.contains(tablet_id)) {
+ continue;
+ }
+ known_tablets.insert(tablet_id);
+ TTabletCommitInfo commit_info;
+ commit_info.tabletId = tablet_id;
+ commit_info.backendId = dst_id;
+ tablet_commit_infos.emplace_back(std::move(commit_info));
}
- known_tablets.insert(tablet_id);
- TTabletCommitInfo commit_info;
- commit_info.tabletId = tablet_id;
- commit_info.backendId = node_id;
- tablet_commit_infos.emplace_back(std::move(commit_info));
}
- }
- }
- for (auto [tablet_id, replicas] : failed_tablets) {
- if (replicas <= (_num_replicas - 1) / 2) {
- continue;
- }
- auto backends = _location->find_tablet(tablet_id)->node_ids;
- for (auto& backend_id : backends) {
- for (const auto& stream : _streams_for_node[backend_id]->streams()) {
- const auto& failed_tablets = stream->failed_tablets();
- if (failed_tablets.contains(tablet_id)) {
- return failed_tablets.at(tablet_id);
- }
+ });
+
+ for (auto [tablet_id, replicas] : failed_tablets) {
+ if (replicas > (_num_replicas - 1) / 2) {
+ return failed_reason.at(tablet_id);
}
}
- DCHECK(false) << "failed tablet " << tablet_id << " should have failed reason";
+ _state->tablet_commit_infos().insert(
+ _state->tablet_commit_infos().end(),
+ std::make_move_iterator(tablet_commit_infos.begin()),
+ std::make_move_iterator(tablet_commit_infos.end()));
}
- _state->tablet_commit_infos().insert(_state->tablet_commit_infos().end(),
- std::make_move_iterator(tablet_commit_infos.begin()),
- std::make_move_iterator(tablet_commit_infos.end()));
// _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node
int64_t num_rows_load_total = _number_input_rows + _state->num_rows_load_filtered() +
@@ -632,30 +628,28 @@ Status VTabletWriterV2::close(Status exec_status) {
return status;
}
-Status VTabletWriterV2::_close_load(const Streams& streams) {
- auto node_id = streams[0]->dst_id();
- std::vector<PTabletID> tablets_to_commit;
- std::vector<int64_t> partition_ids;
- for (auto [tablet_id, tablet] : _tablets_for_node[node_id]) {
- if (_tablet_finder->partition_ids().contains(tablet.partition_id())) {
- if (VLOG_DEBUG_IS_ON) {
- partition_ids.push_back(tablet.partition_id());
+void VTabletWriterV2::_calc_tablets_to_commit() {
+ for (const auto& [dst_id, tablets] : _tablets_for_node) {
+ std::vector<PTabletID> tablets_to_commit;
+ std::vector<int64_t> partition_ids;
+ for (const auto& [tablet_id, tablet] : tablets) {
+ if (_tablet_finder->partition_ids().contains(tablet.partition_id())) {
+ if (VLOG_DEBUG_IS_ON) {
+ partition_ids.push_back(tablet.partition_id());
+ }
+ tablets_to_commit.push_back(tablet);
}
- tablets_to_commit.push_back(tablet);
}
- }
- if (VLOG_DEBUG_IS_ON) {
- std::string msg("close load partitions: ");
- msg.reserve(partition_ids.size() * 7);
- for (auto v : partition_ids) {
- msg.append(std::to_string(v) + ", ");
+ if (VLOG_DEBUG_IS_ON) {
+ std::string msg("close load partitions: ");
+ msg.reserve(partition_ids.size() * 7);
+ for (auto v : partition_ids) {
+ msg.append(std::to_string(v) + ", ");
+ }
+ LOG(WARNING) << msg;
}
- LOG(WARNING) << msg;
- }
- for (const auto& stream : streams) {
- RETURN_IF_ERROR(stream->close_load(tablets_to_commit));
+ _streams_for_node->save_tablets_to_commit(dst_id, tablets_to_commit);
}
- return Status::OK();
}
} // namespace doris::vectorized
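
For orientation, a condensed sketch of what the last sink does once release() returns true in the new close() path, with profiling, commit-info collection and error details trimmed; remaining_ms is a stand-in for the writer's _timeout_watch bookkeeping and is an assumption, not part of the patch.

    // Sketch only: CLOSE_LOAD followed by close_wait on every stream of every backend.
    Status last_sink_close_sketch(LoadStreamMap& streams_for_node, RuntimeState* state,
                                  const std::function<int64_t()>& remaining_ms) {
        RETURN_IF_ERROR(streams_for_node.close_load());
        return streams_for_node.for_each_st([&](int64_t dst_id, const Streams& streams) -> Status {
            for (auto& stream : streams) {
                int64_t remain_ms = remaining_ms();
                if (remain_ms <= 0) {
                    return Status::TimedOut("load timed out before close waiting");
                }
                RETURN_IF_ERROR(stream->close_wait(state, remain_ms));
            }
            return Status::OK();
        });
    }
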
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h
index 460b3acc33f..7785733bf4a 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -69,7 +69,7 @@
namespace doris {
class DeltaWriterV2;
class LoadStreamStub;
-class LoadStreams;
+class LoadStreamMap;
class ObjectPool;
class RowDescriptor;
class RuntimeState;
@@ -121,9 +121,9 @@ private:
Status _init(RuntimeState* state, RuntimeProfile* profile);
- Status _open_streams(int64_t src_id);
+ Status _open_streams();
- Status _open_streams_to_backend(int64_t dst_id, LoadStreams& streams);
+ Status _open_streams_to_backend(int64_t dst_id, Streams& streams);
Status _incremental_open_streams(const std::vector<TOlapTablePartition>& partitions);
@@ -140,7 +140,7 @@ private:
Status _select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id,
Streams& streams);
- Status _close_load(const Streams& streams);
+ void _calc_tablets_to_commit();
Status _cancel(Status status);
@@ -217,7 +217,7 @@ private:
std::unordered_map<int64_t, std::unordered_map<int64_t, PTabletID>> _tablets_for_node;
std::unordered_map<int64_t, std::vector<PTabletID>> _indexes_from_node;
- std::unordered_map<int64_t, std::shared_ptr<LoadStreams>> _streams_for_node;
+ std::shared_ptr<LoadStreamMap> _streams_for_node;
size_t _stream_index = 0;
std::shared_ptr<DeltaWriterV2Map> _delta_writer_for_tablet;
diff --git a/be/test/io/fs/stream_sink_file_writer_test.cpp b/be/test/io/fs/stream_sink_file_writer_test.cpp
index 7e5bdd350f5..ad6e496c56f 100644
--- a/be/test/io/fs/stream_sink_file_writer_test.cpp
+++ b/be/test/io/fs/stream_sink_file_writer_test.cpp
@@ -51,7 +51,9 @@ static std::atomic<int64_t> g_num_request;
class StreamSinkFileWriterTest : public testing::Test {
class MockStreamStub : public LoadStreamStub {
public:
- MockStreamStub(PUniqueId load_id, int64_t src_id) : LoadStreamStub(load_id, src_id, 1) {};
+ MockStreamStub(PUniqueId load_id, int64_t src_id)
+ : LoadStreamStub(load_id, src_id, std::make_shared<IndexToTabletSchema>(),
+ std::make_shared<IndexToEnableMoW>()) {};
virtual ~MockStreamStub() = default;
diff --git a/be/test/vec/exec/load_stream_stub_pool_test.cpp b/be/test/vec/exec/load_stream_stub_pool_test.cpp
index 24da3bb6999..e576db3bdaa 100644
--- a/be/test/vec/exec/load_stream_stub_pool_test.cpp
+++ b/be/test/vec/exec/load_stream_stub_pool_test.cpp
@@ -32,20 +32,29 @@ TEST_F(LoadStreamStubPoolTest, test) {
LoadStreamStubPool pool;
int64_t src_id = 100;
PUniqueId load_id;
- load_id.set_hi(1);
+ load_id.set_lo(1);
load_id.set_hi(2);
- auto streams1 = pool.get_or_create(load_id, src_id, 101, 5, 1);
- auto streams2 = pool.get_or_create(load_id, src_id, 102, 5, 1);
- auto streams3 = pool.get_or_create(load_id, src_id, 101, 5, 1);
+ PUniqueId load_id2;
+ load_id2.set_lo(2);
+ load_id2.set_hi(1);
+ auto streams_for_node1 = pool.get_or_create(load_id, src_id, 5, 2);
+ auto streams_for_node2 = pool.get_or_create(load_id, src_id, 5, 2);
+ EXPECT_EQ(1, pool.size());
+ auto streams_for_node3 = pool.get_or_create(load_id2, src_id, 8, 1);
EXPECT_EQ(2, pool.size());
- EXPECT_EQ(1, pool.templates_size());
- EXPECT_EQ(streams1, streams3);
- EXPECT_NE(streams1, streams2);
- streams1->release();
- streams2->release();
- streams3->release();
+ EXPECT_EQ(streams_for_node1, streams_for_node2);
+ EXPECT_NE(streams_for_node1, streams_for_node3);
+
+ EXPECT_EQ(5, streams_for_node1->get_or_create(101)->size());
+ EXPECT_EQ(5, streams_for_node2->get_or_create(102)->size());
+ EXPECT_EQ(8, streams_for_node3->get_or_create(101)->size());
+
+ EXPECT_TRUE(streams_for_node3->release());
+ EXPECT_EQ(1, pool.size());
+ EXPECT_FALSE(streams_for_node1->release());
+ EXPECT_EQ(1, pool.size());
+ EXPECT_TRUE(streams_for_node2->release());
EXPECT_EQ(0, pool.size());
- EXPECT_EQ(0, pool.templates_size());
}
} // namespace doris