This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 90ed663eb00716906aeef90893ab8f170d6a7d8c Author: Kaijie Chen <[email protected]> AuthorDate: Thu Jan 25 19:37:02 2024 +0800 [fix](move-memtable) all sinks wait stream close for load timeout (#30356) --- be/src/common/config.cpp | 2 -- be/src/common/config.h | 2 -- be/src/vec/sink/load_stream_stub.cpp | 7 ------- be/src/vec/sink/load_stream_stub.h | 1 - be/src/vec/sink/writer/vtablet_writer_v2.cpp | 5 ++++- be/src/vec/sink/writer/vtablet_writer_v2.h | 1 + 6 files changed, 5 insertions(+), 13 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 894ea80d6d1..355a90ab7f9 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -766,8 +766,6 @@ DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1"); DEFINE_Bool(share_delta_writers, "true"); // timeout for open load stream rpc in ms DEFINE_Int64(open_load_stream_timeout_ms, "60000"); // 60s -// timeout for load stream close wait in ms -DEFINE_Int64(close_load_stream_timeout_ms, "600000"); // 10 min // brpc streaming max_buf_size in bytes DEFINE_Int64(load_stream_max_buf_size, "20971520"); // 20MB diff --git a/be/src/common/config.h b/be/src/common/config.h index 79704aa0e9c..8db84b459b8 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -819,8 +819,6 @@ DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio); DECLARE_Bool(share_delta_writers); // timeout for open load stream rpc in ms DECLARE_Int64(open_load_stream_timeout_ms); -// timeout for load stream close wait in ms -DECLARE_Int64(close_load_stream_timeout_ms); // brpc streaming max_buf_size in bytes DECLARE_Int64(load_stream_max_buf_size); diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index c4b10162299..5751e8308bd 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -316,13 +316,6 @@ Status LoadStreamStub::close_wait(int64_t timeout_ms) { if (_is_closed.load()) { return _check_cancel(); } - // if there are other sinks remaining, let the last sink handle close wait - if (_use_cnt > 0) { - return Status::OK(); - } - if (timeout_ms <= 0) { - timeout_ms = config::close_load_stream_timeout_ms; - } DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0"; std::unique_lock<bthread::Mutex> lock(_close_mutex); if (!_is_closed.load()) { diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index c91f1016d35..3bc6331dc02 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -150,7 +150,6 @@ public: // wait remote to close stream, // remote will close stream when it receives CLOSE_LOAD - // if timeout_ms <= 0, will fallback to config::close_load_stream_timeout_ms Status close_wait(int64_t timeout_ms = 0); // cancel the stream, abort close_wait, mark _is_closed and _is_cancelled diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index c42f0955a2a..bb4b60a3a86 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -247,6 +247,7 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) { Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) { RETURN_IF_ERROR(_init(state, profile)); + _timeout_watch.start(); SCOPED_TIMER(_profile->total_time_counter()); SCOPED_TIMER(_open_timer); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); @@ -550,9 +551,11 @@ Status VTabletWriterV2::close(Status exec_status) { { SCOPED_TIMER(_close_load_timer); + auto remain_ms = _state->execution_timeout() * 1000 - + _timeout_watch.elapsed_time() / 1000 / 1000; for (const auto& [_, streams] : _streams_for_node) { for (const auto& stream : streams->streams()) { - RETURN_IF_ERROR(stream->close_wait()); + RETURN_IF_ERROR(stream->close_wait(remain_ms)); } } } diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h index 3b96b578d35..460b3acc33f 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.h +++ b/be/src/vec/sink/writer/vtablet_writer_v2.h @@ -186,6 +186,7 @@ private: int64_t _number_output_rows = 0; MonotonicStopWatch _row_distribution_watch; + MonotonicStopWatch _timeout_watch; RuntimeProfile::Counter* _input_rows_counter = nullptr; RuntimeProfile::Counter* _output_rows_counter = nullptr; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
