This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a64656748ba [Enhancenment](wal) disable group commit when streamload
size is too large (#27781)
a64656748ba is described below
commit a64656748badf7665101a7effe1e35292a0d7e54
Author: abmdocrt <[email protected]>
AuthorDate: Sun Dec 3 23:05:11 2023 +0800
[Enhancenment](wal) disable group commit when streamload size is too large
(#27781)
---
be/src/http/action/http_stream.cpp | 8 ++-
be/src/http/action/stream_load.cpp | 5 +-
be/src/http/action/stream_load.h | 1 +
be/src/http/utils.cpp | 17 ++++++
be/src/http/utils.h | 2 +
be/test/http/stream_load_test.cpp | 117 +++++++++++++++++++++++++++++++++++++
6 files changed, 145 insertions(+), 5 deletions(-)
diff --git a/be/src/http/action/http_stream.cpp
b/be/src/http/action/http_stream.cpp
index d81f42824c2..e99e6bcde9f 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -167,10 +167,12 @@ int HttpStreamAction::on_header(HttpRequest* req) {
ctx->load_type = TLoadType::MANUL_LOAD;
ctx->load_src_type = TLoadSourceType::RAW;
- ctx->group_commit = iequal(req->header(HTTP_GROUP_COMMIT), "true") ||
- config::wait_internal_group_commit_finish;
+ if (iequal(req->header(HTTP_GROUP_COMMIT), "true") ||
+ config::wait_internal_group_commit_finish) {
+ ctx->group_commit = load_size_smaller_than_wal_limit(req);
+ }
- ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true" ?
true : false;
+ ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true";
LOG(INFO) << "new income streaming load request." << ctx->brief()
<< " sql : " << req->header(HTTP_SQL);
diff --git a/be/src/http/action/stream_load.cpp
b/be/src/http/action/stream_load.cpp
index 6ca7148b3b6..d93332b6dff 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -50,6 +50,7 @@
#include "http/utils.h"
#include "io/fs/stream_load_pipe.h"
#include "olap/storage_engine.h"
+#include "olap/wal_manager.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/group_commit_mgr.h"
@@ -192,14 +193,14 @@ int StreamLoadAction::on_header(HttpRequest* req) {
if (iequal(req->header(HTTP_GROUP_COMMIT), "true") &&
!ctx->label.empty()) {
st = Status::InternalError("label and group_commit can't be set at
the same time");
}
- ctx->group_commit = true;
+ ctx->group_commit = load_size_smaller_than_wal_limit(req);
} else {
if (ctx->label.empty()) {
ctx->label = generate_uuid_string();
}
}
- ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true" ?
true : false;
+ ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true";
LOG(INFO) << "new income streaming load request." << ctx->brief() << ",
db=" << ctx->db
<< ", tbl=" << ctx->table;
diff --git a/be/src/http/action/stream_load.h b/be/src/http/action/stream_load.h
index 4319d722d11..eac3825f534 100644
--- a/be/src/http/action/stream_load.h
+++ b/be/src/http/action/stream_load.h
@@ -50,6 +50,7 @@ private:
Status _data_saved_path(HttpRequest* req, std::string* file_path);
Status _process_put(HttpRequest* http_req,
std::shared_ptr<StreamLoadContext> ctx);
void _save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx,
const std::string& str);
+ bool _load_size_smaller_than_wal_limit(HttpRequest* req);
private:
ExecEnv* _exec_env;
diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp
index 31550456c55..bfef46036b7 100644
--- a/be/src/http/utils.cpp
+++ b/be/src/http/utils.cpp
@@ -26,6 +26,7 @@
#include <ostream>
#include <vector>
+#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "common/utils.h"
@@ -189,4 +190,20 @@ void do_dir_response(const std::string& dir_path,
HttpRequest* req) {
HttpChannel::send_reply(req, result_str);
}
+bool load_size_smaller_than_wal_limit(HttpRequest* req) {
+ // 1. req->header(HttpHeaders::CONTENT_LENGTH) will return streamload
content length. If it is empty, it means this streamload
+ // is a chunked streamload and we are not sure of its size.
+ // 2. if streamload content length is too large, e.g. larger than 80% of
the WAL constraint.
+ //
+ // In these two cases, we are not certain that the Write-Ahead Logging (WAL)
constraints allow for writing down
+ // these blocks within the limited space. So we need to set group_commit =
false to avoid deadlock.
+ if (!req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
+ size_t body_bytes =
std::stol(req->header(HttpHeaders::CONTENT_LENGTH));
+ // TODO(Yukang): change it to WalManager::wal_limit
+ return !(body_bytes > config::wal_max_disk_size * 0.8);
+ } else {
+ return false;
+ }
+}
+
} // namespace doris
diff --git a/be/src/http/utils.h b/be/src/http/utils.h
index 2d1e13fbe4e..70ab842450b 100644
--- a/be/src/http/utils.h
+++ b/be/src/http/utils.h
@@ -42,4 +42,6 @@ void do_file_response(const std::string& dir_path,
HttpRequest* req,
void do_dir_response(const std::string& dir_path, HttpRequest* req);
std::string get_content_type(const std::string& file_name);
+
+bool load_size_smaller_than_wal_limit(HttpRequest* req);
} // namespace doris
diff --git a/be/test/http/stream_load_test.cpp
b/be/test/http/stream_load_test.cpp
new file mode 100644
index 00000000000..75588f4190b
--- /dev/null
+++ b/be/test/http/stream_load_test.cpp
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/action/stream_load.h"
+
+#include <gtest/gtest.h>
+
+#include "common/config.h"
+#include "event2/buffer.h"
+#include "event2/event.h"
+#include "event2/http.h"
+#include "event2/http_struct.h"
+#include "evhttp.h"
+#include "http/ev_http_server.h"
+#include "http/http_channel.h"
+#include "http/http_common.h"
+#include "http/http_handler.h"
+#include "http/http_handler_with_auth.h"
+#include "http/http_headers.h"
+#include "http/http_request.h"
+#include "http/utils.h"
+#include "olap/wal_manager.h"
+
+namespace doris {
+
+class StreamLoadTest : public testing::Test {
+public:
+ StreamLoadTest() = default;
+ virtual ~StreamLoadTest() = default;
+ void SetUp() override {}
+ void TearDown() override {}
+};
+
+void http_request_done_cb(struct evhttp_request* req, void* arg) {
+ event_base_loopbreak((struct event_base*)arg);
+}
+
+TEST_F(StreamLoadTest, TestHeader) {
+ // 1G
+ config::wal_max_disk_size = 1073741824;
+ // 1. empty info
+ {
+ auto evhttp_req = evhttp_request_new(nullptr, nullptr);
+ HttpRequest req(evhttp_req);
+ EXPECT_EQ(load_size_smaller_than_wal_limit(&req), false);
+ evhttp_request_free(evhttp_req);
+ }
+
+ // 2. chunked stream load with group commit
+ {
+ //struct event_base* base = nullptr;
+ char uri[] = "Http://127.0.0.1/test.txt";
+ auto evhttp_req = evhttp_request_new(nullptr, nullptr);
+ evhttp_req->type = EVHTTP_REQ_GET;
+ evhttp_req->uri = uri;
+ evhttp_req->uri_elems = evhttp_uri_parse(uri);
+ evhttp_add_header(evhttp_req->input_headers,
HTTP_GROUP_COMMIT.c_str(), "true");
+ HttpRequest req(evhttp_req);
+ req.init_from_evhttp();
+ EXPECT_EQ(load_size_smaller_than_wal_limit(&req), false);
+ evhttp_uri_free(evhttp_req->uri_elems);
+ evhttp_req->uri = nullptr;
+ evhttp_req->uri_elems = nullptr;
+ evhttp_request_free(evhttp_req);
+ }
+
+ // 3. small stream load with group commit
+ {
+ char uri[] = "Http://127.0.0.1/test.txt";
+ auto evhttp_req = evhttp_request_new(nullptr, nullptr);
+ evhttp_req->type = EVHTTP_REQ_GET;
+ evhttp_req->uri = uri;
+ evhttp_req->uri_elems = evhttp_uri_parse(uri);
+ evhttp_add_header(evhttp_req->input_headers,
HTTP_GROUP_COMMIT.c_str(), "true");
+ evhttp_add_header(evhttp_req->input_headers,
HttpHeaders::CONTENT_LENGTH, "1000");
+ HttpRequest req(evhttp_req);
+ req.init_from_evhttp();
+ EXPECT_EQ(load_size_smaller_than_wal_limit(&req), true);
+ evhttp_uri_free(evhttp_req->uri_elems);
+ evhttp_req->uri = nullptr;
+ evhttp_req->uri_elems = nullptr;
+ evhttp_request_free(evhttp_req);
+ }
+
+ // 4. large stream load with group commit
+ {
+ char uri[] = "Http://127.0.0.1/test.txt";
+ auto evhttp_req = evhttp_request_new(nullptr, nullptr);
+ evhttp_req->type = EVHTTP_REQ_GET;
+ evhttp_req->uri = uri;
+ evhttp_req->uri_elems = evhttp_uri_parse(uri);
+ evhttp_add_header(evhttp_req->input_headers,
HTTP_GROUP_COMMIT.c_str(), "true");
+ evhttp_add_header(evhttp_req->input_headers,
HttpHeaders::CONTENT_LENGTH, "1073741824");
+ HttpRequest req(evhttp_req);
+ req.init_from_evhttp();
+ EXPECT_EQ(load_size_smaller_than_wal_limit(&req), false);
+ evhttp_uri_free(evhttp_req->uri_elems);
+ evhttp_req->uri = nullptr;
+ evhttp_req->uri_elems = nullptr;
+ evhttp_request_free(evhttp_req);
+ }
+}
+} // namespace doris
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]