This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ef5a3b3458f [Fix](Recycler) Fix recycler OOM by limiting queued delete tasks (#59331)
ef5a3b3458f is described below
commit ef5a3b3458fb5b2ef0d273a8eaeea0b811626421
Author: Jimmy <[email protected]>
AuthorDate: Wed Dec 31 02:35:51 2025 +0800
[Fix](Recycler) Fix recycler OOM by limiting queued delete tasks (#59331)
---
cloud/src/common/config.h | 3 +
cloud/src/recycler/obj_storage_client.cpp | 142 ++++++++---
cloud/test/CMakeLists.txt | 3 +-
cloud/test/recycler_batch_delete_test.cpp | 403 ++++++++++++++++++++++++++++++
4 files changed, 511 insertions(+), 40 deletions(-)
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 5f2e9c5a32c..e347fa764d1 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -96,6 +96,9 @@ CONF_Strings(recycle_whitelist, ""); // Comma seprated list
CONF_Strings(recycle_blacklist, ""); // Comma seprated list
// IO worker thread pool concurrency: object list, delete
CONF_mInt32(instance_recycler_worker_pool_size, "32");
+// Max number of delete tasks per batch when recycling objects.
+// Each task deletes up to 1000 files. Controls memory usage during large-scale deletion.
+CONF_Int32(recycler_max_tasks_per_batch, "1000");
// The worker pool size for http api `statistics_recycle` worker pool
CONF_mInt32(instance_recycler_statistics_recycle_worker_pool_size, "5");
CONF_Bool(enable_checker, "false");
diff --git a/cloud/src/recycler/obj_storage_client.cpp b/cloud/src/recycler/obj_storage_client.cpp
index 3402bb33468..f1fc52f2c28 100644
--- a/cloud/src/recycler/obj_storage_client.cpp
+++ b/cloud/src/recycler/obj_storage_client.cpp
@@ -19,6 +19,7 @@
#include <chrono>
+#include "common/config.h"
#include "cpp/sync_point.h"
#include "recycler/sync_executor.h"
@@ -31,58 +32,121 @@ ObjectStorageResponse ObjStorageClient::delete_objects_recursively_(ObjectStorag
int64_t expired_time,
size_t batch_size) {
TEST_SYNC_POINT_CALLBACK("ObjStorageClient::delete_objects_recursively_",
&batch_size);
- size_t num_deleted_objects = 0;
+ auto list_iter = list_objects(path);
+ ObjectStorageResponse ret;
+ size_t num_deleted = 0;
+ int error_count = 0;
+ size_t batch_count = 0;
auto start_time = steady_clock::now();
- auto list_iter = list_objects(path);
+ // Read max tasks per batch from config, validate to prevent overflow
+ int32_t config_val = config::recycler_max_tasks_per_batch;
+ size_t max_tasks_per_batch = 1000; // default value
+ if (config_val > 0) {
+ max_tasks_per_batch = static_cast<size_t>(config_val);
+ } else {
+ LOG(WARNING) << "recycler_max_tasks_per_batch=" << config_val
+ << " is not positive, using default 1000";
+ }
- ObjectStorageResponse ret;
- std::vector<std::string> keys;
- SyncExecutor<int> concurrent_delete_executor(
- option.executor,
- fmt::format("delete objects under bucket {}, path {}",
path.bucket, path.key),
- [](const int& ret) { return ret != 0; });
-
- for (auto obj = list_iter->next(); obj.has_value(); obj =
list_iter->next()) {
- if (expired_time > 0 && obj->mtime_s > expired_time) {
- continue;
+ while (true) {
+ // Create a new SyncExecutor for each batch
+ // Note: cancel lambda only takes effect within the current batch
+ SyncExecutor<int> batch_executor(
+ option.executor, fmt::format("delete batch under {}/{}",
path.bucket, path.key),
+ [](const int& r) { return r != 0; });
+
+ std::vector<std::string> keys;
+ size_t tasks_in_batch = 0;
+ bool has_more = true;
+
+ // Collect tasks until reaching batch limit or no more files
+ while (tasks_in_batch < max_tasks_per_batch && has_more) {
+ auto obj = list_iter->next();
+ if (!obj.has_value()) {
+ has_more = false;
+ break;
+ }
+ if (expired_time > 0 && obj->mtime_s > expired_time) {
+ continue;
+ }
+
+ num_deleted++;
+ keys.emplace_back(std::move(obj->key));
+
+ // Submit a delete task when we have batch_size keys
+ if (keys.size() >= batch_size) {
+ batch_executor.add([this, &path, k = std::move(keys),
option]() mutable {
+ return delete_objects(path.bucket, std::move(k),
option).ret;
+ });
+ keys.clear();
+ tasks_in_batch++;
+ }
}
- num_deleted_objects++;
- keys.emplace_back(std::move(obj->key));
- if (keys.size() < batch_size) {
- continue;
+ // Handle remaining keys (less than batch_size)
+ if (!keys.empty()) {
+ batch_executor.add([this, &path, k = std::move(keys), option]()
mutable {
+ return delete_objects(path.bucket, std::move(k), option).ret;
+ });
+ tasks_in_batch++;
}
- concurrent_delete_executor.add([this, &path, k = std::move(keys),
option]() mutable {
- return delete_objects(path.bucket, std::move(k), option).ret;
- });
- }
- if (!list_iter->is_valid()) {
- bool finished;
- concurrent_delete_executor.when_all(&finished);
- return {-1};
- }
+ // Before exiting on empty batch, check if listing is valid
+ // Avoid silently treating listing failure as success
+ if (tasks_in_batch == 0) {
+ if (!list_iter->is_valid()) {
+ LOG(WARNING) << "list_iter invalid with no tasks collected";
+ ret = {-1};
+ }
+ break;
+ }
- if (!keys.empty()) {
- concurrent_delete_executor.add([this, &path, k = std::move(keys),
option]() mutable {
- return delete_objects(path.bucket, std::move(k), option).ret;
- });
- }
- bool finished = true;
- std::vector<int> rets = concurrent_delete_executor.when_all(&finished);
- for (int r : rets) {
- if (r != 0) {
- ret = -1;
+ // Wait for current batch to complete
+ bool finished = true;
+ std::vector<int> rets = batch_executor.when_all(&finished);
+ batch_count++;
+
+ for (int r : rets) {
+ if (r != 0) {
+ error_count++;
+ }
+ }
+
+ // Log batch progress for monitoring long-running delete tasks
+ auto batch_elapsed = duration_cast<milliseconds>(steady_clock::now() -
start_time).count();
+ LOG(INFO) << "delete objects under " << path.bucket << "/" << path.key
<< " batch "
+ << batch_count << " completed"
+ << ", tasks_in_batch=" << tasks_in_batch << ",
total_deleted=" << num_deleted
+ << ", elapsed=" << batch_elapsed << " ms";
+
+ // Check finished status: false means stop_token triggered, task timeout, or task invalid
+ if (!finished) {
+ LOG(WARNING) << "batch execution did not finish normally, stopping";
+ ret = {-1};
+ break;
+ }
+
+ // Check if list_iter is still valid (network errors, etc.)
+ if (!list_iter->is_valid()) {
+ LOG(WARNING) << "list_iter became invalid during iteration";
+ ret = {-1};
+ break;
}
+
+ // batch_executor goes out of scope, resources are automatically released
+ }
+
+ if (error_count > 0) {
+ LOG(WARNING) << "delete_objects_recursively completed with " <<
error_count << " errors";
+ ret = {-1};
}
auto elapsed = duration_cast<milliseconds>(steady_clock::now() -
start_time).count();
LOG(INFO) << "delete objects under " << path.bucket << "/" << path.key
- << " finished, ret=" << ret.ret << ", finished=" << finished
- << ", num_deleted_objects=" << num_deleted_objects << ", cost="
<< elapsed << " ms";
-
- ret = finished ? ret : -1;
+ << " finished, ret=" << ret.ret << ", total_batches=" <<
batch_count
+ << ", num_deleted=" << num_deleted << ", error_count=" <<
error_count
+ << ", cost=" << elapsed << " ms";
return ret;
}
diff --git a/cloud/test/CMakeLists.txt b/cloud/test/CMakeLists.txt
index 0762231c3ab..9edd059321f 100644
--- a/cloud/test/CMakeLists.txt
+++ b/cloud/test/CMakeLists.txt
@@ -30,7 +30,8 @@ add_executable(recycler_test
recycler_test.cpp
recycler_operation_log_test.cpp
snapshot_data_size_calculator_test.cpp
- recycle_versioned_keys_test.cpp)
+ recycle_versioned_keys_test.cpp
+ recycler_batch_delete_test.cpp)
add_executable(mem_txn_kv_test mem_txn_kv_test.cpp)
diff --git a/cloud/test/recycler_batch_delete_test.cpp b/cloud/test/recycler_batch_delete_test.cpp
new file mode 100644
index 00000000000..ffb47c8745c
--- /dev/null
+++ b/cloud/test/recycler_batch_delete_test.cpp
@@ -0,0 +1,403 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <atomic>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include "common/config.h"
+#include "common/logging.h"
+#include "common/simple_thread_pool.h"
+#include "recycler/obj_storage_client.h"
+
+using namespace doris;
+
+namespace doris::cloud {
+
+// Mock ObjectListIterator for testing
+class MockObjectListIterator : public ObjectListIterator {
+public:
+ MockObjectListIterator(std::vector<ObjectMeta> objects, int fail_after =
-1)
+ : objects_(std::move(objects)), fail_after_(fail_after) {}
+
+ bool is_valid() override { return is_valid_; }
+
+ bool has_next() override {
+ if (!is_valid_) return false;
+ return current_index_ < objects_.size();
+ }
+
+ std::optional<ObjectMeta> next() override {
+ if (!is_valid_ || current_index_ >= objects_.size()) {
+ return std::nullopt;
+ }
+
+ // Simulate iterator becoming invalid after certain number of calls
+ if (fail_after_ >= 0 && static_cast<int>(current_index_) >=
fail_after_) {
+ is_valid_ = false;
+ return std::nullopt;
+ }
+
+ return objects_[current_index_++];
+ }
+
+ void set_invalid() { is_valid_ = false; }
+
+private:
+ std::vector<ObjectMeta> objects_;
+ size_t current_index_ = 0;
+ bool is_valid_ = true;
+ int fail_after_ = -1; // -1 means never fail
+};
+
+// Mock ObjStorageClient for testing delete_objects_recursively_
+class MockObjStorageClient : public ObjStorageClient {
+public:
+ MockObjStorageClient(std::vector<ObjectMeta> objects, int
iterator_fail_after = -1)
+ : objects_(std::move(objects)),
iterator_fail_after_(iterator_fail_after) {}
+
+ ObjectStorageResponse put_object(ObjectStoragePathRef path,
std::string_view stream) override {
+ return {0};
+ }
+
+ ObjectStorageResponse head_object(ObjectStoragePathRef path, ObjectMeta*
res) override {
+ return {0};
+ }
+
+ std::unique_ptr<ObjectListIterator> list_objects(ObjectStoragePathRef
path) override {
+ return std::make_unique<MockObjectListIterator>(objects_,
iterator_fail_after_);
+ }
+
+ ObjectStorageResponse delete_objects(const std::string& bucket,
std::vector<std::string> keys,
+ ObjClientOptions option) override {
+ delete_calls_++;
+ total_keys_deleted_ += keys.size();
+
+ // Simulate delete failure if configured
+ if (fail_delete_after_ >= 0 && delete_calls_ > fail_delete_after_) {
+ return {-1, "simulated delete failure"};
+ }
+
+ return {0};
+ }
+
+ ObjectStorageResponse delete_object(ObjectStoragePathRef path) override {
return {0}; }
+
+ ObjectStorageResponse delete_objects_recursively(ObjectStoragePathRef path,
+ ObjClientOptions option,
+ int64_t expiration_time =
0) override {
+ return delete_objects_recursively_(path, option, expiration_time,
1000);
+ }
+
+ ObjectStorageResponse get_life_cycle(const std::string& bucket,
+ int64_t* expiration_days) override {
+ return {0};
+ }
+
+ ObjectStorageResponse check_versioning(const std::string& bucket) override
{ return {0}; }
+
+ ObjectStorageResponse abort_multipart_upload(ObjectStoragePathRef path,
+ const std::string& upload_id)
override {
+ return {0};
+ }
+
+ // Test helper methods
+ int get_delete_calls() const { return delete_calls_; }
+ size_t get_total_keys_deleted() const { return total_keys_deleted_; }
+ void set_fail_delete_after(int n) { fail_delete_after_ = n; }
+
+private:
+ std::vector<ObjectMeta> objects_;
+ int iterator_fail_after_ = -1;
+ std::atomic<int> delete_calls_ {0};
+ std::atomic<size_t> total_keys_deleted_ {0};
+ int fail_delete_after_ = -1; // -1 means never fail
+};
+
+class RecyclerBatchDeleteTest : public testing::Test {
+protected:
+ void SetUp() override {
+ thread_pool_ = std::make_shared<SimpleThreadPool>(4);
+ thread_pool_->start();
+ }
+
+ void TearDown() override {
+ if (thread_pool_) {
+ thread_pool_->stop();
+ }
+ }
+
+ std::vector<ObjectMeta> generate_objects(size_t count) {
+ std::vector<ObjectMeta> objects;
+ objects.reserve(count);
+ for (size_t i = 0; i < count; ++i) {
+ objects.push_back(ObjectMeta {
+ .key = "test_key_" + std::to_string(i),
+ .size = 100,
+ .mtime_s = 0,
+ });
+ }
+ return objects;
+ }
+
+ std::shared_ptr<SimpleThreadPool> thread_pool_;
+};
+
+// Test 1: Basic batch processing with multiple batches
+TEST_F(RecyclerBatchDeleteTest, MultipleBatches) {
+ // Save original config and set small batch size for testing
+ int32_t original_config = config::recycler_max_tasks_per_batch;
+ config::recycler_max_tasks_per_batch = 3; // 3 tasks per batch
+
+ // Create 10 objects, with batch_size=2 (keys per task), max_tasks_per_batch=3
+ // Expected: 10 objects / 2 keys per task = 5 tasks
+ // 5 tasks / 3 tasks per batch = 2 batches (3 tasks + 2 tasks)
+ auto objects = generate_objects(10);
+ MockObjStorageClient client(objects);
+
+ ObjClientOptions options;
+ options.executor = thread_pool_;
+
+ // Use batch_size=2 to create more tasks
+ auto response = client.delete_objects_recursively_(
+ {.bucket = "test_bucket", .key = "test_prefix"}, options, 0, 2);
+
+ EXPECT_EQ(response.ret, 0);
+ EXPECT_EQ(client.get_delete_calls(), 5); // 10 objects / 2 = 5 delete calls
+ EXPECT_EQ(client.get_total_keys_deleted(), 10); // All 10 keys deleted
+
+ // Restore config
+ config::recycler_max_tasks_per_batch = original_config;
+}
+
+// Test 2: Iterator becomes invalid during iteration
+TEST_F(RecyclerBatchDeleteTest, IteratorInvalidMidway) {
+ int32_t original_config = config::recycler_max_tasks_per_batch;
+ config::recycler_max_tasks_per_batch = 100;
+
+ // Create 20 objects but iterator fails after 10
+ auto objects = generate_objects(20);
+ MockObjStorageClient client(objects, 10); // fail_after=10
+
+ ObjClientOptions options;
+ options.executor = thread_pool_;
+
+ auto response = client.delete_objects_recursively_(
+ {.bucket = "test_bucket", .key = "test_prefix"}, options, 0, 5);
+
+ // Should return error because iterator became invalid
+ EXPECT_EQ(response.ret, -1);
+ // Should have processed some objects before failure
+ EXPECT_GT(client.get_total_keys_deleted(), 0);
+ EXPECT_LT(client.get_total_keys_deleted(), 20);
+
+ config::recycler_max_tasks_per_batch = original_config;
+}
+
+// Test 3: Delete operation fails (triggers cancel)
+TEST_F(RecyclerBatchDeleteTest, DeleteFailureTriggersCancel) {
+ int32_t original_config = config::recycler_max_tasks_per_batch;
+ config::recycler_max_tasks_per_batch = 10;
+
+ auto objects = generate_objects(30);
+ MockObjStorageClient client(objects);
+ client.set_fail_delete_after(2); // Fail after 2 successful deletes
+
+ ObjClientOptions options;
+ options.executor = thread_pool_;
+
+ auto response = client.delete_objects_recursively_(
+ {.bucket = "test_bucket", .key = "test_prefix"}, options, 0, 5);
+
+ // Should return error because delete failed
+ EXPECT_EQ(response.ret, -1);
+
+ config::recycler_max_tasks_per_batch = original_config;
+}
+
+// Test 4: Empty object list
+TEST_F(RecyclerBatchDeleteTest, EmptyObjectList) {
+ int32_t original_config = config::recycler_max_tasks_per_batch;
+ config::recycler_max_tasks_per_batch = 100;
+
+ std::vector<ObjectMeta> empty_objects;
+ MockObjStorageClient client(empty_objects);
+
+ ObjClientOptions options;
+ options.executor = thread_pool_;
+
+ auto response = client.delete_objects_recursively_(
+ {.bucket = "test_bucket", .key = "test_prefix"}, options, 0, 1000);
+
+ EXPECT_EQ(response.ret, 0);
+ EXPECT_EQ(client.get_delete_calls(), 0);
+ EXPECT_EQ(client.get_total_keys_deleted(), 0);
+
+ config::recycler_max_tasks_per_batch = original_config;
+}
+
+// Test 5: Objects less than batch_size
+TEST_F(RecyclerBatchDeleteTest, ObjectsLessThanBatchSize) {
+ int32_t original_config = config::recycler_max_tasks_per_batch;
+ config::recycler_max_tasks_per_batch = 100;
+
+ auto objects = generate_objects(5);
+ MockObjStorageClient client(objects);
+
+ ObjClientOptions options;
+ options.executor = thread_pool_;
+
+ // batch_size=1000, but only 5 objects
+ auto response = client.delete_objects_recursively_(
+ {.bucket = "test_bucket", .key = "test_prefix"}, options, 0, 1000);
+
+ EXPECT_EQ(response.ret, 0);
+ EXPECT_EQ(client.get_delete_calls(), 1); // All 5 keys in one delete call
+ EXPECT_EQ(client.get_total_keys_deleted(), 5);
+
+ config::recycler_max_tasks_per_batch = original_config;
+}
+
+// Test 6: Exact batch boundary
+TEST_F(RecyclerBatchDeleteTest, ExactBatchBoundary) {
+ int32_t original_config = config::recycler_max_tasks_per_batch;
+ config::recycler_max_tasks_per_batch = 2; // 2 tasks per batch
+
+ // 8 objects with batch_size=2 = 4 tasks
+ // 4 tasks with max_tasks_per_batch=2 = exactly 2 batches
+ auto objects = generate_objects(8);
+ MockObjStorageClient client(objects);
+
+ ObjClientOptions options;
+ options.executor = thread_pool_;
+
+ auto response = client.delete_objects_recursively_(
+ {.bucket = "test_bucket", .key = "test_prefix"}, options, 0, 2);
+
+ EXPECT_EQ(response.ret, 0);
+ EXPECT_EQ(client.get_delete_calls(), 4); // 8 / 2 = 4 tasks
+ EXPECT_EQ(client.get_total_keys_deleted(), 8);
+
+ config::recycler_max_tasks_per_batch = original_config;
+}
+
+// Test 7: Invalid config value (negative)
+TEST_F(RecyclerBatchDeleteTest, InvalidConfigNegative) {
+ int32_t original_config = config::recycler_max_tasks_per_batch;
+ config::recycler_max_tasks_per_batch = -1; // Invalid negative value
+
+ auto objects = generate_objects(10);
+ MockObjStorageClient client(objects);
+
+ ObjClientOptions options;
+ options.executor = thread_pool_;
+
+ // Should use default value 1000 and still work
+ auto response = client.delete_objects_recursively_(
+ {.bucket = "test_bucket", .key = "test_prefix"}, options, 0, 5);
+
+ EXPECT_EQ(response.ret, 0);
+ EXPECT_EQ(client.get_total_keys_deleted(), 10);
+
+ config::recycler_max_tasks_per_batch = original_config;
+}
+
+// Test 8: Invalid config value (zero)
+TEST_F(RecyclerBatchDeleteTest, InvalidConfigZero) {
+ int32_t original_config = config::recycler_max_tasks_per_batch;
+ config::recycler_max_tasks_per_batch = 0; // Invalid zero value
+
+ auto objects = generate_objects(10);
+ MockObjStorageClient client(objects);
+
+ ObjClientOptions options;
+ options.executor = thread_pool_;
+
+ // Should use default value 1000 and still work
+ auto response = client.delete_objects_recursively_(
+ {.bucket = "test_bucket", .key = "test_prefix"}, options, 0, 5);
+
+ EXPECT_EQ(response.ret, 0);
+ EXPECT_EQ(client.get_total_keys_deleted(), 10);
+
+ config::recycler_max_tasks_per_batch = original_config;
+}
+
+// Test 9: Expiration time filtering
+TEST_F(RecyclerBatchDeleteTest, ExpirationTimeFiltering) {
+ int32_t original_config = config::recycler_max_tasks_per_batch;
+ config::recycler_max_tasks_per_batch = 100;
+
+ std::vector<ObjectMeta> objects;
+ // Create 10 objects: 5 with old mtime (should be deleted), 5 with new mtime (should be kept)
+ for (int i = 0; i < 5; ++i) {
+ objects.push_back(ObjectMeta {
+ .key = "old_key_" + std::to_string(i),
+ .size = 100,
+ .mtime_s = 100, // Old timestamp
+ });
+ }
+ for (int i = 0; i < 5; ++i) {
+ objects.push_back(ObjectMeta {
+ .key = "new_key_" + std::to_string(i),
+ .size = 100,
+ .mtime_s = 1000, // New timestamp
+ });
+ }
+
+ MockObjStorageClient client(objects);
+
+ ObjClientOptions options;
+ options.executor = thread_pool_;
+
+ // Set expiration_time=500, so only objects with mtime_s <= 500 should be deleted
+ auto response = client.delete_objects_recursively_(
+ {.bucket = "test_bucket", .key = "test_prefix"}, options, 500,
1000);
+
+ EXPECT_EQ(response.ret, 0);
+ EXPECT_EQ(client.get_total_keys_deleted(), 5); // Only old objects deleted
+
+ config::recycler_max_tasks_per_batch = original_config;
+}
+
+// Test 10: Iterator invalid at start (empty batch scenario)
+TEST_F(RecyclerBatchDeleteTest, IteratorInvalidAtStart) {
+ int32_t original_config = config::recycler_max_tasks_per_batch;
+ config::recycler_max_tasks_per_batch = 100;
+
+ // Iterator fails immediately (fail_after=0)
+ auto objects = generate_objects(10);
+ MockObjStorageClient client(objects, 0);
+
+ ObjClientOptions options;
+ options.executor = thread_pool_;
+
+ auto response = client.delete_objects_recursively_(
+ {.bucket = "test_bucket", .key = "test_prefix"}, options, 0, 5);
+
+ // Should return error because iterator was invalid from the start
+ EXPECT_EQ(response.ret, -1);
+ EXPECT_EQ(client.get_delete_calls(), 0);
+
+ config::recycler_max_tasks_per_batch = original_config;
+}
+
+} // namespace doris::cloud
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]