This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new dd2bcddd0ec branch-3.1: [enhance](load) delete temporary files and
disk detection #49962 (#53600)
dd2bcddd0ec is described below
commit dd2bcddd0ec0e09839a3ae1b9be11f0273347b75
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jul 22 15:00:29 2025 +0800
branch-3.1: [enhance](load) delete temporary files and disk detection
#49962 (#53600)
Cherry-picked from #49962
Co-authored-by: kkop <[email protected]>
Co-authored-by: Xin Liao <[email protected]>
---
be/src/http/action/stream_load.cpp | 12 ++-
be/src/http/action/stream_load.h | 2 +-
be/src/runtime/load_path_mgr.cpp | 68 ++++++++++++++--
be/src/runtime/load_path_mgr.h | 9 ++-
be/src/runtime/stream_load/stream_load_context.h | 2 +
be/test/runtime/stream_load_parquet_test.cpp | 99 ++++++++++++++++++++++++
6 files changed, 179 insertions(+), 13 deletions(-)
diff --git a/be/src/http/action/stream_load.cpp
b/be/src/http/action/stream_load.cpp
index 3235076a5d6..cde4876388d 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -153,6 +153,9 @@ void StreamLoadAction::handle(HttpRequest* req) {
// update statistics
streaming_load_requests_total->increment(1);
streaming_load_duration_ms->increment(ctx->load_cost_millis);
+ if (!ctx->data_saved_path.empty()) {
+ _exec_env->load_path_mgr()->clean_tmp_files(ctx->data_saved_path);
+ }
}
Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
@@ -428,13 +431,14 @@ Status StreamLoadAction::_process_put(HttpRequest*
http_req,
ctx->pipe = pipe;
RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, ctx));
} else {
- RETURN_IF_ERROR(_data_saved_path(http_req, &request.path));
+ RETURN_IF_ERROR(_data_saved_path(http_req, &request.path,
ctx->body_bytes));
auto file_sink = std::make_shared<MessageBodyFileSink>(request.path);
RETURN_IF_ERROR(file_sink->open());
request.__isset.path = true;
request.fileType = TFileType::FILE_LOCAL;
request.__set_file_size(ctx->body_bytes);
ctx->body_sink = file_sink;
+ ctx->data_saved_path = request.path;
}
if (!http_req->header(HTTP_COLUMNS).empty()) {
request.__set_columns(http_req->header(HTTP_COLUMNS));
@@ -793,9 +797,11 @@ Status StreamLoadAction::_process_put(HttpRequest*
http_req,
return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
}
-Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string*
file_path) {
+Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string*
file_path,
+ int64_t file_bytes) {
std::string prefix;
-
RETURN_IF_ERROR(_exec_env->load_path_mgr()->allocate_dir(req->param(HTTP_DB_KEY),
"", &prefix));
+
RETURN_IF_ERROR(_exec_env->load_path_mgr()->allocate_dir(req->param(HTTP_DB_KEY),
"", &prefix,
+ file_bytes));
timeval tv;
gettimeofday(&tv, nullptr);
struct tm tm;
diff --git a/be/src/http/action/stream_load.h b/be/src/http/action/stream_load.h
index d1de89c9397..bf359317f94 100644
--- a/be/src/http/action/stream_load.h
+++ b/be/src/http/action/stream_load.h
@@ -47,7 +47,7 @@ public:
private:
Status _on_header(HttpRequest* http_req,
std::shared_ptr<StreamLoadContext> ctx);
Status _handle(std::shared_ptr<StreamLoadContext> ctx);
- Status _data_saved_path(HttpRequest* req, std::string* file_path);
+ Status _data_saved_path(HttpRequest* req, std::string* file_path, int64_t
file_bytes);
Status _process_put(HttpRequest* http_req,
std::shared_ptr<StreamLoadContext> ctx);
void _save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx,
const std::string& str);
Status _handle_group_commit(HttpRequest* http_req,
std::shared_ptr<StreamLoadContext> ctx);
diff --git a/be/src/runtime/load_path_mgr.cpp b/be/src/runtime/load_path_mgr.cpp
index f1899aadb28..5b88b45f722 100644
--- a/be/src/runtime/load_path_mgr.cpp
+++ b/be/src/runtime/load_path_mgr.cpp
@@ -86,7 +86,7 @@ Status LoadPathMgr::init() {
}
Status LoadPathMgr::allocate_dir(const std::string& db, const std::string&
label,
- std::string* prefix) {
+ std::string* prefix, int64_t file_bytes) {
Status status = _init_once.call([this] {
for (auto& store_path : _exec_env->store_paths()) {
_path_vec.push_back(store_path.path + "/" + MINI_PREFIX);
@@ -96,24 +96,52 @@ Status LoadPathMgr::allocate_dir(const std::string& db,
const std::string& label
std::string path;
auto size = _path_vec.size();
auto retry = size;
+ auto exceed_capacity_path_num = 0;
+ size_t disk_capacity_bytes = 0;
+ size_t available_bytes = 0;
while (retry--) {
- {
- // add SHARD_PREFIX for compatible purpose
- std::lock_guard<std::mutex> l(_lock);
- std::string shard = SHARD_PREFIX + std::to_string(_next_shard++ %
MAX_SHARD_NUM);
- path = _path_vec[_idx] + "/" + db + "/" + shard + "/" + label;
- _idx = (_idx + 1) % size;
+ // Call get_space_info to get disk space information.
+ // If the call fails or the disk space is insufficient,
+ // increment the count of paths exceeding capacity and proceed to the
next loop iteration.
+ std::string base_path = _path_vec[_idx].substr(0,
_path_vec[_idx].find("/" + MINI_PREFIX));
+ if (!io::global_local_filesystem()
+ ->get_space_info(base_path, &disk_capacity_bytes,
&available_bytes)
+ .ok() ||
+ !check_disk_space(disk_capacity_bytes, available_bytes,
file_bytes)) {
+ ++exceed_capacity_path_num;
+ continue;
}
+ // add SHARD_PREFIX for compatible purpose
+ std::lock_guard<std::mutex> l(_lock);
+ std::string shard = SHARD_PREFIX + std::to_string(_next_shard++ %
MAX_SHARD_NUM);
+ path = _path_vec[_idx] + "/" + db + "/" + shard + "/" + label;
+ _idx = (_idx + 1) % size;
status = io::global_local_filesystem()->create_directory(path);
if (LIKELY(status.ok())) {
*prefix = path;
return Status::OK();
}
}
-
+ if (exceed_capacity_path_num == size) {
+ return Status::Error<DISK_REACH_CAPACITY_LIMIT, false>("exceed
capacity limit.");
+ }
return status;
}
+// Returns true when writing `file_bytes` more bytes to a disk with the given
+// capacity / free space stays below the flood-stage limit; logs and returns false
+// otherwise. The disk is rejected only when BOTH the used ratio reaches
+// storage_flood_stage_usage_percent AND the remaining bytes drop to
+// storage_flood_stage_left_capacity_bytes or below.
+bool LoadPathMgr::check_disk_space(size_t disk_capacity_bytes, size_t available_bytes,
+                                   int64_t file_bytes) {
+    // Do the subtraction in signed arithmetic on purpose: `size_t - int64_t` is
+    // evaluated as unsigned and wraps around when the file is larger than the
+    // free space.
+    const int64_t remaining_bytes = static_cast<int64_t>(available_bytes) - file_bytes;
+    // A zero capacity would make the ratio 0/0 (NaN) or +-inf. Every comparison
+    // with NaN is false, which would silently report the disk as usable. Treat an
+    // unknown capacity as fully used so that only the absolute left-capacity
+    // threshold decides.
+    const double used_ratio =
+            disk_capacity_bytes == 0
+                    ? 1.0
+                    : 1.0 - static_cast<double>(remaining_bytes) /
+                                    static_cast<double>(disk_capacity_bytes);
+    const bool reach_limit =
+            used_ratio >= config::storage_flood_stage_usage_percent / 100.0 &&
+            remaining_bytes <= config::storage_flood_stage_left_capacity_bytes;
+    if (reach_limit) {
+        LOG(WARNING) << "Exceed capacity limit. disk_capacity: " << disk_capacity_bytes
+                     << ", available: " << available_bytes << ", file_bytes: " << file_bytes;
+    }
+    return !reach_limit;
+}
+
bool LoadPathMgr::is_too_old(time_t cur_time, const std::string& label_dir,
int64_t reserve_hours) {
struct stat dir_stat;
if (stat(label_dir.c_str(), &dir_stat)) {
@@ -178,6 +206,30 @@ void LoadPathMgr::process_path(time_t now, const
std::string& path, int64_t rese
}
}
+// Best-effort removal of a temporary load file (or directory) that lives below a
+// "/<MINI_PREFIX>/" download directory. Failures are logged, never propagated.
+void LoadPathMgr::clean_files_in_path_vec(const std::string& path) {
+    // Require a real "/<MINI_PREFIX>/" directory component followed by a non-empty
+    // remainder. A bare substring check would also accept ".../<MINI_PREFIX>_bak"
+    // or the download root ".../<MINI_PREFIX>" itself, and then delete it wholesale.
+    const std::string marker = "/" + MINI_PREFIX + "/";
+    const auto pos = path.find(marker);
+    if (pos == std::string::npos || pos + marker.size() >= path.size()) {
+        return;
+    }
+
+    bool exists = false;
+    Status status = io::global_local_filesystem()->exists(path, &exists);
+    if (!status.ok()) {
+        LOG(WARNING) << "Failed to check if path exists: " << path << ", error: " << status;
+        return;
+    }
+    if (!exists) {
+        return;
+    }
+
+    status = io::global_local_filesystem()->delete_directory_or_file(path);
+    if (status.ok()) {
+        LOG(INFO) << "Delete path success: " << path;
+    } else {
+        LOG(WARNING) << "Delete path failed: " << path << ", error: " << status;
+    }
+}
+
void LoadPathMgr::clean_one_path(const std::string& path) {
bool exists = true;
std::vector<io::FileInfo> dbs;
diff --git a/be/src/runtime/load_path_mgr.h b/be/src/runtime/load_path_mgr.h
index e0797170959..3a111d55a54 100644
--- a/be/src/runtime/load_path_mgr.h
+++ b/be/src/runtime/load_path_mgr.h
@@ -45,7 +45,10 @@ public:
Status init();
void stop();
- Status allocate_dir(const std::string& db, const std::string& label,
std::string* prefix);
+ Status allocate_dir(const std::string& db, const std::string& label,
std::string* prefix,
+ int64_t file_bytes);
+
+ bool check_disk_space(size_t disk_capacity_bytes, size_t available_bytes,
int64_t file_bytes);
void get_load_data_path(std::vector<std::string>* data_paths);
@@ -54,6 +57,8 @@ public:
std::string get_load_error_absolute_path(const std::string& file_path);
const std::string& get_load_error_file_dir() const { return
_error_log_dir; }
+ void clean_tmp_files(const std::string& file_path) {
clean_files_in_path_vec(file_path); }
+
private:
bool is_too_old(time_t cur_time, const std::string& label_dir, int64_t
reserve_hours);
void clean_one_path(const std::string& path);
@@ -61,6 +66,8 @@ private:
void clean();
void process_path(time_t now, const std::string& path, int64_t
reserve_hours);
+ void clean_files_in_path_vec(const std::string& path);
+
ExecEnv* _exec_env = nullptr;
std::mutex _lock;
std::vector<std::string> _path_vec;
diff --git a/be/src/runtime/stream_load/stream_load_context.h
b/be/src/runtime/stream_load/stream_load_context.h
index 93f76fad4e6..ff9f0b8be60 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -105,6 +105,8 @@ public:
}
}
+ std::string data_saved_path;
+
std::string to_json() const;
std::string prepare_stream_load_record(const std::string&
stream_load_record);
diff --git a/be/test/runtime/stream_load_parquet_test.cpp
b/be/test/runtime/stream_load_parquet_test.cpp
new file mode 100644
index 00000000000..76fab778286
--- /dev/null
+++ b/be/test/runtime/stream_load_parquet_test.cpp
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "gtest/gtest.h"
+#include "olap/storage_engine.h"
+#include "runtime/exec_env.h"
+#include "runtime/load_path_mgr.h"
+namespace doris {
+
+// Fixture that registers a throw-away store path under /tmp and owns a fresh
+// LoadPathMgr bound to the global ExecEnv.
+class LoadPathMgrTest : public testing::Test {
+protected:
+    void SetUp() override {
+        _exec_env = ExecEnv::GetInstance();
+
+        _load_path_mgr = std::make_unique<LoadPathMgr>(_exec_env);
+        _test_dir = "/tmp/test_clean_file";
+        _test_dir1 = "/tmp/test_clean_file/mini_download";
+        _test_dir2_root = "/tmp/test_clean_file1";
+        _test_dir2 = "/tmp/test_clean_file1/mini_download/test.parquet";
+
+        // Remove leftovers from a previous run; the delete result is intentionally
+        // ignored because the path may not exist yet.
+        auto result = io::global_local_filesystem()->delete_directory_or_file(_test_dir1);
+        result = io::global_local_filesystem()->create_directory(_test_dir1);
+        EXPECT_TRUE(result.ok()) << result;
+
+        result = io::global_local_filesystem()->delete_directory_or_file(_test_dir2);
+        result = io::global_local_filesystem()->create_directory(_test_dir2);
+        EXPECT_TRUE(result.ok()) << result;
+
+        const_cast<std::vector<StorePath>&>(_exec_env->store_paths()).emplace_back(_test_dir, 1024);
+    }
+
+    void TearDown() override {
+        const_cast<std::vector<StorePath>&>(_exec_env->store_paths()).clear();
+        _load_path_mgr->stop();
+        // Do not leave test directories behind in /tmp (best effort).
+        static_cast<void>(io::global_local_filesystem()->delete_directory_or_file(_test_dir));
+        static_cast<void>(
+                io::global_local_filesystem()->delete_directory_or_file(_test_dir2_root));
+        _exec_env->destroy();
+    }
+
+    ExecEnv* _exec_env = nullptr;
+    std::unique_ptr<LoadPathMgr> _load_path_mgr;
+    std::string _test_dir;
+    std::string _test_dir1;
+    std::string _test_dir2_root;
+    std::string _test_dir2;
+};
+
+TEST_F(LoadPathMgrTest, CheckDiskSpaceTest) {
+    // 10-byte disk, 9 free, 1-byte file: 20% used afterwards -> accepted.
+    ASSERT_TRUE(_load_path_mgr->check_disk_space(10, 9, 1));
+
+    // 10-byte disk, 2 free, 1-byte file: 90% used and 1 byte left afterwards ->
+    // rejected (assumes the default flood-stage config values; TODO confirm).
+    ASSERT_FALSE(_load_path_mgr->check_disk_space(10, 2, 1));
+
+    // A tiny request on the real /tmp disk gets a directory.
+    std::string prefix;
+    Status status = _load_path_mgr->allocate_dir("tmp", "test_label1", &prefix, 1);
+    EXPECT_TRUE(status.ok()) << status;
+    EXPECT_FALSE(prefix.empty());
+
+    // A request larger than any real disk is refused and leaves prefix untouched.
+    prefix.clear();
+    status = _load_path_mgr->allocate_dir("tmp", "test_label2", &prefix, 999999999999999999LL);
+    EXPECT_FALSE(status.ok());
+    EXPECT_TRUE(prefix.empty());
+
+    // clean_tmp_files removes an existing entry below a mini_download directory.
+    bool exists = false;
+    status = io::global_local_filesystem()->exists(_test_dir2, &exists);
+    ASSERT_TRUE(status.ok()) << status;
+    EXPECT_TRUE(exists);
+    _load_path_mgr->clean_tmp_files(_test_dir2);
+    status = io::global_local_filesystem()->exists(_test_dir2, &exists);
+    ASSERT_TRUE(status.ok()) << status;
+    EXPECT_FALSE(exists);
+}
+
+} // namespace doris
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]