This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c60a49b05e7 [fix](multi-table-load) fix single stream multi table load cannot finish (#33816)
c60a49b05e7 is described below
commit c60a49b05e7d5f2846d3874d0dc7f6ed2c63b682
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Fri Apr 19 13:02:45 2024 +0800
[fix](multi-table-load) fix single stream multi table load cannot finish (#33816)
---
be/src/io/fs/multi_table_pipe.cpp | 22 +++++++++++++++++-----
be/src/io/fs/stream_load_pipe.cpp | 9 +++++++++
be/src/io/fs/stream_load_pipe.h | 4 ++++
3 files changed, 30 insertions(+), 5 deletions(-)
diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp
index dbc0c3e0228..28df937479b 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -114,17 +114,25 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size
} else {
pipe = iter->second;
}
- RETURN_NOT_OK_STATUS_WITH_WARN((pipe.get()->*cb)(data, size),
- "append failed in unplanned kafka pipe");
+ // It is necessary to determine whether the sum of pipe_current_capacity and size is greater than pipe_max_capacity,
+ // otherwise the following situation may occur:
+ // the pipe is full but still cannot trigger the request and exec plan condition,
+ // causing one stream multi table load can not finish
++_unplanned_row_cnt;
+ auto pipe_current_capacity = pipe->current_capacity();
+ auto pipe_max_capacity = pipe->max_capacity();
if (_unplanned_row_cnt >= _row_threshold ||
- _unplanned_pipes.size() >= _wait_tables_threshold) {
+ _unplanned_pipes.size() >= _wait_tables_threshold ||
+ pipe_current_capacity + size > pipe_max_capacity) {
LOG(INFO) << fmt::format(
"unplanned row cnt={} reach row_threshold={} or "
- "wait_plan_table_threshold={}, "
+ "wait_plan_table_threshold={}, or the sum of "
+ "pipe_current_capacity {} "
+ "and size {} is greater than pipe_max_capacity {}, "
"plan them",
- _unplanned_row_cnt, _row_threshold, _wait_tables_threshold)
+ _unplanned_row_cnt, _row_threshold, _wait_tables_threshold,
+ pipe_current_capacity, size, pipe_max_capacity)
<< ", ctx: " << _ctx->brief();
Status st = request_and_exec_plans();
_unplanned_row_cnt = 0;
@@ -132,7 +140,11 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size
return st;
}
}
+
+ RETURN_NOT_OK_STATUS_WITH_WARN((pipe.get()->*cb)(data, size),
+ "append failed in unplanned kafka pipe");
}
+
return Status::OK();
}
diff --git a/be/src/io/fs/stream_load_pipe.cpp b/be/src/io/fs/stream_load_pipe.cpp
index cd5ee5a8a09..ecce306bdf1 100644
--- a/be/src/io/fs/stream_load_pipe.cpp
+++ b/be/src/io/fs/stream_load_pipe.cpp
@@ -255,5 +255,14 @@ TUniqueId StreamLoadPipe::calculate_pipe_id(const UniqueId& query_id, int32_t fr
return pipe_id;
}
+size_t StreamLoadPipe::current_capacity() {
+ std::unique_lock<std::mutex> l(_lock);
+ if (_use_proto) {
+ return _proto_buffered_bytes;
+ } else {
+ return _buffered_bytes;
+ }
+}
+
} // namespace io
} // namespace doris
diff --git a/be/src/io/fs/stream_load_pipe.h b/be/src/io/fs/stream_load_pipe.h
index e3042a932c4..afbe9ebf98d 100644
--- a/be/src/io/fs/stream_load_pipe.h
+++ b/be/src/io/fs/stream_load_pipe.h
@@ -81,6 +81,10 @@ public:
// used for pipeline load, which use TUniqueId(lo: query_id.lo + fragment_id, hi: query_id.hi) as pipe_id
static TUniqueId calculate_pipe_id(const UniqueId& query_id, int32_t fragment_id);
+ size_t max_capacity() const { return _max_buffered_bytes; }
+
+ size_t current_capacity();
+
protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]