This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a64656748ba [Enhancement](wal) disable group commit when streamload 
size is too large (#27781)
a64656748ba is described below

commit a64656748badf7665101a7effe1e35292a0d7e54
Author: abmdocrt <[email protected]>
AuthorDate: Sun Dec 3 23:05:11 2023 +0800

    [Enhancement](wal) disable group commit when streamload size is too large 
(#27781)
---
 be/src/http/action/http_stream.cpp |   8 ++-
 be/src/http/action/stream_load.cpp |   5 +-
 be/src/http/action/stream_load.h   |   1 +
 be/src/http/utils.cpp              |  17 ++++++
 be/src/http/utils.h                |   2 +
 be/test/http/stream_load_test.cpp  | 117 +++++++++++++++++++++++++++++++++++++
 6 files changed, 145 insertions(+), 5 deletions(-)

diff --git a/be/src/http/action/http_stream.cpp 
b/be/src/http/action/http_stream.cpp
index d81f42824c2..e99e6bcde9f 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -167,10 +167,12 @@ int HttpStreamAction::on_header(HttpRequest* req) {
     ctx->load_type = TLoadType::MANUL_LOAD;
     ctx->load_src_type = TLoadSourceType::RAW;
 
-    ctx->group_commit = iequal(req->header(HTTP_GROUP_COMMIT), "true") ||
-                        config::wait_internal_group_commit_finish;
+    if (iequal(req->header(HTTP_GROUP_COMMIT), "true") ||
+        config::wait_internal_group_commit_finish) {
+        ctx->group_commit = load_size_smaller_than_wal_limit(req);
+    }
 
-    ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true" ? 
true : false;
+    ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true";
 
     LOG(INFO) << "new income streaming load request." << ctx->brief()
               << " sql : " << req->header(HTTP_SQL);
diff --git a/be/src/http/action/stream_load.cpp 
b/be/src/http/action/stream_load.cpp
index 6ca7148b3b6..d93332b6dff 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -50,6 +50,7 @@
 #include "http/utils.h"
 #include "io/fs/stream_load_pipe.h"
 #include "olap/storage_engine.h"
+#include "olap/wal_manager.h"
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
 #include "runtime/group_commit_mgr.h"
@@ -192,14 +193,14 @@ int StreamLoadAction::on_header(HttpRequest* req) {
         if (iequal(req->header(HTTP_GROUP_COMMIT), "true") && 
!ctx->label.empty()) {
             st = Status::InternalError("label and group_commit can't be set at 
the same time");
         }
-        ctx->group_commit = true;
+        ctx->group_commit = load_size_smaller_than_wal_limit(req);
     } else {
         if (ctx->label.empty()) {
             ctx->label = generate_uuid_string();
         }
     }
 
-    ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true" ? 
true : false;
+    ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true";
 
     LOG(INFO) << "new income streaming load request." << ctx->brief() << ", 
db=" << ctx->db
               << ", tbl=" << ctx->table;
diff --git a/be/src/http/action/stream_load.h b/be/src/http/action/stream_load.h
index 4319d722d11..eac3825f534 100644
--- a/be/src/http/action/stream_load.h
+++ b/be/src/http/action/stream_load.h
@@ -50,6 +50,7 @@ private:
     Status _data_saved_path(HttpRequest* req, std::string* file_path);
     Status _process_put(HttpRequest* http_req, 
std::shared_ptr<StreamLoadContext> ctx);
     void _save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx, 
const std::string& str);
+    bool _load_size_smaller_than_wal_limit(HttpRequest* req);
 
 private:
     ExecEnv* _exec_env;
diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp
index 31550456c55..bfef46036b7 100644
--- a/be/src/http/utils.cpp
+++ b/be/src/http/utils.cpp
@@ -26,6 +26,7 @@
 #include <ostream>
 #include <vector>
 
+#include "common/config.h"
 #include "common/logging.h"
 #include "common/status.h"
 #include "common/utils.h"
@@ -189,4 +190,20 @@ void do_dir_response(const std::string& dir_path, 
HttpRequest* req) {
     HttpChannel::send_reply(req, result_str);
 }
 
+bool load_size_smaller_than_wal_limit(HttpRequest* req) {
+    // Group commit is only safe when the whole body is known to fit in the WAL; otherwise the
+    // load may deadlock waiting for WAL space. So return false for a chunked load (no
+    // Content-Length), a body above 80% of the WAL limit, or a malformed/negative length.
+    const std::string& content_length = req->header(HttpHeaders::CONTENT_LENGTH);
+    if (content_length.empty()) {
+        return false;
+    }
+    try {
+        // std::stoll throws on malformed/out-of-range input; never let that escape on_header().
+        const long long body_bytes = std::stoll(content_length);
+        // TODO(Yukang): change it to WalManager::wal_limit
+        return body_bytes >= 0 && body_bytes <= config::wal_max_disk_size * 0.8;
+    } catch (const std::exception&) { return false; }
+}
+
 } // namespace doris
diff --git a/be/src/http/utils.h b/be/src/http/utils.h
index 2d1e13fbe4e..70ab842450b 100644
--- a/be/src/http/utils.h
+++ b/be/src/http/utils.h
@@ -42,4 +42,6 @@ void do_file_response(const std::string& dir_path, 
HttpRequest* req,
 void do_dir_response(const std::string& dir_path, HttpRequest* req);
 
 std::string get_content_type(const std::string& file_name);
+
+bool load_size_smaller_than_wal_limit(HttpRequest* req);
 } // namespace doris
diff --git a/be/test/http/stream_load_test.cpp 
b/be/test/http/stream_load_test.cpp
new file mode 100644
index 00000000000..75588f4190b
--- /dev/null
+++ b/be/test/http/stream_load_test.cpp
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/action/stream_load.h"
+
+#include <gtest/gtest.h>
+
+#include "common/config.h"
+#include "event2/buffer.h"
+#include "event2/event.h"
+#include "event2/http.h"
+#include "event2/http_struct.h"
+#include "evhttp.h"
+#include "http/ev_http_server.h"
+#include "http/http_channel.h"
+#include "http/http_common.h"
+#include "http/http_handler.h"
+#include "http/http_handler_with_auth.h"
+#include "http/http_headers.h"
+#include "http/http_request.h"
+#include "http/utils.h"
+#include "olap/wal_manager.h"
+
+namespace doris {
+
+// Fixture for the stream load HTTP helper tests; it holds no per-test state.
+class StreamLoadTest : public testing::Test {
+public:
+    ~StreamLoadTest() override = default;
+    void SetUp() override {}
+    void TearDown() override {}
+};
+
+[[maybe_unused]] static void http_request_done_cb(struct evhttp_request* /*req*/, void* arg) {
+    event_base_loopbreak(static_cast<struct event_base*>(arg)); // `arg` is the loop to stop
+}
+
+TEST_F(StreamLoadTest, TestHeader) {
+    const auto saved_wal_max_disk_size = config::wal_max_disk_size;
+    config::wal_max_disk_size = 1073741824; // 1 GiB; restored at the end of the test
+    // 1. empty info
+    {
+        auto evhttp_req = evhttp_request_new(nullptr, nullptr);
+        HttpRequest req(evhttp_req);
+        EXPECT_FALSE(load_size_smaller_than_wal_limit(&req));
+        evhttp_request_free(evhttp_req);
+    }
+
+    // 2. chunked stream load (no Content-Length) with group commit
+    {
+        char uri[] = "Http://127.0.0.1/test.txt";
+        auto evhttp_req = evhttp_request_new(nullptr, nullptr);
+        evhttp_req->type = EVHTTP_REQ_GET;
+        evhttp_req->uri = uri;
+        evhttp_req->uri_elems = evhttp_uri_parse(uri);
+        evhttp_add_header(evhttp_req->input_headers, HTTP_GROUP_COMMIT.c_str(), "true");
+        HttpRequest req(evhttp_req);
+        req.init_from_evhttp();
+        EXPECT_FALSE(load_size_smaller_than_wal_limit(&req));
+        evhttp_uri_free(evhttp_req->uri_elems);
+        evhttp_req->uri = nullptr;
+        evhttp_req->uri_elems = nullptr;
+        evhttp_request_free(evhttp_req);
+    }
+
+    // 3. small stream load with group commit
+    {
+        char uri[] = "Http://127.0.0.1/test.txt";
+        auto evhttp_req = evhttp_request_new(nullptr, nullptr);
+        evhttp_req->type = EVHTTP_REQ_GET;
+        evhttp_req->uri = uri;
+        evhttp_req->uri_elems = evhttp_uri_parse(uri);
+        evhttp_add_header(evhttp_req->input_headers, HTTP_GROUP_COMMIT.c_str(), "true");
+        evhttp_add_header(evhttp_req->input_headers, HttpHeaders::CONTENT_LENGTH, "1000");
+        HttpRequest req(evhttp_req);
+        req.init_from_evhttp();
+        EXPECT_TRUE(load_size_smaller_than_wal_limit(&req));
+        evhttp_uri_free(evhttp_req->uri_elems);
+        evhttp_req->uri = nullptr;
+        evhttp_req->uri_elems = nullptr;
+        evhttp_request_free(evhttp_req);
+    }
+
+    // 4. large stream load (equal to wal_max_disk_size, above the 80% limit) with group commit
+    {
+        char uri[] = "Http://127.0.0.1/test.txt";
+        auto evhttp_req = evhttp_request_new(nullptr, nullptr);
+        evhttp_req->type = EVHTTP_REQ_GET;
+        evhttp_req->uri = uri;
+        evhttp_req->uri_elems = evhttp_uri_parse(uri);
+        evhttp_add_header(evhttp_req->input_headers, HTTP_GROUP_COMMIT.c_str(), "true");
+        evhttp_add_header(evhttp_req->input_headers, HttpHeaders::CONTENT_LENGTH, "1073741824");
+        HttpRequest req(evhttp_req);
+        req.init_from_evhttp();
+        EXPECT_FALSE(load_size_smaller_than_wal_limit(&req));
+        evhttp_uri_free(evhttp_req->uri_elems);
+        evhttp_req->uri = nullptr;
+        evhttp_req->uri_elems = nullptr;
+        evhttp_request_free(evhttp_req);
+    }
+    config::wal_max_disk_size = saved_wal_max_disk_size;
+}
+} // namespace doris
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to