github-actions[bot] commented on code in PR #62661:
URL: https://github.com/apache/doris/pull/62661#discussion_r3371964168
##########
be/src/exec/sink/vtablet_finder.cpp:
##########
@@ -22,16 +22,89 @@
#include <gen_cpp/FrontendService_types.h>
#include <glog/logging.h>
+#include <algorithm>
#include <string>
#include <utility>
#include "common/compiler_util.h" // IWYU pragma: keep
+#include "common/config.h"
#include "common/status.h"
#include "core/block/block.h"
#include "runtime/runtime_state.h"
#include "storage/tablet_info.h"
namespace doris {
+
+void AdaptiveRandomBucketState::init_partition(int64_t partition_id,
+ const std::vector<int64_t>&
tablets,
+ const std::vector<int32_t>&
bucket_seqs,
+ int32_t start_tablet_idx) {
+ if (partition_id < 0 || tablets.empty()) {
+ return;
+ }
+ std::lock_guard<std::mutex> lock(_mutex);
+ if (_partition_states.contains(partition_id)) {
+ return;
Review Comment:
`TabletsChannel` is keyed by load/index and shared by all senders, but each
sender's open request can carry a different FE-selected ordered bucket list for
the same partition on the same receiver BE. The first sender to initialize the
partition wins here: every later open for that partition returns early without
merging or validating its `random_bucket_partitions`. For example, if two sink
backends are assigned different local start buckets that both route to the
same bucket-owner BE, the second sender's rows will be routed using the first
sender's tablet order and start index. That defeats the FE assignment and can
concentrate both senders' rows on the same bucket until the next rotation. The
receiver needs to either keep sender-specific state or merge/validate the
ordered lists from subsequent opens instead of silently ignoring them.
##########
be/src/load/channel/tablets_channel.cpp:
##########
@@ -639,6 +681,133 @@ Status BaseTabletsChannel::_write_block_data(
return Status::OK();
}
+std::shared_ptr<std::mutex>
BaseTabletsChannel::_get_partition_route_lock(int64_t partition_id) { // Returns the route mutex for partition_id, creating it on first request.
+ std::lock_guard<std::mutex> l(_partition_route_locks_lock); // Serializes lookup/insertion in _partition_route_locks itself.
+ auto& lock = _partition_route_locks[partition_id]; // Presumably a map of shared_ptr<std::mutex>; operator[] default-inserts an empty entry for a new id.
+ if (lock == nullptr) {
+ lock = std::make_shared<std::mutex>(); // First caller for this partition creates the mutex.
+ }
+ return lock; // Returned by value (shared_ptr copy), so the mutex stays alive after the map lock is released.
+}
+
+Status BaseTabletsChannel::_write_block_data_for_receiver_side_random_bucket(
+ const PTabletWriterAddBlockRequest& request, int64_t cur_seq,
+ std::unordered_map<int64_t, DorisVector<uint32_t>>&
partition_to_rowidxs,
+ PTabletWriterAddBlockResult* response) {
+ Block send_data;
+ [[maybe_unused]] size_t uncompressed_size = 0;
+ [[maybe_unused]] int64_t uncompressed_time = 0;
+ RETURN_IF_ERROR(send_data.deserialize(request.block(), &uncompressed_size,
&uncompressed_time));
+ CHECK(send_data.rows() == request.partition_ids_size())
+ << "block rows: " << send_data.rows()
+ << ", partition_ids_size: " << request.partition_ids_size();
+
+ {
+ std::lock_guard<std::mutex> l(_lock);
+ for (const auto& [partition_id, _] : partition_to_rowidxs) {
+ _partition_ids.emplace(partition_id);
+ }
+ }
+
+ g_tablets_channel_send_data_allocated_size << send_data.allocated_bytes();
+ Defer defer {
+ [&]() { g_tablets_channel_send_data_allocated_size <<
-send_data.allocated_bytes(); }};
+
+ auto* tablet_errors = response->mutable_tablet_errors();
+ auto* tablet_load_infos = response->mutable_tablet_load_rowset_num_infos();
+
+ auto write_partition_data = [&](int64_t partition_id,
+ const DorisVector<uint32_t>& row_idxs) ->
Status {
+ auto partition_lock = _get_partition_route_lock(partition_id);
+ std::lock_guard<std::mutex> partition_guard(*partition_lock);
+
+ CHECK(_adaptive_random_bucket_state != nullptr);
+ int64_t tablet_id =
_adaptive_random_bucket_state->current_tablet(partition_id);
+ CHECK(tablet_id >= 0) << "invalid current tablet, load_id=" << _load_id
+ << ", partition_id=" << partition_id;
+ LOG(INFO) << "FIND_TABLET_RANDOM_BUCKET: route+write begin"
+ << ", load_id=" << _load_id << ", index_id=" << _index_id
+ << ", sender_id=" << request.sender_id()
+ << ", packet_seq=" << request.packet_seq() << ",
partition_id=" << partition_id
+ << ", tablet_id=" << tablet_id << ", row_count=" <<
row_idxs.size();
+
+ {
+ std::shared_lock<std::shared_mutex>
broken_rlock(_broken_tablets_lock);
+ if (_is_broken_tablet(tablet_id)) {
+ LOG(INFO) << "FIND_TABLET_RANDOM_BUCKET: skip broken tablet"
+ << ", load_id=" << _load_id << ", index_id=" <<
_index_id
+ << ", sender_id=" << request.sender_id()
+ << ", packet_seq=" << request.packet_seq()
+ << ", partition_id=" << partition_id << ",
tablet_id=" << tablet_id;
+ return Status::OK();
+ }
+ }
+
+ decltype(_tablet_writers.find(tablet_id)) tablet_writer_it;
+ {
+ std::lock_guard<std::mutex> l(_tablet_writers_lock);
+ tablet_writer_it = _tablet_writers.find(tablet_id);
+ if (tablet_writer_it == _tablet_writers.end()) {
+ return Status::InternalError("unknown tablet to append data,
tablet={}", tablet_id);
+ }
+ }
+
+ bool memtable_flushed = false;
+ Status st = tablet_writer_it->second->write(&send_data, row_idxs,
&memtable_flushed);
+ if (!st.ok()) {
Review Comment:
This new receiver-side path keeps an `unordered_map` iterator after
releasing `_tablet_writers_lock`, then dereferences it for `write()`,
`cancel_with_status()`, and `set_tablet_load_rowset_num_info()`. `add_batch()`
is explicitly allowed to run concurrently with `incremental_open()`, and
`incremental_open()` inserts into `_tablet_writers` under the same lock. An
insert that triggers a rehash invalidates every iterator into the map, and the
partition route lock held here does not prevent that. Concretely, an
auto-partition load can be writing partition P here while another sender
incremental-opens a different partition on the same tablets channel; the
subsequent dereference is then undefined behavior (a write through an
invalidated iterator). Instead, copy a stable `BaseDeltaWriter*` while holding
`_tablet_writers_lock` and use that pointer after releasing the lock. This is
safe because writers are only inserted, never erased, during the channel's
lifetime.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]