This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new dd2bcddd0ec branch-3.1: [enhance](load) delete temporary files and disk detection #49962 (#53600)
dd2bcddd0ec is described below

commit dd2bcddd0ec0e09839a3ae1b9be11f0273347b75
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jul 22 15:00:29 2025 +0800

    branch-3.1: [enhance](load) delete temporary files and disk detection #49962 (#53600)
    
    Cherry-picked from #49962
    
    Co-authored-by: kkop <[email protected]>
    Co-authored-by: Xin Liao <[email protected]>
---
 be/src/http/action/stream_load.cpp               | 12 ++-
 be/src/http/action/stream_load.h                 |  2 +-
 be/src/runtime/load_path_mgr.cpp                 | 68 ++++++++++++++--
 be/src/runtime/load_path_mgr.h                   |  9 ++-
 be/src/runtime/stream_load/stream_load_context.h |  2 +
 be/test/runtime/stream_load_parquet_test.cpp     | 99 ++++++++++++++++++++++++
 6 files changed, 179 insertions(+), 13 deletions(-)

diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 3235076a5d6..cde4876388d 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -153,6 +153,9 @@ void StreamLoadAction::handle(HttpRequest* req) {
     // update statistics
     streaming_load_requests_total->increment(1);
     streaming_load_duration_ms->increment(ctx->load_cost_millis);
+    if (!ctx->data_saved_path.empty()) {
+        _exec_env->load_path_mgr()->clean_tmp_files(ctx->data_saved_path);
+    }
 }
 
 Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
@@ -428,13 +431,14 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
         ctx->pipe = pipe;
         RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, ctx));
     } else {
-        RETURN_IF_ERROR(_data_saved_path(http_req, &request.path));
+        RETURN_IF_ERROR(_data_saved_path(http_req, &request.path, ctx->body_bytes));
         auto file_sink = std::make_shared<MessageBodyFileSink>(request.path);
         RETURN_IF_ERROR(file_sink->open());
         request.__isset.path = true;
         request.fileType = TFileType::FILE_LOCAL;
         request.__set_file_size(ctx->body_bytes);
         ctx->body_sink = file_sink;
+        ctx->data_saved_path = request.path;
     }
     if (!http_req->header(HTTP_COLUMNS).empty()) {
         request.__set_columns(http_req->header(HTTP_COLUMNS));
@@ -793,9 +797,11 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
     return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
 }
 
-Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* file_path) {
+Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* file_path,
+                                          int64_t file_bytes) {
     std::string prefix;
-    RETURN_IF_ERROR(_exec_env->load_path_mgr()->allocate_dir(req->param(HTTP_DB_KEY), "", &prefix));
+    RETURN_IF_ERROR(_exec_env->load_path_mgr()->allocate_dir(req->param(HTTP_DB_KEY), "", &prefix,
+                                                             file_bytes));
     timeval tv;
     gettimeofday(&tv, nullptr);
     struct tm tm;
diff --git a/be/src/http/action/stream_load.h b/be/src/http/action/stream_load.h
index d1de89c9397..bf359317f94 100644
--- a/be/src/http/action/stream_load.h
+++ b/be/src/http/action/stream_load.h
@@ -47,7 +47,7 @@ public:
 private:
     Status _on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
     Status _handle(std::shared_ptr<StreamLoadContext> ctx);
-    Status _data_saved_path(HttpRequest* req, std::string* file_path);
+    Status _data_saved_path(HttpRequest* req, std::string* file_path, int64_t file_bytes);
     Status _process_put(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
     void _save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx, const std::string& str);
     Status _handle_group_commit(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
diff --git a/be/src/runtime/load_path_mgr.cpp b/be/src/runtime/load_path_mgr.cpp
index f1899aadb28..5b88b45f722 100644
--- a/be/src/runtime/load_path_mgr.cpp
+++ b/be/src/runtime/load_path_mgr.cpp
@@ -86,7 +86,7 @@ Status LoadPathMgr::init() {
 }
 
 Status LoadPathMgr::allocate_dir(const std::string& db, const std::string& label,
-                                 std::string* prefix) {
+                                 std::string* prefix, int64_t file_bytes) {
     Status status = _init_once.call([this] {
         for (auto& store_path : _exec_env->store_paths()) {
             _path_vec.push_back(store_path.path + "/" + MINI_PREFIX);
@@ -96,24 +96,52 @@ Status LoadPathMgr::allocate_dir(const std::string& db, const std::string& label
     std::string path;
     auto size = _path_vec.size();
     auto retry = size;
+    auto exceed_capacity_path_num = 0;
+    size_t disk_capacity_bytes = 0;
+    size_t available_bytes = 0;
     while (retry--) {
-        {
-            // add SHARD_PREFIX for compatible purpose
-            std::lock_guard<std::mutex> l(_lock);
-            std::string shard = SHARD_PREFIX + std::to_string(_next_shard++ % MAX_SHARD_NUM);
-            path = _path_vec[_idx] + "/" + db + "/" + shard + "/" + label;
-            _idx = (_idx + 1) % size;
+        // Call get_space_info to get disk space information.
+        // If the call fails or the disk space is insufficient,
+        // increment the count of paths exceeding capacity and proceed to the next loop iteration.
+        std::string base_path = _path_vec[_idx].substr(0, _path_vec[_idx].find("/" + MINI_PREFIX));
+        if (!io::global_local_filesystem()
+                     ->get_space_info(base_path, &disk_capacity_bytes, &available_bytes)
+                     .ok() ||
+            !check_disk_space(disk_capacity_bytes, available_bytes, file_bytes)) {
+            ++exceed_capacity_path_num;
+            continue;
         }
+        // add SHARD_PREFIX for compatible purpose
+        std::lock_guard<std::mutex> l(_lock);
+        std::string shard = SHARD_PREFIX + std::to_string(_next_shard++ % MAX_SHARD_NUM);
+        path = _path_vec[_idx] + "/" + db + "/" + shard + "/" + label;
+        _idx = (_idx + 1) % size;
         status = io::global_local_filesystem()->create_directory(path);
         if (LIKELY(status.ok())) {
             *prefix = path;
             return Status::OK();
         }
     }
-
+    if (exceed_capacity_path_num == size) {
+        return Status::Error<DISK_REACH_CAPACITY_LIMIT, false>("exceed capacity limit.");
+    }
     return status;
 }
 
+bool LoadPathMgr::check_disk_space(size_t disk_capacity_bytes, size_t available_bytes,
+                                   int64_t file_bytes) {
+    bool is_available = false;
+    int64_t remaining_bytes = available_bytes - file_bytes;
+    double used_ratio = 1.0 - static_cast<double>(remaining_bytes) / disk_capacity_bytes;
+    is_available = !(used_ratio >= config::storage_flood_stage_usage_percent / 100.0 &&
+                     remaining_bytes <= config::storage_flood_stage_left_capacity_bytes);
+    if (!is_available) {
+        LOG(WARNING) << "Exceed capacity limit. disk_capacity: " << disk_capacity_bytes
+                     << ", available: " << available_bytes << ", file_bytes: " << file_bytes;
+    }
+    return is_available;
+}
+
 bool LoadPathMgr::is_too_old(time_t cur_time, const std::string& label_dir, int64_t reserve_hours) {
     struct stat dir_stat;
     if (stat(label_dir.c_str(), &dir_stat)) {
@@ -178,6 +206,30 @@ void LoadPathMgr::process_path(time_t now, const std::string& path, int64_t rese
     }
 }
 
+void LoadPathMgr::clean_files_in_path_vec(const std::string& path) {
+    // Check if the path contains "/"+MINI_PREFIX. If not, return directly.
+    if (path.find("/" + MINI_PREFIX) == std::string::npos) {
+        return;
+    }
+
+    bool exists = false;
+    // Check if the path exists
+    Status status = io::global_local_filesystem()->exists(path, &exists);
+    if (!status.ok()) {
+        LOG(WARNING) << "Failed to check if path exists: " << path << ", error: " << status;
+        return;
+    }
+    if (exists) {
+        // If the path exists, delete the file or directory corresponding to that path
+        status = io::global_local_filesystem()->delete_directory_or_file(path);
+        if (status.ok()) {
+            LOG(INFO) << "Delete path success: " << path;
+        } else {
+            LOG(WARNING) << "Delete path failed: " << path << ", error: " << status;
+        }
+    }
+}
+
 void LoadPathMgr::clean_one_path(const std::string& path) {
     bool exists = true;
     std::vector<io::FileInfo> dbs;
diff --git a/be/src/runtime/load_path_mgr.h b/be/src/runtime/load_path_mgr.h
index e0797170959..3a111d55a54 100644
--- a/be/src/runtime/load_path_mgr.h
+++ b/be/src/runtime/load_path_mgr.h
@@ -45,7 +45,10 @@ public:
     Status init();
     void stop();
 
-    Status allocate_dir(const std::string& db, const std::string& label, std::string* prefix);
+    Status allocate_dir(const std::string& db, const std::string& label, std::string* prefix,
+                        int64_t file_bytes);
+
+    bool check_disk_space(size_t disk_capacity_bytes, size_t available_bytes, int64_t file_bytes);
 
     void get_load_data_path(std::vector<std::string>* data_paths);
 
@@ -54,6 +57,8 @@ public:
     std::string get_load_error_absolute_path(const std::string& file_path);
     const std::string& get_load_error_file_dir() const { return _error_log_dir; }
 
+    void clean_tmp_files(const std::string& file_path) { clean_files_in_path_vec(file_path); }
+
 private:
     bool is_too_old(time_t cur_time, const std::string& label_dir, int64_t reserve_hours);
     void clean_one_path(const std::string& path);
@@ -61,6 +66,8 @@ private:
     void clean();
     void process_path(time_t now, const std::string& path, int64_t reserve_hours);
 
+    void clean_files_in_path_vec(const std::string& path);
+
     ExecEnv* _exec_env = nullptr;
     std::mutex _lock;
     std::vector<std::string> _path_vec;
diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index 93f76fad4e6..ff9f0b8be60 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -105,6 +105,8 @@ public:
         }
     }
 
+    std::string data_saved_path;
+
     std::string to_json() const;
 
     std::string prepare_stream_load_record(const std::string& stream_load_record);
diff --git a/be/test/runtime/stream_load_parquet_test.cpp b/be/test/runtime/stream_load_parquet_test.cpp
new file mode 100644
index 00000000000..76fab778286
--- /dev/null
+++ b/be/test/runtime/stream_load_parquet_test.cpp
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "gtest/gtest.h"
+#include "olap/storage_engine.h"
+#include "runtime/exec_env.h"
+#include "runtime/load_path_mgr.h"
+namespace doris {
+
+class LoadPathMgrTest : public testing::Test {
+protected:
+    void SetUp() override {
+        _exec_env = ExecEnv::GetInstance();
+
+        _load_path_mgr = std::make_unique<LoadPathMgr>(_exec_env);
+        // create tmp file
+        _test_dir = "/tmp/test_clean_file";
+        _test_dir1 = "/tmp/test_clean_file/mini_download";
+        _test_dir2 = "/tmp/test_clean_file1/mini_download/test.parquet";
+
+        auto result = io::global_local_filesystem()->delete_directory_or_file(_test_dir1);
+        result = io::global_local_filesystem()->create_directory(_test_dir1);
+        EXPECT_TRUE(result.ok());
+
+        result = io::global_local_filesystem()->delete_directory_or_file(_test_dir2);
+        result = io::global_local_filesystem()->create_directory(_test_dir2);
+        EXPECT_TRUE(result.ok());
+
+        const_cast<std::vector<StorePath>&>(_exec_env->store_paths()).emplace_back(_test_dir, 1024);
+    }
+
+    void TearDown() override {
+        const_cast<std::vector<StorePath>&>(_exec_env->store_paths()).clear();
+        _load_path_mgr->stop();
+        _exec_env->destroy();
+    }
+
+    ExecEnv* _exec_env;
+    std::unique_ptr<LoadPathMgr> _load_path_mgr;
+    std::string _test_dir;
+    std::string _test_dir1;
+    std::string _test_dir2;
+};
+
+TEST_F(LoadPathMgrTest, CheckDiskSpaceTest) {
+    // Check disk space
+    bool is_available = false;
+    size_t disk_capacity_bytes = 10;
+    size_t available_bytes = 9;
+    int64_t file_bytes = 1;
+    is_available =
+            _load_path_mgr->check_disk_space(disk_capacity_bytes, available_bytes, file_bytes);
+    ASSERT_TRUE(is_available);
+
+    // Check disk space
+    is_available = false;
+    disk_capacity_bytes = 10;
+    available_bytes = 2;
+    file_bytes = 1;
+    is_available =
+            _load_path_mgr->check_disk_space(disk_capacity_bytes, available_bytes, file_bytes);
+    ASSERT_FALSE(is_available);
+
+    std::string prefix;
+    Status status = _load_path_mgr->allocate_dir("tmp", "test_label1", &prefix, 1);
+    EXPECT_TRUE(status.ok());
+    std::cout << "NormalAllocation: " << prefix.size() << std::endl;
+    EXPECT_FALSE(prefix.empty());
+
+    prefix.clear();
+    status = _load_path_mgr->allocate_dir("tmp", "test_label2", &prefix, 999999999999999999);
+    EXPECT_TRUE(!status.ok());
+    std::cout << "UnNormalAllocation: " << prefix.size() << std::endl;
+    EXPECT_TRUE(prefix.empty());
+
+    std::cout << "clean_tmp_files" << std::endl;
+    bool exists = false;
+    status = io::global_local_filesystem()->exists(_test_dir2, &exists);
+    EXPECT_TRUE(exists);
+    _load_path_mgr->clean_tmp_files(_test_dir2);
+    status = io::global_local_filesystem()->exists(_test_dir2, &exists);
+    EXPECT_FALSE(exists);
+}
+
+} // namespace doris
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to