This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 48804a978a170e57c5605c886dadddfdc499962c Author: abmdocrt <[email protected]> AuthorDate: Mon Feb 26 09:43:21 2024 +0800 [Fix](group commit) Fix group commit flink error message (#31350) * When using stream processing frameworks like Flink with group commit mode enabled, the uncertain size of imported data makes such behavior prohibitive. Previously, to simplify the process, the error message for excessive data volume during streamload was combined with the one for group commit mode, leading to confusion for users when encountering errors indicating the data volume is too large during Flink imports. To address this issue, we are adjusting the logic: if a user employs [...] --- be/src/http/action/http_stream.cpp | 8 +++++--- be/src/http/action/stream_load.cpp | 8 +++++--- be/src/http/utils.cpp | 11 +++-------- be/src/http/utils.h | 2 +- be/test/http/stream_load_test.cpp | 12 ++++++++---- 5 files changed, 22 insertions(+), 19 deletions(-) diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp index a651c78930e..6304c44462f 100644 --- a/be/src/http/action/http_stream.cpp +++ b/be/src/http/action/http_stream.cpp @@ -378,7 +378,10 @@ Status HttpStreamAction::_handle_group_commit(HttpRequest* req, if (config::wait_internal_group_commit_finish) { group_commit_mode = "sync_mode"; } - if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode")) { + size_t content_length = req->header(HttpHeaders::CONTENT_LENGTH).empty() + ? 0 + : std::stol(req->header(HttpHeaders::CONTENT_LENGTH)); + if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode") || content_length == 0) { // off_mode and empty ctx->group_commit = false; return Status::OK(); @@ -394,8 +397,7 @@ Status HttpStreamAction::_handle_group_commit(HttpRequest* req, } ctx->group_commit = true; if (iequal(group_commit_mode, "async_mode")) { - group_commit_mode = load_size_smaller_than_wal_limit(req) ? "async_mode" : "sync_mode"; - if (iequal(group_commit_mode, "sync_mode")) { + if (!load_size_smaller_than_wal_limit(content_length)) { std::stringstream ss; ss << "There is no space for group commit http load async WAL. WAL dir info: " << ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string(); diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 09e6376df4c..9e9db1bd37e 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -710,7 +710,10 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req, if (config::wait_internal_group_commit_finish) { group_commit_mode = "sync_mode"; } - if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode")) { + size_t content_length = req->header(HttpHeaders::CONTENT_LENGTH).empty() + ? 0 + : std::stol(req->header(HttpHeaders::CONTENT_LENGTH)); + if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode") || content_length == 0) { // off_mode and empty ctx->group_commit = false; return Status::OK(); @@ -726,8 +729,7 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req, } ctx->group_commit = true; if (iequal(group_commit_mode, "async_mode")) { - group_commit_mode = load_size_smaller_than_wal_limit(req) ? "async_mode" : "sync_mode"; - if (iequal(group_commit_mode, "sync_mode")) { + if (!load_size_smaller_than_wal_limit(content_length)) { std::stringstream ss; ss << "There is no space for group commit stream load async WAL. WAL dir info: " << ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string(); diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp index 72b80f875e2..1e477530ebf 100644 --- a/be/src/http/utils.cpp +++ b/be/src/http/utils.cpp @@ -193,20 +193,15 @@ void do_dir_response(const std::string& dir_path, HttpRequest* req) { HttpChannel::send_reply(req, result_str); } -bool load_size_smaller_than_wal_limit(HttpRequest* req) { +bool load_size_smaller_than_wal_limit(size_t content_length) { // 1. req->header(HttpHeaders::CONTENT_LENGTH) will return streamload content length. If it is empty or equels to 0, it means this streamload // is a chunked streamload and we are not sure its size. // 2. if streamload content length is too large, like larger than 80% of the WAL constrain. // // This two cases, we are not certain that the Write-Ahead Logging (WAL) constraints allow for writing down // these blocks within the limited space. So we need to set group_commit = false to avoid dead lock. - if (!req->header(HttpHeaders::CONTENT_LENGTH).empty()) { - size_t body_bytes = std::stol(req->header(HttpHeaders::CONTENT_LENGTH)); - size_t max_available_size = ExecEnv::GetInstance()->wal_mgr()->get_max_available_size(); - return (body_bytes != 0 && body_bytes < 0.8 * max_available_size); - } else { - return false; - } + size_t max_available_size = ExecEnv::GetInstance()->wal_mgr()->get_max_available_size(); + return (content_length < 0.8 * max_available_size); } } // namespace doris diff --git a/be/src/http/utils.h b/be/src/http/utils.h index 70ab842450b..e20e68c5b88 100644 --- a/be/src/http/utils.h +++ b/be/src/http/utils.h @@ -43,5 +43,5 @@ void do_dir_response(const std::string& dir_path, HttpRequest* req); std::string get_content_type(const std::string& file_name); -bool load_size_smaller_than_wal_limit(HttpRequest* req); +bool load_size_smaller_than_wal_limit(size_t content_length); } // namespace doris diff --git a/be/test/http/stream_load_test.cpp b/be/test/http/stream_load_test.cpp index 881154e857b..90b6cbe6380 100644 --- a/be/test/http/stream_load_test.cpp +++ b/be/test/http/stream_load_test.cpp @@ -63,7 +63,8 @@ TEST_F(StreamLoadTest, TestHeader) { { auto* evhttp_req = evhttp_request_new(nullptr, nullptr); HttpRequest req(evhttp_req); - EXPECT_EQ(load_size_smaller_than_wal_limit(&req), false); + EXPECT_EQ(req.header(HttpHeaders::CONTENT_LENGTH).empty(), true); + EXPECT_EQ(load_size_smaller_than_wal_limit(-1), false); evhttp_request_free(evhttp_req); } @@ -78,7 +79,8 @@ TEST_F(StreamLoadTest, TestHeader) { evhttp_add_header(evhttp_req->input_headers, HTTP_GROUP_COMMIT.c_str(), "true"); HttpRequest req(evhttp_req); req.init_from_evhttp(); - EXPECT_EQ(load_size_smaller_than_wal_limit(&req), false); + EXPECT_EQ(req.header(HttpHeaders::CONTENT_LENGTH).empty(), true); + EXPECT_EQ(load_size_smaller_than_wal_limit(-1), false); evhttp_uri_free(evhttp_req->uri_elems); evhttp_req->uri = nullptr; evhttp_req->uri_elems = nullptr; @@ -96,7 +98,8 @@ TEST_F(StreamLoadTest, TestHeader) { evhttp_add_header(evhttp_req->input_headers, HttpHeaders::CONTENT_LENGTH, "20000"); HttpRequest req(evhttp_req); req.init_from_evhttp(); - EXPECT_EQ(load_size_smaller_than_wal_limit(&req), true); + EXPECT_EQ(req.header(HttpHeaders::CONTENT_LENGTH), "20000"); + EXPECT_EQ(load_size_smaller_than_wal_limit(20000), true); evhttp_uri_free(evhttp_req->uri_elems); evhttp_req->uri = nullptr; evhttp_req->uri_elems = nullptr; @@ -114,7 +117,8 @@ TEST_F(StreamLoadTest, TestHeader) { evhttp_add_header(evhttp_req->input_headers, HttpHeaders::CONTENT_LENGTH, "1073741824"); HttpRequest req(evhttp_req); req.init_from_evhttp(); - EXPECT_EQ(load_size_smaller_than_wal_limit(&req), false); + EXPECT_EQ(req.header(HttpHeaders::CONTENT_LENGTH), "1073741824"); + EXPECT_EQ(load_size_smaller_than_wal_limit(1073741824), false); evhttp_uri_free(evhttp_req->uri_elems); evhttp_req->uri = nullptr; evhttp_req->uri_elems = nullptr; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
