This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 794cf768bc3 branch-3.1: [fix](load) quorum success write should consider data skew #53737 (#53827)
794cf768bc3 is described below
commit 794cf768bc3f292264dc9e0fb717c3018f794565
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jul 25 14:06:18 2025 +0800
branch-3.1: [fix](load) quorum success write should consider data skew #53737 (#53827)
Cherry-picked from #53737
Co-authored-by: hui lai <[email protected]>
---
be/src/vec/sink/load_stream_stub.cpp | 2 --
be/src/vec/sink/load_stream_stub.h | 11 ------
be/src/vec/sink/writer/vtablet_writer.cpp | 37 +++++++++----------
be/src/vec/sink/writer/vtablet_writer.h | 8 +----
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 54 ++++++++++++++--------------
be/src/vec/sink/writer/vtablet_writer_v2.h | 6 ++--
6 files changed, 51 insertions(+), 67 deletions(-)
diff --git a/be/src/vec/sink/load_stream_stub.cpp
b/be/src/vec/sink/load_stream_stub.cpp
index 00c327a21d2..44b09e935eb 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -229,7 +229,6 @@ Status LoadStreamStub::append_data(int64_t partition_id,
int64_t index_id, int64
header.set_offset(offset);
header.set_opcode(doris::PStreamHeader::APPEND_DATA);
header.set_file_type(file_type);
- add_write_tablets(tablet_id);
return _encode_and_send(header, data);
}
@@ -254,7 +253,6 @@ Status LoadStreamStub::add_segment(int64_t partition_id,
int64_t index_id, int64
if (flush_schema != nullptr) {
flush_schema->to_schema_pb(header.mutable_flush_schema());
}
- add_write_tablets(tablet_id);
return _encode_and_send(header);
}
diff --git a/be/src/vec/sink/load_stream_stub.h
b/be/src/vec/sink/load_stream_stub.h
index 4a2d843a7a1..408d9fede54 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -223,16 +223,6 @@ public:
return _bytes_written;
}
- void add_write_tablets(int64_t tablet_id) {
- std::lock_guard<bthread::Mutex> lock(_write_mutex);
- _write_tablets.insert(tablet_id);
- }
-
- std::set<int64_t> write_tablets() {
- std::lock_guard<bthread::Mutex> lock(_write_mutex);
- return _write_tablets;
- }
-
Status check_cancel() {
DBUG_EXECUTE_IF("LoadStreamStub._check_cancel.cancelled",
{ return Status::InternalError("stream cancelled"); });
@@ -285,7 +275,6 @@ protected:
bool _is_incremental = false;
bthread::Mutex _write_mutex;
- std::set<int64_t> _write_tablets;
size_t _bytes_written = 0;
};
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index e1d1505cc66..94c4a33e73e 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -316,18 +316,24 @@ Status IndexChannel::close_wait(
{ return Status::TimedOut("injected timeout"); });
Status status = Status::OK();
// 1. wait quorum success
- std::unordered_set<int64_t> write_tablets;
- for (const auto& [node_id, node_channel] : _node_channels) {
- auto node_channel_write_tablets = node_channel->write_tablets();
- write_tablets.insert(node_channel_write_tablets.begin(),
node_channel_write_tablets.end());
+ std::unordered_set<int64_t> need_finish_tablets;
+ auto partition_ids = _parent->_tablet_finder->partition_ids();
+ for (const auto& part : _parent->_vpartition->get_partitions()) {
+ if (partition_ids.contains(part->id)) {
+ for (const auto& index : part->indexes) {
+ for (const auto& tablet_id : index.tablets) {
+ need_finish_tablets.insert(tablet_id);
+ }
+ }
+ }
}
while (true) {
RETURN_IF_ERROR(check_each_node_channel_close(
&unfinished_node_channel_ids, node_add_batch_counter_map,
writer_stats, status));
- bool quorum_success = _quorum_success(unfinished_node_channel_ids,
write_tablets);
+ bool quorum_success = _quorum_success(unfinished_node_channel_ids,
need_finish_tablets);
if (unfinished_node_channel_ids.empty() || quorum_success) {
LOG(INFO) << "quorum_success: " << quorum_success
- << ", is all unfinished: " <<
unfinished_node_channel_ids.empty()
+ << ", is all finished: " <<
unfinished_node_channel_ids.empty()
<< ", txn_id: " << _parent->_txn_id
<< ", load_id: " << print_id(_parent->_load_id);
break;
@@ -399,28 +405,27 @@ Status IndexChannel::check_each_node_channel_close(
}
bool IndexChannel::_quorum_success(const std::unordered_set<int64_t>&
unfinished_node_channel_ids,
- const std::unordered_set<int64_t>&
write_tablets) {
+ const std::unordered_set<int64_t>&
need_finish_tablets) {
if (!config::enable_quorum_success_write) {
return false;
}
- std::unordered_map<int64_t, int64_t> finished_tablets_replica;
+ if (need_finish_tablets.empty()) [[unlikely]] {
+ return false;
+ }
// 1. collect all write tablets and finished tablets
+ std::unordered_map<int64_t, int64_t> finished_tablets_replica;
for (const auto& [node_id, node_channel] : _node_channels) {
- auto node_channel_write_tablets = node_channel->write_tablets();
if (unfinished_node_channel_ids.contains(node_id) ||
!node_channel->check_status().ok()) {
continue;
}
- for (const auto& tablet_id : node_channel_write_tablets) {
+ for (const auto& tablet_id : _tablets_by_channel[node_id]) {
finished_tablets_replica[tablet_id]++;
}
}
// 2. check if quorum success
- if (write_tablets.empty()) {
- return false;
- }
- for (const auto& tablet_id : write_tablets) {
+ for (const auto& tablet_id : need_finish_tablets) {
if (finished_tablets_replica[tablet_id] <
_load_required_replicas_num(tablet_id)) {
return false;
}
@@ -755,10 +760,6 @@ Status VNodeChannel::add_block(vectorized::Block* block,
const Payload* payload)
for (auto tablet_id : payload->second) {
_cur_add_block_request->add_tablet_ids(tablet_id);
}
- {
- std::lock_guard<std::mutex> l(_write_tablets_lock);
- _write_tablets.insert(payload->second.begin(), payload->second.end());
- }
_write_bytes.fetch_add(_cur_mutable_block->bytes());
if (_cur_mutable_block->rows() >= _batch_size ||
diff --git a/be/src/vec/sink/writer/vtablet_writer.h
b/be/src/vec/sink/writer/vtablet_writer.h
index 4948e25448f..554a2d711a4 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -329,10 +329,6 @@ public:
bool is_incremental() const { return _is_incremental; }
int64_t write_bytes() const { return _write_bytes.load(); }
- std::unordered_set<int64_t> write_tablets() {
- std::lock_guard<std::mutex> l(_write_tablets_lock);
- return _write_tablets;
- }
protected:
// make a real open request for relative BE's load channel.
@@ -437,9 +433,7 @@ protected:
bool _is_incremental;
- std::mutex _write_tablets_lock;
std::atomic<int64_t> _write_bytes {0};
- std::unordered_set<int64_t> _write_tablets;
};
// an IndexChannel is related to specific table and its rollup and mv
@@ -571,7 +565,7 @@ private:
int _load_required_replicas_num(int64_t tablet_id);
bool _quorum_success(const std::unordered_set<int64_t>&
unfinished_node_channel_ids,
- const std::unordered_set<int64_t>& write_tablets);
+ const std::unordered_set<int64_t>&
need_finish_tablets);
int64_t _calc_max_wait_time_ms(const std::unordered_set<int64_t>&
unfinished_node_channel_ids);
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 71061965ba9..f524891bcae 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -107,6 +107,7 @@ Status VTabletWriterV2::_incremental_open_streams(
continue;
}
_indexes_from_node[node].emplace_back(tablet);
+ _tablets_by_node[node].emplace(tablet_id);
known_indexes.insert(index.index_id);
VLOG_DEBUG << "incremental open stream (" << partition->id
<< ", " << tablet_id
<< ")";
@@ -345,6 +346,7 @@ Status VTabletWriterV2::_build_tablet_node_mapping() {
continue;
}
_indexes_from_node[node].emplace_back(tablet);
+ _tablets_by_node[node].emplace(tablet_id);
known_indexes.insert(index.index_id);
}
_build_tablet_replica_info(tablet_id, partition);
@@ -500,10 +502,6 @@ Status VTabletWriterV2::write(RuntimeState* state, Block&
input_block) {
// For each tablet, send its input_rows from block to delta writer
for (const auto& [tablet_id, rows] : rows_for_tablet) {
RETURN_IF_ERROR(_write_memtable(block, tablet_id, rows));
- {
- std::lock_guard<std::mutex> l(_write_tablets_lock);
- _write_tablets.insert(tablet_id);
- }
}
return Status::OK();
@@ -749,13 +747,24 @@ Status VTabletWriterV2::_close_wait(
Status status;
auto streams_for_node = _load_stream_map->get_streams_for_node();
// 1. first wait for quorum success
+ std::unordered_set<int64_t> need_finish_tablets;
+ auto partition_ids = _tablet_finder->partition_ids();
+ for (const auto& part : _vpartition->get_partitions()) {
+ if (partition_ids.contains(part->id)) {
+ for (const auto& index : part->indexes) {
+ for (const auto& tablet_id : index.tablets) {
+ need_finish_tablets.insert(tablet_id);
+ }
+ }
+ }
+ }
while (true) {
RETURN_IF_ERROR(_check_timeout());
RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status,
streams_for_node));
- bool quorum_success = _quorum_success(unfinished_streams);
+ bool quorum_success = _quorum_success(unfinished_streams,
need_finish_tablets);
if (quorum_success || unfinished_streams.empty()) {
LOG(INFO) << "quorum_success: " << quorum_success
- << ", is all unfinished: " << unfinished_streams.empty()
+ << ", is all finished: " << unfinished_streams.empty()
<< ", txn_id: " << _txn_id << ", load_id: " <<
print_id(_load_id);
break;
}
@@ -795,21 +804,19 @@ Status VTabletWriterV2::_close_wait(
}
bool VTabletWriterV2::_quorum_success(
- const std::unordered_set<std::shared_ptr<LoadStreamStub>>&
unfinished_streams) {
+ const std::unordered_set<std::shared_ptr<LoadStreamStub>>&
unfinished_streams,
+ const std::unordered_set<int64_t>& need_finish_tablets) {
if (!config::enable_quorum_success_write) {
return false;
}
- {
- std::lock_guard<std::mutex> l(_write_tablets_lock);
- if (_write_tablets.empty()) {
- return false;
- }
- }
- std::unordered_map<int64_t, int64_t> finished_tablets_replica;
auto streams_for_node = _load_stream_map->get_streams_for_node();
- std::unordered_set<int64_t> finished_dst_ids;
+ if (need_finish_tablets.empty()) [[unlikely]] {
+ return false;
+ }
// 1. calculate finished tablets replica num
+ std::unordered_set<int64_t> finished_dst_ids;
+ std::unordered_map<int64_t, int64_t> finished_tablets_replica;
for (const auto& [dst_id, streams] : streams_for_node) {
bool finished = true;
for (const auto& stream : streams->streams()) {
@@ -822,24 +829,19 @@ bool VTabletWriterV2::_quorum_success(
finished_dst_ids.insert(dst_id);
}
}
- for (const auto& [dst_id, streams] : streams_for_node) {
+ for (const auto& [dst_id, _] : streams_for_node) {
if (!finished_dst_ids.contains(dst_id)) {
continue;
}
- for (const auto& stream : streams->streams()) {
- for (auto tablet_id : stream->write_tablets()) {
- finished_tablets_replica[tablet_id]++;
- }
+ for (const auto& tablet_id : _tablets_by_node[dst_id]) {
+ finished_tablets_replica[tablet_id]++;
}
}
// 2. check if quorum success
- {
- std::lock_guard<std::mutex> l(_write_tablets_lock);
- for (const auto& tablet_id : _write_tablets) {
- if (finished_tablets_replica[tablet_id] <
_load_required_replicas_num(tablet_id)) {
- return false;
- }
+ for (const auto& tablet_id : need_finish_tablets) {
+ if (finished_tablets_replica[tablet_id] <
_load_required_replicas_num(tablet_id)) {
+ return false;
}
}
return true;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h
b/be/src/vec/sink/writer/vtablet_writer_v2.h
index eee2c956644..02fee25e743 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -157,7 +157,8 @@ private:
bool need_wait_after_quorum_success);
bool _quorum_success(
- const std::unordered_set<std::shared_ptr<LoadStreamStub>>&
unfinished_streams);
+ const std::unordered_set<std::shared_ptr<LoadStreamStub>>&
unfinished_streams,
+ const std::unordered_set<int64_t>& need_finish_tablets);
int _load_required_replicas_num(int64_t tablet_id);
@@ -245,6 +246,7 @@ private:
std::unordered_map<int64_t, std::unordered_map<int64_t, PTabletID>>
_tablets_for_node;
std::unordered_map<int64_t, std::vector<PTabletID>> _indexes_from_node;
+ std::unordered_map<int64_t, std::unordered_set<int64_t>> _tablets_by_node;
std::shared_ptr<LoadStreamMap> _load_stream_map;
@@ -256,8 +258,6 @@ private:
// tablet_id -> <total replicas num, load required replicas num>
std::unordered_map<int64_t, std::pair<int, int>> _tablet_replica_info;
- std::mutex _write_tablets_lock;
- std::unordered_set<int64_t> _write_tablets;
};
} // namespace vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]