This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 25d7d0b2554 [fix](move-memtable) abstract multi-streams to one logical
stream (#42039) (#42250)
25d7d0b2554 is described below
commit 25d7d0b2554aea3b440c3bb8c58d0837e2740ead
Author: Kaijie Chen <[email protected]>
AuthorDate: Tue Oct 22 20:26:42 2024 +0800
[fix](move-memtable) abstract multi-streams to one logical stream (#42039)
(#42250)
backport #42039
---
be/src/vec/sink/load_stream_map_pool.cpp | 50 +++----
be/src/vec/sink/load_stream_map_pool.h | 12 +-
be/src/vec/sink/load_stream_stub.cpp | 66 ++++++++-
be/src/vec/sink/load_stream_stub.h | 69 +++++++++
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 160 ++++++++++-----------
be/src/vec/sink/writer/vtablet_writer_v2.h | 8 +-
be/test/vec/sink/vtablet_writer_v2_test.cpp | 7 +-
.../test_multi_replica_fault_injection.groovy | 2 -
8 files changed, 234 insertions(+), 140 deletions(-)
diff --git a/be/src/vec/sink/load_stream_map_pool.cpp
b/be/src/vec/sink/load_stream_map_pool.cpp
index e8407f4730d..d6dddcc96dc 100644
--- a/be/src/vec/sink/load_stream_map_pool.cpp
+++ b/be/src/vec/sink/load_stream_map_pool.cpp
@@ -35,22 +35,20 @@ LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t
src_id, int num_streams,
DCHECK(num_use > 0) << "use num should be greater than 0";
}
-std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id, bool
incremental) {
+std::shared_ptr<LoadStreamStubs> LoadStreamMap::get_or_create(int64_t dst_id,
bool incremental) {
std::lock_guard<std::mutex> lock(_mutex);
- std::shared_ptr<Streams> streams = _streams_for_node[dst_id];
+ std::shared_ptr<LoadStreamStubs> streams = _streams_for_node[dst_id];
if (streams != nullptr) {
return streams;
}
- streams = std::make_shared<Streams>();
- for (int i = 0; i < _num_streams; i++) {
- streams->emplace_back(new LoadStreamStub(_load_id, _src_id,
_tablet_schema_for_index,
- _enable_unique_mow_for_index,
incremental));
- }
+ streams = std::make_shared<LoadStreamStubs>(_num_streams, _load_id,
_src_id,
+ _tablet_schema_for_index,
+ _enable_unique_mow_for_index,
incremental);
_streams_for_node[dst_id] = streams;
return streams;
}
-std::shared_ptr<Streams> LoadStreamMap::at(int64_t dst_id) {
+std::shared_ptr<LoadStreamStubs> LoadStreamMap::at(int64_t dst_id) {
std::lock_guard<std::mutex> lock(_mutex);
return _streams_for_node.at(dst_id);
}
@@ -60,7 +58,7 @@ bool LoadStreamMap::contains(int64_t dst_id) {
return _streams_for_node.contains(dst_id);
}
-void LoadStreamMap::for_each(std::function<void(int64_t, const Streams&)> fn) {
+void LoadStreamMap::for_each(std::function<void(int64_t, LoadStreamStubs&)>
fn) {
decltype(_streams_for_node) snapshot;
{
std::lock_guard<std::mutex> lock(_mutex);
@@ -71,7 +69,7 @@ void LoadStreamMap::for_each(std::function<void(int64_t,
const Streams&)> fn) {
}
}
-Status LoadStreamMap::for_each_st(std::function<Status(int64_t, const
Streams&)> fn) {
+Status LoadStreamMap::for_each_st(std::function<Status(int64_t,
LoadStreamStubs&)> fn) {
decltype(_streams_for_node) snapshot;
{
std::lock_guard<std::mutex> lock(_mutex);
@@ -108,7 +106,10 @@ bool LoadStreamMap::release() {
}
void LoadStreamMap::close_load(bool incremental) {
- auto st = for_each_st([this, incremental](int64_t dst_id, const Streams&
streams) -> Status {
+ for (auto& [dst_id, streams] : _streams_for_node) {
+ if (streams->is_incremental()) {
+ continue;
+ }
std::vector<PTabletID> tablets_to_commit;
const auto& tablets = _tablets_to_commit[dst_id];
tablets_to_commit.reserve(tablets.size());
@@ -116,30 +117,11 @@ void LoadStreamMap::close_load(bool incremental) {
tablets_to_commit.push_back(tablet);
tablets_to_commit.back().set_num_segments(_segments_for_tablet[tablet_id]);
}
- Status status = Status::OK();
- bool first = true;
- for (auto& stream : streams) {
- if (stream->is_incremental() != incremental) {
- continue;
- }
- if (first) {
- auto st = stream->close_load(tablets_to_commit);
- if (!st.ok() && status.ok()) {
- status = st;
- }
- first = false;
- } else {
- auto st = stream->close_load({});
- if (!st.ok() && status.ok()) {
- status = st;
- }
- }
+ auto st = streams->close_load(tablets_to_commit);
+ if (!st.ok()) {
+ LOG(WARNING) << "close_load for " << (incremental ? "incremental"
: "non-incremental")
+ << " streams failed: " << st << ", load_id=" <<
_load_id;
}
- return status;
- });
- if (!st.ok()) {
- LOG(WARNING) << "close_load for " << (incremental ? "incremental" :
"non-incremental")
- << " streams failed: " << st << ", load_id=" << _load_id;
}
}
diff --git a/be/src/vec/sink/load_stream_map_pool.h
b/be/src/vec/sink/load_stream_map_pool.h
index e5b66aaf9c9..602f1711a94 100644
--- a/be/src/vec/sink/load_stream_map_pool.h
+++ b/be/src/vec/sink/load_stream_map_pool.h
@@ -71,22 +71,20 @@ class LoadStreamStub;
class LoadStreamMapPool;
-using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;
-
class LoadStreamMap {
public:
LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int
num_use,
LoadStreamMapPool* pool);
- std::shared_ptr<Streams> get_or_create(int64_t dst_id, bool incremental =
false);
+ std::shared_ptr<LoadStreamStubs> get_or_create(int64_t dst_id, bool
incremental = false);
- std::shared_ptr<Streams> at(int64_t dst_id);
+ std::shared_ptr<LoadStreamStubs> at(int64_t dst_id);
bool contains(int64_t dst_id);
- void for_each(std::function<void(int64_t, const Streams&)> fn);
+ void for_each(std::function<void(int64_t, LoadStreamStubs&)> fn);
- Status for_each_st(std::function<Status(int64_t, const Streams&)> fn);
+ Status for_each_st(std::function<Status(int64_t, LoadStreamStubs&)> fn);
void save_tablets_to_commit(int64_t dst_id, const std::vector<PTabletID>&
tablets_to_commit);
@@ -107,7 +105,7 @@ private:
const int _num_streams;
std::atomic<int> _use_cnt;
std::mutex _mutex;
- std::unordered_map<int64_t, std::shared_ptr<Streams>> _streams_for_node;
+ std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>>
_streams_for_node;
LoadStreamMapPool* _pool = nullptr;
std::shared_ptr<IndexToTabletSchema> _tablet_schema_for_index;
std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index;
diff --git a/be/src/vec/sink/load_stream_stub.cpp
b/be/src/vec/sink/load_stream_stub.cpp
index 1d13ca4b903..672a0be44f7 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -207,7 +207,8 @@ Status
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
LOG(INFO) << "open load stream to host=" << node_info.host << ", port=" <<
node_info.brpc_port
<< ", " << *this;
_is_open.store(true);
- return Status::OK();
+ _status = Status::OK();
+ return _status;
}
// APPEND_DATA
@@ -504,4 +505,67 @@ inline std::ostream& operator<<(std::ostream& ostr, const
LoadStreamStub& stub)
return ostr;
}
+// Opens every stream in this collection against the same node.
+//
+// tablets_for_schema is passed to each stream until one of them opens
+// successfully; streams opened after that receive an empty tablet list.
+// The collection is marked open only if every stream opened; otherwise all
+// streams are cancelled and the error of the last failing stream is returned.
+Status LoadStreamStubs::open(BrpcClientCache<PBackendService_Stub>* client_cache,
+                             const NodeInfo& node_info, int64_t txn_id,
+                             const OlapTableSchemaParam& schema,
+                             const std::vector<PTabletID>& tablets_for_schema, int total_streams,
+                             int64_t idle_timeout_ms, bool enable_profile) {
+    // Sent in place of tablets_for_schema once some stream has opened successfully.
+    const std::vector<PTabletID> no_tablets;
+    bool schema_pending = true;
+    Status result = Status::OK();
+    for (auto& stub : _streams) {
+        const auto& tablets = schema_pending ? tablets_for_schema : no_tablets;
+        Status open_st = stub->open(client_cache, node_info, txn_id, schema, tablets,
+                                    total_streams, idle_timeout_ms, enable_profile);
+        if (!open_st.ok()) {
+            LOG(WARNING) << "open stream failed: " << open_st << "; stream: " << *stub;
+            result = open_st;
+            // Keep iterating: a later stream may still fetch the schema.
+            continue;
+        }
+        schema_pending = false;
+    }
+    // The collection counts as open only when no stream failed.
+    const bool all_opened = result.ok();
+    _open_success.store(all_opened);
+    if (!all_opened) {
+        // Tear down whatever did open so no half-open set is left behind.
+        cancel(result);
+    }
+    return result;
+}
+
+// Calls close_load on every stream of this collection.
+//
+// tablets_to_commit is handed to the first stream only; the remaining streams
+// are closed with an empty tablet list.
+//
+// Returns InternalError("streams not open") when the collection was never
+// opened successfully. Otherwise every stream is attempted, each failure is
+// logged, and the first failure (if any) is returned so the caller can see
+// that the close did not fully succeed.
+Status LoadStreamStubs::close_load(const std::vector<PTabletID>& tablets_to_commit) {
+    if (!_open_success.load()) {
+        return Status::InternalError("streams not open");
+    }
+    bool first = true;
+    auto status = Status::OK();
+    for (auto& stream : _streams) {
+        Status st;
+        if (first) {
+            st = stream->close_load(tablets_to_commit);
+            first = false;
+        } else {
+            st = stream->close_load({});
+        }
+        if (!st.ok()) {
+            LOG(WARNING) << "close_load failed: " << st << "; stream: " << *stream;
+            // Remember the first error instead of dropping it; keep closing the rest.
+            if (status.ok()) {
+                status = st;
+            }
+        }
+    }
+    return status;
+}
+
+// Waits for each stream in turn to finish closing, sharing one timeout budget:
+// the time already spent in this call is subtracted from timeout_ms before
+// waiting on the next stream. Stops at, and returns, the first error.
+//
+// NOTE(review): the value passed down can be zero or negative once earlier
+// streams have used up the budget (and always is when timeout_ms is 0). How
+// LoadStreamStub::close_wait treats such a value is not visible here - confirm.
+Status LoadStreamStubs::close_wait(RuntimeState* state, int64_t timeout_ms) {
+    MonotonicStopWatch timer;
+    timer.start();
+    for (auto& stub : _streams) {
+        // Presumably nanoseconds scaled down to milliseconds - confirm the stopwatch unit.
+        const auto elapsed_ms = timer.elapsed_time() / 1000 / 1000;
+        RETURN_IF_ERROR(stub->close_wait(state, timeout_ms - elapsed_ms));
+    }
+    return Status::OK();
+}
+
} // namespace doris
diff --git a/be/src/vec/sink/load_stream_stub.h
b/be/src/vec/sink/load_stream_stub.h
index 223babb42e3..241d7e612ce 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -266,4 +266,73 @@ protected:
bool _is_incremental = false;
};
+// A collection of LoadStreamStub objects that all connect to the same node,
+// presented to callers as one logical stream.
+//
+// All stubs are created in the constructor. The collection only hands out
+// streams after it has been marked open (see open() / mark_open()).
+class LoadStreamStubs {
+public:
+    // Creates num_streams stubs, each built from the same load_id, src_id,
+    // schema_map, mow_map and incremental flag.
+    LoadStreamStubs(size_t num_streams, UniqueId load_id, int64_t src_id,
+                    std::shared_ptr<IndexToTabletSchema> schema_map,
+                    std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false)
+            : _is_incremental(incremental) {
+        _streams.reserve(num_streams);
+        for (size_t i = 0; i < num_streams; i++) {
+            _streams.emplace_back(
+                    new LoadStreamStub(load_id, src_id, schema_map, mow_map, incremental));
+        }
+    }
+
+    // Opens all streams to node_info; defined in load_stream_stub.cpp.
+    Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info,
+                int64_t txn_id, const OlapTableSchemaParam& schema,
+                const std::vector<PTabletID>& tablets_for_schema, int total_streams,
+                int64_t idle_timeout_ms, bool enable_profile);
+
+    // The incremental flag this collection was constructed with.
+    bool is_incremental() const { return _is_incremental; }
+
+    // Number of streams in the collection.
+    size_t size() const { return _streams.size(); }
+
+    // for UT only
+    void mark_open() { _open_success.store(true); }
+
+    // Returns the next stream in round-robin order, or nullptr when the
+    // collection is not open or holds no streams. Callers must check for
+    // nullptr before use.
+    std::shared_ptr<LoadStreamStub> select_one_stream() {
+        // An empty collection would make the modulo below divide by zero.
+        if (!_open_success.load() || _streams.empty()) {
+            return nullptr;
+        }
+        size_t i = _select_index.fetch_add(1);
+        return _streams[i % _streams.size()];
+    }
+
+    // Cancels every stream with the given reason.
+    void cancel(Status reason) {
+        for (auto& stream : _streams) {
+            stream->cancel(reason);
+        }
+    }
+
+    // Calls close_load on all streams; defined in load_stream_stub.cpp.
+    Status close_load(const std::vector<PTabletID>& tablets_to_commit);
+
+    // Waits for all streams to close; defined in load_stream_stub.cpp.
+    Status close_wait(RuntimeState* state, int64_t timeout_ms = 0);
+
+    // Union of the success tablets reported by all streams; a tablet reported
+    // by several streams appears once.
+    std::unordered_set<int64_t> success_tablets() {
+        std::unordered_set<int64_t> s;
+        for (auto& stream : _streams) {
+            auto v = stream->success_tablets();
+            std::copy(v.begin(), v.end(), std::inserter(s, s.end()));
+        }
+        return s;
+    }
+
+    // Failed tablets reported by all streams, keyed by tablet id. insert()
+    // does not overwrite, so for a tablet reported by several streams the
+    // status from the earliest stream in _streams is kept.
+    std::unordered_map<int64_t, Status> failed_tablets() {
+        std::unordered_map<int64_t, Status> m;
+        for (auto& stream : _streams) {
+            auto v = stream->failed_tablets();
+            m.insert(v.begin(), v.end());
+        }
+        return m;
+    }
+
+private:
+    std::vector<std::shared_ptr<LoadStreamStub>> _streams;
+    // True once open() succeeded on every stream (or mark_open() was called).
+    std::atomic<bool> _open_success = false;
+    // Monotonic counter driving the round-robin in select_one_stream().
+    std::atomic<size_t> _select_index = 0;
+    const bool _is_incremental;
+};
+
} // namespace doris
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 16c11b1cf42..c693e20c3a8 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -269,6 +269,8 @@ Status VTabletWriterV2::open(RuntimeState* state,
RuntimeProfile* profile) {
Status VTabletWriterV2::_open_streams() {
bool fault_injection_skip_be = true;
+ bool any_backend = false;
+ bool any_success = false;
for (auto& [dst_id, _] : _tablets_for_node) {
auto streams = _load_stream_map->get_or_create(dst_id);
DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_one_backend", {
@@ -277,12 +279,17 @@ Status VTabletWriterV2::_open_streams() {
continue;
}
});
- RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
+ auto st = _open_streams_to_backend(dst_id, *streams);
+ any_backend = true;
+ any_success = any_success || st.ok();
+ }
+ if (any_backend && !any_success) {
+ return Status::InternalError("failed to open streams to any BE");
}
return Status::OK();
}
-Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, Streams&
streams) {
+Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id,
LoadStreamStubs& streams) {
const auto* node_info = _nodes_info->find_node(dst_id);
DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.node_info_null",
{ node_info = nullptr; });
@@ -293,26 +300,14 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t
dst_id, Streams& stream
std::vector<PTabletID>& tablets_for_schema =
_indexes_from_node[node_info->id];
DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.no_schema_when_open_streams",
{ tablets_for_schema.clear(); });
- int fault_injection_skip_cnt = 0;
- for (auto& stream : streams) {
-
DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.one_stream_open_failure",
{
- if (fault_injection_skip_cnt < 1) {
- fault_injection_skip_cnt++;
- continue;
- }
- });
- auto st =
stream->open(_state->exec_env()->brpc_streaming_client_cache(), *node_info,
- _txn_id, *_schema, tablets_for_schema,
_total_streams,
- idle_timeout_ms, _state->enable_profile());
- if (st.ok()) {
- // get tablet schema from each backend only in the 1st stream
- tablets_for_schema.clear();
- } else {
- LOG(WARNING) << "failed to open stream to backend " << dst_id
- << ", load_id=" << print_id(_load_id);
- }
+ auto st = streams.open(_state->exec_env()->brpc_streaming_client_cache(),
*node_info, _txn_id,
+ *_schema, tablets_for_schema, _total_streams,
idle_timeout_ms,
+ _state->enable_profile());
+ if (!st.ok()) {
+ LOG(WARNING) << "failed to open stream to backend " << dst_id
+ << ", load_id=" << print_id(_load_id) << ", err=" << st;
}
- return Status::OK();
+ return st;
}
Status VTabletWriterV2::_build_tablet_node_mapping() {
@@ -375,7 +370,7 @@ void
VTabletWriterV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>& r
}
Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t
partition_id, int64_t index_id,
- Streams& streams) {
+
std::vector<std::shared_ptr<LoadStreamStub>>& streams) {
const auto* location = _location->find_tablet(tablet_id);
DBUG_EXECUTE_IF("VTabletWriterV2._select_streams.location_null", {
location = nullptr; });
if (location == nullptr) {
@@ -388,13 +383,16 @@ Status VTabletWriterV2::_select_streams(int64_t
tablet_id, int64_t partition_id,
tablet.set_tablet_id(tablet_id);
VLOG_DEBUG << fmt::format("_select_streams P{} I{} T{}", partition_id,
index_id, tablet_id);
_tablets_for_node[node_id].emplace(tablet_id, tablet);
- auto stream = _load_stream_map->at(node_id)->at(_stream_index);
- for (int i = 1; i < _stream_per_node && !stream->is_open(); i++) {
- stream = _load_stream_map->at(node_id)->at((_stream_index + i) %
_stream_per_node);
+ auto stream = _load_stream_map->at(node_id)->select_one_stream();
+ if (stream == nullptr) {
+ continue;
}
streams.emplace_back(std::move(stream));
}
- _stream_index = (_stream_index + 1) % _stream_per_node;
+ if (streams.size() <= location->node_ids.size() / 2) {
+ return Status::InternalError("not enough streams {}/{}",
streams.size(),
+ location->node_ids.size());
+ }
Status st;
for (auto& stream : streams) {
st = stream->wait_for_schema(partition_id, index_id, tablet_id);
@@ -458,9 +456,10 @@ Status VTabletWriterV2::write(Block& input_block) {
Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block>
block, int64_t tablet_id,
const Rows& rows) {
+ auto st = Status::OK();
auto delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id,
[&]() {
- Streams streams;
- auto st = _select_streams(tablet_id, rows.partition_id, rows.index_id,
streams);
+ std::vector<std::shared_ptr<LoadStreamStub>> streams;
+ st = _select_streams(tablet_id, rows.partition_id, rows.index_id,
streams);
if (!st.ok()) [[unlikely]] {
LOG(WARNING) << "select stream failed, " << st << ", load_id=" <<
print_id(_load_id);
return std::unique_ptr<DeltaWriterV2>(nullptr);
@@ -487,7 +486,8 @@ Status
VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
}
DBUG_EXECUTE_IF("VTabletWriterV2._write_memtable.index_not_found",
{ index_not_found = true; });
- if (index_not_found) {
+ if (index_not_found) [[unlikely]] {
+ st = Status::InternalError("no index {} in schema", rows.index_id);
LOG(WARNING) << "index " << rows.index_id
<< " not found in schema, load_id=" <<
print_id(_load_id);
return std::unique_ptr<DeltaWriterV2>(nullptr);
@@ -496,15 +496,15 @@ Status
VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
});
if (delta_writer == nullptr) {
LOG(WARNING) << "failed to open DeltaWriter for tablet " << tablet_id
- << ", load_id=" << print_id(_load_id);
- return Status::InternalError("failed to open DeltaWriter for tablet
{}", tablet_id);
+ << ", load_id=" << print_id(_load_id) << ", err: " << st;
+ return Status::InternalError("failed to open DeltaWriter {}: {}",
tablet_id, st.msg());
}
{
SCOPED_TIMER(_wait_mem_limit_timer);
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush();
}
SCOPED_TIMER(_write_memtable_timer);
- auto st = delta_writer->write(block.get(), rows.row_idxes);
+ st = delta_writer->write(block.get(), rows.row_idxes);
return st;
}
@@ -517,11 +517,8 @@ void VTabletWriterV2::_cancel(Status status) {
_delta_writer_for_tablet.reset();
}
if (_load_stream_map) {
- _load_stream_map->for_each([status](int64_t dst_id, const Streams&
streams) {
- for (auto& stream : streams) {
- stream->cancel(status);
- }
- });
+ _load_stream_map->for_each(
+ [status](int64_t dst_id, LoadStreamStubs& streams) {
streams.cancel(status); });
_load_stream_map->release();
}
}
@@ -624,17 +621,14 @@ Status VTabletWriterV2::close(Status exec_status) {
DBUG_EXECUTE_IF("VTabletWriterV2.close.add_failed_tablet", {
auto streams =
_load_stream_map->at(_tablets_for_node.begin()->first);
int64_t tablet_id = -1;
- for (auto& stream : *streams) {
- const auto& tablets = stream->success_tablets();
- if (tablets.size() > 0) {
- tablet_id = tablets[0];
- break;
- }
+ for (auto tablet : streams->success_tablets()) {
+ tablet_id = tablet;
+ break;
}
if (tablet_id != -1) {
LOG(INFO) << "fault injection: adding failed tablet_id: "
<< tablet_id;
- streams->front()->add_failed_tablet(tablet_id,
-
Status::InternalError("fault injection"));
+ streams->select_one_stream()->add_failed_tablet(
+ tablet_id, Status::InternalError("fault
injection"));
} else {
LOG(INFO) << "fault injection: failed to inject failed
tablet_id";
}
@@ -672,26 +666,24 @@ Status VTabletWriterV2::close(Status exec_status) {
void VTabletWriterV2::_close_wait(bool incremental) {
SCOPED_TIMER(_close_load_timer);
auto st = _load_stream_map->for_each_st(
- [this, incremental](int64_t dst_id, const Streams& streams) ->
Status {
- Status status = Status::OK();
- for (auto& stream : streams) {
- if (stream->is_incremental() != incremental) {
- continue;
- }
- int64_t remain_ms =
static_cast<int64_t>(_state->execution_timeout()) * 1000 -
- _timeout_watch.elapsed_time() / 1000 /
1000;
-
DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { remain_ms = 0; });
- if (remain_ms <= 0) {
- LOG(WARNING) << "load timed out before close waiting,
load_id="
- << print_id(_load_id);
- return Status::TimedOut("load timed out before close
waiting");
- }
- auto st = stream->close_wait(_state, remain_ms);
- if (!st.ok() && status.ok()) {
- status = st;
- }
+ [this, incremental](int64_t dst_id, LoadStreamStubs& streams) ->
Status {
+ if (streams.is_incremental() != incremental) {
+ return Status::OK();
+ }
+ int64_t remain_ms =
static_cast<int64_t>(_state->execution_timeout()) * 1000 -
+ _timeout_watch.elapsed_time() / 1000 /
1000;
+ DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", {
remain_ms = 0; });
+ if (remain_ms <= 0) {
+ LOG(WARNING) << "load timed out before close waiting,
load_id="
+ << print_id(_load_id);
+ return Status::TimedOut("load timed out before close
waiting");
}
- return status;
+ auto st = streams.close_wait(_state, remain_ms);
+ if (!st.ok()) {
+ LOG(WARNING) << "close_wait timeout on streams to dst_id="
<< dst_id
+ << ", load_id=" << print_id(_load_id) << ": "
<< st;
+ }
+ return st;
});
if (!st.ok()) {
LOG(WARNING) << "close_wait failed: " << st << ", load_id=" <<
print_id(_load_id);
@@ -730,31 +722,23 @@ Status
VTabletWriterV2::_create_commit_info(std::vector<TTabletCommitInfo>& tabl
int num_replicas) {
std::unordered_map<int64_t, int> failed_tablets;
std::unordered_map<int64_t, Status> failed_reason;
- load_stream_map->for_each([&](int64_t dst_id, const Streams& streams) {
- std::unordered_set<int64_t> known_tablets;
- for (const auto& stream : streams) {
- LOG(INFO) << "stream " << stream->stream_id()
- << " success tablets: " <<
stream->success_tablets().size()
- << ", failed tablets: " <<
stream->failed_tablets().size();
- for (auto [tablet_id, reason] : stream->failed_tablets()) {
- if (known_tablets.contains(tablet_id)) {
- continue;
- }
- known_tablets.insert(tablet_id);
- failed_tablets[tablet_id]++;
- failed_reason[tablet_id] = reason;
- }
- for (auto tablet_id : stream->success_tablets()) {
- if (known_tablets.contains(tablet_id)) {
- continue;
- }
- known_tablets.insert(tablet_id);
- TTabletCommitInfo commit_info;
- commit_info.tabletId = tablet_id;
- commit_info.backendId = dst_id;
- tablet_commit_infos.emplace_back(std::move(commit_info));
- }
+ load_stream_map->for_each([&](int64_t dst_id, LoadStreamStubs& streams) {
+ size_t num_success_tablets = 0;
+ size_t num_failed_tablets = 0;
+ for (auto [tablet_id, reason] : streams.failed_tablets()) {
+ failed_tablets[tablet_id]++;
+ failed_reason[tablet_id] = reason;
+ num_failed_tablets++;
+ }
+ for (auto tablet_id : streams.success_tablets()) {
+ TTabletCommitInfo commit_info;
+ commit_info.tabletId = tablet_id;
+ commit_info.backendId = dst_id;
+ tablet_commit_infos.emplace_back(std::move(commit_info));
+ num_success_tablets++;
}
+ LOG(INFO) << "streams to dst_id: " << dst_id << ", success tablets: "
<< num_success_tablets
+ << ", failed tablets: " << num_failed_tablets;
});
for (auto [tablet_id, replicas] : failed_tablets) {
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h
b/be/src/vec/sink/writer/vtablet_writer_v2.h
index f65e0c8f3cd..b50044ede93 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -69,6 +69,7 @@
namespace doris {
class DeltaWriterV2;
class LoadStreamStub;
+class LoadStreamStubs;
class LoadStreamMap;
class ObjectPool;
class RowDescriptor;
@@ -85,8 +86,6 @@ class OlapTabletFinder;
class VTabletWriterV2;
class DeltaWriterV2Map;
-using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;
-
struct Rows {
int64_t partition_id;
int64_t index_id;
@@ -128,7 +127,7 @@ private:
Status _open_streams();
- Status _open_streams_to_backend(int64_t dst_id, Streams& streams);
+ Status _open_streams_to_backend(int64_t dst_id, LoadStreamStubs& streams);
Status _incremental_open_streams(const std::vector<TOlapTablePartition>&
partitions);
@@ -143,7 +142,7 @@ private:
const Rows& rows);
Status _select_streams(int64_t tablet_id, int64_t partition_id, int64_t
index_id,
- Streams& streams);
+ std::vector<std::shared_ptr<LoadStreamStub>>&
streams);
void _calc_tablets_to_commit();
@@ -226,7 +225,6 @@ private:
std::shared_ptr<LoadStreamMap> _load_stream_map;
- size_t _stream_index = 0;
std::shared_ptr<DeltaWriterV2Map> _delta_writer_for_tablet;
VRowDistribution _row_distribution;
diff --git a/be/test/vec/sink/vtablet_writer_v2_test.cpp
b/be/test/vec/sink/vtablet_writer_v2_test.cpp
index 6289896c75f..67dc9d089ab 100644
--- a/be/test/vec/sink/vtablet_writer_v2_test.cpp
+++ b/be/test/vec/sink/vtablet_writer_v2_test.cpp
@@ -37,12 +37,13 @@ const int64_t src_id = 1000;
static void add_stream(std::shared_ptr<LoadStreamMap> load_stream_map, int64_t
node_id,
std::vector<int64_t> success_tablets,
std::unordered_map<int64_t, Status> failed_tablets) {
- auto stub = load_stream_map->get_or_create(node_id);
+ auto streams = load_stream_map->get_or_create(node_id);
+ streams->mark_open();
for (const auto& tablet_id : success_tablets) {
- stub->at(0)->add_success_tablet(tablet_id);
+ streams->select_one_stream()->add_success_tablet(tablet_id);
}
for (const auto& [tablet_id, reason] : failed_tablets) {
- stub->at(0)->add_failed_tablet(tablet_id, reason);
+ streams->select_one_stream()->add_failed_tablet(tablet_id, reason);
}
}
diff --git
a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
index 33f7e28dbc9..2f6afd5ca69 100644
---
a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
+++
b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
@@ -97,8 +97,6 @@ suite("test_multi_replica_fault_injection", "nonConcurrent") {
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_all_replica",
"failed to send segment data to any replicas")
// test segment num check when LoadStreamStub missed tail segments
load_with_injection("LoadStreamStub.only_send_segment_0", "segment num
mismatch")
- // test 1st stream to each backend failure
-
load_with_injection("VTabletWriterV2._open_streams_to_backend.one_stream_open_failure",
"success")
// test one backend open failure
load_with_injection("VTabletWriterV2._open_streams.skip_one_backend",
"success")
sql """ set enable_memtable_on_sink_node=false """
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]