This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new db5d95ea8d0 branch-3.1: Revert "[refactor](sink) refactor vtablet writer v2 sequential close … #52566 (#52634)
db5d95ea8d0 is described below
commit db5d95ea8d0a89b52b9dd30da97bdff658304045
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jul 2 18:06:41 2025 +0800
branch-3.1: Revert "[refactor](sink) refactor vtablet writer v2 sequential close … #52566 (#52634)
Cherry-picked from #52566
Co-authored-by: hui lai <[email protected]>
---
be/src/vec/sink/load_stream_map_pool.h | 9 --
be/src/vec/sink/load_stream_stub.cpp | 72 +++++++-------
be/src/vec/sink/load_stream_stub.h | 10 +-
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 107 +++++----------------
be/src/vec/sink/writer/vtablet_writer_v2.h | 12 +--
.../test_writer_v2_fault_injection.groovy | 17 +---
6 files changed, 69 insertions(+), 158 deletions(-)
diff --git a/be/src/vec/sink/load_stream_map_pool.h b/be/src/vec/sink/load_stream_map_pool.h
index bf4f3df2cc9..4ecae2f16be 100644
--- a/be/src/vec/sink/load_stream_map_pool.h
+++ b/be/src/vec/sink/load_stream_map_pool.h
@@ -98,15 +98,6 @@ public:
// only call this method after release() returns true.
void close_load(bool incremental);
-    std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>> get_streams_for_node() {
- decltype(_streams_for_node) snapshot;
- {
- std::lock_guard<std::mutex> lock(_mutex);
- snapshot = _streams_for_node;
- }
- return snapshot;
- }
-
private:
const UniqueId _load_id;
const int64_t _src_id;
diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index 62295129dd2..d57ec02645c 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -114,7 +114,9 @@ void LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
LOG(WARNING) << "stub is not exist when on_closed, " << *this;
return;
}
+ std::lock_guard<bthread::Mutex> lock(stub->_close_mutex);
stub->_is_closed.store(true);
+ stub->_close_cv.notify_all();
}
inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamReplyHandler& handler) {
@@ -328,30 +330,37 @@ Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, i
return Status::OK();
}
-Status LoadStreamStub::close_finish_check(RuntimeState* state, bool* is_closed) {
+Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) {
DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", DBUG_BLOCK);
- DBUG_EXECUTE_IF("LoadStreamStub::close_finish_check.close_failed",
- { return Status::InternalError("close failed"); });
- *is_closed = true;
if (!_is_open.load()) {
// we don't need to close wait on non-open streams
return Status::OK();
}
if (!_is_closing.load()) {
- *is_closed = false;
return _status;
}
- if (state->get_query_ctx()->is_cancelled()) {
- return state->get_query_ctx()->exec_status();
- }
if (_is_closed.load()) {
- RETURN_IF_ERROR(_check_cancel());
- if (!_is_eos.load()) {
-            return Status::InternalError("Stream closed without EOS, {}", to_string());
+ return _check_cancel();
+ }
+ DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0";
+ std::unique_lock<bthread::Mutex> lock(_close_mutex);
+ auto timeout_sec = timeout_ms / 1000;
+ while (!_is_closed.load() && !state->get_query_ctx()->is_cancelled()) {
+ //the query maybe cancel, so need check after wait 1s
+ timeout_sec = timeout_sec - 1;
+        LOG(INFO) << "close waiting, " << *this << ", timeout_sec=" << timeout_sec
+                  << ", is_closed=" << _is_closed.load()
+                  << ", is_cancelled=" << state->get_query_ctx()->is_cancelled();
+ int ret = _close_cv.wait_for(lock, 1000000);
+ if (ret != 0 && timeout_sec <= 0) {
+            return Status::InternalError("stream close_wait timeout, error={}, timeout_ms={}, {}",
+ ret, timeout_ms, to_string());
}
- return Status::OK();
}
- *is_closed = false;
+ RETURN_IF_ERROR(_check_cancel());
+ if (!_is_eos.load()) {
+        return Status::InternalError("stream closed without eos, {}", to_string());
+ }
return Status::OK();
}
@@ -365,7 +374,11 @@ void LoadStreamStub::cancel(Status reason) {
_cancel_st = reason;
_is_cancelled.store(true);
}
- _is_closed.store(true);
+ {
+ std::lock_guard<bthread::Mutex> lock(_close_mutex);
+ _is_closed.store(true);
+ _close_cv.notify_all();
+ }
}
Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span<const Slice> data) {
@@ -424,34 +437,12 @@ void LoadStreamStub::_handle_failure(butil::IOBuf& buf, Status st) {
switch (hdr.opcode()) {
case PStreamHeader::ADD_SEGMENT:
case PStreamHeader::APPEND_DATA: {
-        DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.append_data_failed", {
- add_failed_tablet(hdr.tablet_id(), st);
- return;
- });
-        DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.add_segment_failed", {
- add_failed_tablet(hdr.tablet_id(), st);
- return;
- });
add_failed_tablet(hdr.tablet_id(), st);
} break;
case PStreamHeader::CLOSE_LOAD: {
-        DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.close_load_failed", {
- brpc::StreamClose(_stream_id);
- return;
- });
brpc::StreamClose(_stream_id);
} break;
case PStreamHeader::GET_SCHEMA: {
-        DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.get_schema_failed", {
- // Just log and let wait_for_schema timeout
- std::ostringstream oss;
- for (const auto& tablet : hdr.tablets()) {
- oss << " " << tablet.tablet_id();
- }
-            LOG(WARNING) << "failed to send GET_SCHEMA request, tablet_id:" << oss.str() << ", "
- << *this;
- return;
- });
// Just log and let wait_for_schema timeout
std::ostringstream oss;
for (const auto& tablet : hdr.tablets()) {
@@ -565,4 +556,13 @@ Status LoadStreamStubs::close_load(const std::vector<PTabletID>& tablets_to_comm
return status;
}
+Status LoadStreamStubs::close_wait(RuntimeState* state, int64_t timeout_ms) {
+ MonotonicStopWatch watch;
+ watch.start();
+ for (auto& stream : _streams) {
+        RETURN_IF_ERROR(stream->close_wait(state, timeout_ms - watch.elapsed_time() / 1000 / 1000));
+ }
+ return Status::OK();
+}
+
} // namespace doris
diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h
index afce7119b38..9816770c82e 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -152,7 +152,7 @@ public:
// wait remote to close stream,
// remote will close stream when it receives CLOSE_LOAD
- Status close_finish_check(RuntimeState* state, bool* is_closed);
+ Status close_wait(RuntimeState* state, int64_t timeout_ms = 0);
// cancel the stream, abort close_wait, mark _is_closed and _is_cancelled
void cancel(Status reason);
@@ -220,8 +220,6 @@ private:
void _handle_failure(butil::IOBuf& buf, Status st);
Status _check_cancel() {
- DBUG_EXECUTE_IF("LoadStreamStub._check_cancel.cancelled",
- { return Status::InternalError("stream cancelled"); });
if (!_is_cancelled.load()) {
return Status::OK();
}
@@ -246,7 +244,9 @@ protected:
Status _cancel_st;
bthread::Mutex _open_mutex;
+ bthread::Mutex _close_mutex;
bthread::Mutex _cancel_mutex;
+ bthread::ConditionVariable _close_cv;
std::mutex _buffer_mutex;
std::mutex _send_mutex;
@@ -307,6 +307,8 @@ public:
Status close_load(const std::vector<PTabletID>& tablets_to_commit);
+ Status close_wait(RuntimeState* state, int64_t timeout_ms = 0);
+
std::unordered_set<int64_t> success_tablets() {
std::unordered_set<int64_t> s;
for (auto& stream : _streams) {
@@ -325,8 +327,6 @@ public:
return m;
}
- std::vector<std::shared_ptr<LoadStreamStub>> streams() { return _streams; }
-
private:
std::vector<std::shared_ptr<LoadStreamStub>> _streams;
std::atomic<bool> _open_success = false;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 8fc2cfdaeb0..f432595efa5 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -639,7 +639,7 @@ Status VTabletWriterV2::close(Status exec_status) {
    // close_wait on all non-incremental streams, even if this is not the last sink.
    // because some per-instance data structures are now shared among all sinks
    // due to sharing delta writers and load stream stubs.
- RETURN_IF_ERROR(_close_wait(_non_incremental_streams()));
+ RETURN_IF_ERROR(_close_wait(false));
// send CLOSE_LOAD on all incremental streams if this is the last sink.
// this must happen after all non-incremental streams are closed,
@@ -649,7 +649,7 @@ Status VTabletWriterV2::close(Status exec_status) {
}
    // close_wait on all incremental streams, even if this is not the last sink.
- RETURN_IF_ERROR(_close_wait(_incremental_streams()));
+ RETURN_IF_ERROR(_close_wait(true));
// calculate and submit commit info
if (is_last_sink) {
@@ -695,87 +695,32 @@ Status VTabletWriterV2::close(Status exec_status) {
return status;
}
-std::unordered_set<std::shared_ptr<LoadStreamStub>> VTabletWriterV2::_incremental_streams() {
- std::unordered_set<std::shared_ptr<LoadStreamStub>> incremental_streams;
- auto streams_for_node = _load_stream_map->get_streams_for_node();
- for (const auto& [dst_id, streams] : streams_for_node) {
- for (const auto& stream : streams->streams()) {
- if (stream->is_incremental()) {
- incremental_streams.insert(stream);
- }
- }
- }
- return incremental_streams;
-}
-
-std::unordered_set<std::shared_ptr<LoadStreamStub>> VTabletWriterV2::_non_incremental_streams() {
-    std::unordered_set<std::shared_ptr<LoadStreamStub>> non_incremental_streams;
- auto streams_for_node = _load_stream_map->get_streams_for_node();
- for (const auto& [dst_id, streams] : streams_for_node) {
- for (const auto& stream : streams->streams()) {
- if (!stream->is_incremental()) {
- non_incremental_streams.insert(stream);
- }
- }
- }
- return non_incremental_streams;
-}
-
-Status VTabletWriterV2::_close_wait(
-        std::unordered_set<std::shared_ptr<LoadStreamStub>> unfinished_streams) {
+Status VTabletWriterV2::_close_wait(bool incremental) {
SCOPED_TIMER(_close_load_timer);
- Status status;
- auto streams_for_node = _load_stream_map->get_streams_for_node();
- while (true) {
- RETURN_IF_ERROR(_check_timeout());
-        RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status, streams_for_node));
- if (!status.ok() || unfinished_streams.empty()) {
- LOG(INFO) << "is all unfinished: " << unfinished_streams.empty()
- << ", status: " << status << ", txn_id: " << _txn_id
- << ", load_id: " << print_id(_load_id);
- break;
- }
- bthread_usleep(1000 * 10);
- }
- if (!status.ok()) {
-        LOG(WARNING) << "close_wait failed: " << status << ", load_id=" << print_id(_load_id);
- }
- return status;
-}
-
-Status VTabletWriterV2::_check_timeout() {
-    int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000 -
- _timeout_watch.elapsed_time() / 1000 / 1000;
-    DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { remain_ms = 0; });
- if (remain_ms <= 0) {
-        LOG(WARNING) << "load timed out before close waiting, load_id=" << print_id(_load_id);
- return Status::TimedOut("load timed out before close waiting");
- }
- return Status::OK();
-}
-
-Status VTabletWriterV2::_check_streams_finish(
-        std::unordered_set<std::shared_ptr<LoadStreamStub>>& unfinished_streams, Status& status,
-        const std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>>& streams_for_node) {
- for (const auto& [dst_id, streams] : streams_for_node) {
- for (const auto& stream : streams->streams()) {
- if (!unfinished_streams.contains(stream)) {
- continue;
- }
- bool is_closed = false;
- auto stream_st = stream->close_finish_check(_state, &is_closed);
- if (!stream_st.ok()) {
- status = stream_st;
- unfinished_streams.erase(stream);
- LOG(WARNING) << "close_wait failed: " << stream_st
- << ", load_id=" << print_id(_load_id);
- }
- if (is_closed) {
- unfinished_streams.erase(stream);
- }
- }
+ auto st = _load_stream_map->for_each_st(
+            [this, incremental](int64_t dst_id, LoadStreamStubs& streams) -> Status {
+ if (streams.is_incremental() != incremental) {
+ return Status::OK();
+ }
+                int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000 -
+                                    _timeout_watch.elapsed_time() / 1000 / 1000;
+                DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { remain_ms = 0; });
+ if (remain_ms <= 0) {
+                    LOG(WARNING) << "load timed out before close waiting, load_id="
+                                 << print_id(_load_id);
+                    return Status::TimedOut("load timed out before close waiting");
+ }
+ auto st = streams.close_wait(_state, remain_ms);
+ if (!st.ok()) {
+                    LOG(WARNING) << "close_wait timeout on streams to dst_id=" << dst_id
+                                 << ", load_id=" << print_id(_load_id) << ": " << st;
+ }
+ return st;
+ });
+ if (!st.ok()) {
+        LOG(WARNING) << "close_wait failed: " << st << ", load_id=" << print_id(_load_id);
}
- return status;
+ return st;
}
void VTabletWriterV2::_calc_tablets_to_commit() {
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h
index cc87002a097..46a3974bba8 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -148,17 +148,7 @@ private:
void _calc_tablets_to_commit();
- std::unordered_set<std::shared_ptr<LoadStreamStub>> _incremental_streams();
-
-    std::unordered_set<std::shared_ptr<LoadStreamStub>> _non_incremental_streams();
-
-    Status _close_wait(std::unordered_set<std::shared_ptr<LoadStreamStub>> unfinished_streams);
-
- Status _check_timeout();
-
- Status _check_streams_finish(
-            std::unordered_set<std::shared_ptr<LoadStreamStub>>& unfinished_streams, Status& status,
-            const std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>>& streams_for_node);
+ Status _close_wait(bool incremental);
void _cancel(Status status);
diff --git a/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
index 1a473d90e52..30854cfb50b 100644
--- a/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
+++ b/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
@@ -68,7 +68,7 @@ suite("test_writer_v2_fault_injection", "nonConcurrent") {
file "baseall.txt"
}
- def load_with_injection = { injection, error_msg="", success=false->
+ def load_with_injection = { injection, error_msg, success=false->
try {
GetDebugPoint().enableDebugPointForAllBEs(injection)
sql "insert into test select * from baseall where k1 <= 3"
@@ -104,21 +104,6 @@ suite("test_writer_v2_fault_injection", "nonConcurrent") {
// DeltaWriterV2 stream_size is 0
load_with_injection("DeltaWriterV2.init.stream_size", "failed to find
tablet schema")
- // injection cases for VTabletWriterV2 close logic
- // Test LoadStreamStub close_finish_check close failed
- load_with_injection("LoadStreamStub::close_finish_check.close_failed")
- // Test LoadStreamStub _check_cancel when cancelled
- load_with_injection("LoadStreamStub._check_cancel.cancelled")
- // Test LoadStreamStub _send_with_retry stream write failed
-    load_with_injection("LoadStreamStub._send_with_retry.stream_write_failed")
- // Test LoadStreamStub _handle_failure for different opcodes
-    load_with_injection("LoadStreamStub._handle_failure.append_data_failed")
-    load_with_injection("LoadStreamStub._handle_failure.add_segment_failed")
- load_with_injection("LoadStreamStub._handle_failure.close_load_failed")
- load_with_injection("LoadStreamStub._handle_failure.get_schema_failed")
- // Test LoadStreamStub skip send segment
- load_with_injection("LoadStreamStub.skip_send_segment")
-
sql """ set enable_memtable_on_sink_node=false """
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]