This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 78da6151fa3 branch-4.0: [Test](client) Add s3 storage client test skeleton #57456 (#58164)
78da6151fa3 is described below

commit 78da6151fa3eec0e846908ff7bfd1e74a81ca3c4
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Nov 24 13:50:26 2025 +0800

    branch-4.0: [Test](client) Add s3 storage client test skeleton #57456 (#58164)
    
    Cherry-picked from #57456
    
    Co-authored-by: Yixuan Wang <[email protected]>
---
 be/src/io/fs/azure_obj_storage_client.cpp |    2 +-
 be/src/io/fs/s3_obj_storage_client.cpp    |   10 +-
 be/src/util/s3_util.h                     |    1 +
 be/test/io/client/s3_file_system_test.cpp | 1895 +++++++++++++++++++++++++++++
 run-be-ut.sh                              |    2 +-
 5 files changed, 1905 insertions(+), 5 deletions(-)

diff --git a/be/src/io/fs/azure_obj_storage_client.cpp b/be/src/io/fs/azure_obj_storage_client.cpp
index d27df50350f..017a3166eab 100644
--- a/be/src/io/fs/azure_obj_storage_client.cpp
+++ b/be/src/io/fs/azure_obj_storage_client.cpp
@@ -207,7 +207,7 @@ ObjectStorageUploadResponse AzureObjStorageClient::upload_part(const ObjectStora
         // The blockId must be base64 encoded
         s3_put_rate_limit([&]() {
             SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
-            client.StageBlock(base64_encode_part_num(part_num), memory_body);
+            return client.StageBlock(base64_encode_part_num(part_num), memory_body);
         });
     } catch (Azure::Core::RequestFailedException& e) {
         auto msg = fmt::format(
diff --git a/be/src/io/fs/s3_obj_storage_client.cpp b/be/src/io/fs/s3_obj_storage_client.cpp
index e9f81da5e90..7930e31ff15 100644
--- a/be/src/io/fs/s3_obj_storage_client.cpp
+++ b/be/src/io/fs/s3_obj_storage_client.cpp
@@ -311,9 +311,6 @@ ObjectStorageHeadResponse S3ObjStorageClient::head_object(const ObjectStoragePat
                          static_cast<int>(outcome.GetError().GetResponseCode()),
                          outcome.GetError().GetRequestId()}};
     }
-    return ObjectStorageHeadResponse {
-            .resp = ObjectStorageResponse::OK(),
-    };
 }
 
 ObjectStorageResponse S3ObjStorageClient::get_object(const ObjectStoragePathOptions& opts,
@@ -334,6 +331,8 @@ ObjectStorageResponse S3ObjStorageClient::get_object(const ObjectStoragePathOpti
                 outcome.GetError().GetRequestId()};
     }
     *size_return = outcome.GetResult().GetContentLength();
+    // case for incomplete read
+    SYNC_POINT_CALLBACK("s3_obj_storage_client::get_object", size_return);
     if (*size_return != bytes_read) {
         return {convert_to_obj_response(Status::InternalError(
                 "failed to read from {}(bytes read: {}, bytes req: {}), 
request_id: {}",
@@ -405,6 +404,8 @@ ObjectStorageResponse S3ObjStorageClient::delete_objects(const ObjectStoragePath
                 static_cast<int>(delete_outcome.GetError().GetResponseCode()),
                 delete_outcome.GetError().GetRequestId()};
     }
+    // case for partial delete object failure
+    SYNC_POINT_CALLBACK("s3_obj_storage_client::delete_objects", 
&delete_outcome);
     if (!delete_outcome.GetResult().GetErrors().empty()) {
         const auto& e = delete_outcome.GetResult().GetErrors().front();
         return {convert_to_obj_response(
@@ -470,6 +471,9 @@ ObjectStorageResponse S3ObjStorageClient::delete_objects_recursively(
                         static_cast<int>(delete_outcome.GetError().GetResponseCode()),
                         delete_outcome.GetError().GetRequestId()};
             }
+            // case for partial delete object failure
+            SYNC_POINT_CALLBACK("s3_obj_storage_client::delete_objects_recursively",
+                                &delete_outcome);
             if (!delete_outcome.GetResult().GetErrors().empty()) {
                 const auto& e = delete_outcome.GetResult().GetErrors().front();
                 return {convert_to_obj_response(Status::InternalError(
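
The SYNC_POINT_CALLBACK hooks added above are what let the new test inject
failures into the S3 client paths. A minimal sketch of hooking one of them,
assuming the usual set_call_back/enable_processing pattern from
cpp/sync_point.h (the callback signature and the size_t* argument type here
are assumptions, not part of this patch):

    auto* sp = doris::SyncPoint::get_instance();
    sp->set_call_back("s3_obj_storage_client::get_object", [](auto&& args) {
        // Pretend the GET returned fewer bytes than requested.
        auto* size_return = doris::try_any_cast<size_t*>(args[0]);
        *size_return = 0; // simulate an incomplete read
    });
    sp->enable_processing();
    // ... exercise the read path and expect an error status ...
    sp->clear_call_back("s3_obj_storage_client::get_object");
    sp->disable_processing();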
diff --git a/be/src/util/s3_util.h b/be/src/util/s3_util.h
index 829cc34e75f..1d09fcfe8b8 100644
--- a/be/src/util/s3_util.h
+++ b/be/src/util/s3_util.h
@@ -62,6 +62,7 @@ extern bvar::LatencyRecorder s3_copy_object_latency;
 }; // namespace s3_bvar
 
 std::string hide_access_key(const std::string& ak);
+int reset_s3_rate_limiter(S3RateLimitType type, size_t max_speed, size_t max_burst, size_t limit);
 
 class S3URI;
 struct S3ClientConf {
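
The reset_s3_rate_limiter() helper exported here is how the new tests drive
the limiter directly. A typical round trip, based on how the test below uses
it (limit = 0 appears to mean "no request cap"):

    // Cap GET at 10 tokens/s, burst 10, and at most 2 requests in total.
    int ret = reset_s3_rate_limiter(S3RateLimitType::GET, 10, 10, 2);
    // ... the 3rd GET request is now expected to fail ...
    // Restore a permissive default so later tests are unaffected.
    reset_s3_rate_limiter(S3RateLimitType::GET, 10000, 10000, 0);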
diff --git a/be/test/io/client/s3_file_system_test.cpp b/be/test/io/client/s3_file_system_test.cpp
new file mode 100644
index 00000000000..06e59ab9da9
--- /dev/null
+++ b/be/test/io/client/s3_file_system_test.cpp
@@ -0,0 +1,1895 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/fs/s3_file_system.h"
+
+#include <aws/s3/S3Client.h>
+#include <aws/s3/model/DeleteObjectsResult.h>
+#include <aws/s3/model/Error.h>
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <cstdlib>
+#include <fstream>
+#include <ios>
+#include <memory>
+#include <string>
+
+#include "common/config.h"
+#include "cpp/sync_point.h"
+#include "io/fs/file_system.h"
+#include "io/fs/file_writer.h"
+#include "io/fs/obj_storage_client.h"
+#include "runtime/exec_env.h"
+#include "util/s3_util.h"
+
+namespace doris {
+
+#define GET_ENV_IF_DEFINED(var)              \
+    ([]() -> std::string {                   \
+        const char* val = std::getenv(#var); \
+        return val ? std::string(val) : "";  \
+    }())
+
+static const char* MOCK_S3_FS_ID = "mock_s3_fs_id_6";
+
+class S3TestConfig {
+public:
+    S3TestConfig() {
+        // Check if S3 client is enabled
+        enabled = (GET_ENV_IF_DEFINED(ENABLE_S3_CLIENT) == "1");
+
+        if (!enabled) {
+            return;
+        }
+
+        access_key = GET_ENV_IF_DEFINED(S3_AK);
+        secret_key = GET_ENV_IF_DEFINED(S3_SK);
+        endpoint = GET_ENV_IF_DEFINED(S3_ENDPOINT);
+        provider = GET_ENV_IF_DEFINED(S3_PROVIDER);
+        bucket = GET_ENV_IF_DEFINED(S3_BUCKET);
+        region = GET_ENV_IF_DEFINED(S3_REGION);
+        prefix = GET_ENV_IF_DEFINED(S3_PREFIX);
+    }
+
+    bool is_enabled() const { return enabled; }
+
+    bool is_valid() const {
+        return enabled && !access_key.empty() && !secret_key.empty() && !endpoint.empty() &&
+               !region.empty() && !bucket.empty();
+    }
+
+    std::string get_access_key() const { return access_key; }
+    std::string get_secret_key() const { return secret_key; }
+    std::string get_endpoint() const { return endpoint; }
+    std::string get_provider() const { return provider; }
+    std::string get_bucket() const { return bucket; }
+    std::string get_prefix() const { return prefix; }
+    std::string get_region() const { return region; }
+
+    void print_config() const {
+        std::cout << "S3 Test Configuration:" << std::endl;
+        std::cout << "  Enabled: " << (enabled ? "Yes" : "No") << std::endl;
+        if (enabled) {
+            std::cout << "  Access Key: " << (access_key.empty() ? "<empty>" : 
"<set>")
+                      << std::endl;
+            std::cout << "  Secret Key: " << (secret_key.empty() ? "<empty>" : 
"<hidden>")
+                      << std::endl;
+            std::cout << "  Endpoint: " << endpoint << std::endl;
+            std::cout << "  Provider: " << provider << std::endl;
+            std::cout << "  Bucket: " << bucket << std::endl;
+            std::cout << " Region: " << region << std::endl;
+            std::cout << "  Prefix: " << prefix << std::endl;
+        }
+    }
+
+private:
+    bool enabled = false;
+    std::string access_key;
+    std::string secret_key;
+    std::string endpoint;
+    std::string provider;
+    std::string bucket;
+    std::string region;
+    std::string prefix;
+};
+
+class S3FileSystemTest : public ::testing::Test {
+protected:
+    void SetUp() override {
+        config_ = std::make_shared<S3TestConfig>();
+
+        // Print configuration for debugging (always print for S3 tests)
+        config_->print_config();
+
+        // Skip tests if S3 is not enabled or not configured properly
+        if (!config_->is_enabled()) {
+            GTEST_SKIP() << "S3 client is not enabled. Use --enable_s3_client 
flag to enable.";
+        }
+
+        if (!config_->is_valid()) {
+            GTEST_SKIP() << "S3 configuration is incomplete. Required: AK, SK, 
ENDPOINT, BUCKET.";
+        }
+
+        // Attempt to create S3 client
+        if (auto st = create_client(); !st.ok()) {
+            GTEST_SKIP() << "Failed to create S3 client with provided 
configuration."
+                         << st.to_string();
+        }
+
+        std::unique_ptr<ThreadPool> _pool;
+        std::ignore = ThreadPoolBuilder("s3_upload_file_thread_pool")
+                              .set_min_threads(5)
+                              .set_max_threads(10)
+                              .build(&_pool);
+        ExecEnv::GetInstance()->_s3_file_upload_thread_pool = std::move(_pool);
+
+        auto test_path = config_->get_prefix() == "" ? "s3_fs_test_dir" : config_->get_prefix();
+        global_test_prefix_ = get_unique_test_path(test_path);
+        auto status = s3_fs_->delete_directory(global_test_prefix_);
+        EXPECT_TRUE(status.ok()) << "Failed to delete test file: " << 
status.to_string();
+    }
+
+    io::ObjStorageType convert_provider(const std::string& provider_str) {
+        if (provider_str == "AZURE") {
+            return io::ObjStorageType::AZURE;
+        } else {
+            return io::ObjStorageType::AWS; // Default to AWS S3
+        }
+    }
+
+    Status create_client() {
+        S3Conf s3_conf;
+        s3_conf.bucket = config_->get_bucket();
+        s3_conf.prefix = config_->get_prefix();
+        s3_conf.client_conf.ak = config_->get_access_key();
+        s3_conf.client_conf.sk = config_->get_secret_key();
+        s3_conf.client_conf.endpoint = config_->get_endpoint();
+        s3_conf.client_conf.bucket = config_->get_bucket();
+        s3_conf.client_conf.region = config_->get_region();
+        s3_conf.client_conf.provider = convert_provider(config_->get_provider());
+
+        s3_fs_ = DORIS_TRY(io::S3FileSystem::create(s3_conf, MOCK_S3_FS_ID));
+        return Status::OK();
+    }
+
+    void TearDown() override {
+        // Cleanup resources
+        ExecEnv::GetInstance()->_s3_file_upload_thread_pool.reset();
+        if (s3_fs_) {
+            auto status = s3_fs_->delete_directory(global_test_prefix_);
+            EXPECT_TRUE(status.ok()) << "Failed to delete test file: " << 
status.to_string();
+        }
+    }
+
+    std::string get_unique_test_path(const std::string& test_name) {
+        std::string path = config_->get_prefix();
+        if (!path.empty() && path.back() != '/') {
+            path += '/';
+        }
+        path += "ut_" + test_name + "/";
+        return path;
+    }
+
+    std::shared_ptr<S3TestConfig> config_;
+    std::shared_ptr<io::S3FileSystem> s3_fs_;
+    std::string global_test_prefix_;
+};
+
+// Test: Simple put object test
+TEST_F(S3FileSystemTest, SimplePutObjectTest) {
+    ASSERT_NE(s3_fs_, nullptr);
+
+    // Generate a unique test file path
+    std::string test_file = global_test_prefix_ + "test_file.txt";
+    std::cout << "Test file path: " << test_file << std::endl;
+
+    // Create file writer
+    io::FileWriterPtr writer;
+    Status status = s3_fs_->create_file(test_file, &writer);
+    ASSERT_TRUE(status.ok()) << "Failed to create file: " << 
status.to_string();
+    ASSERT_NE(writer, nullptr);
+
+    // Write test data
+    std::string test_data = "Hello, S3! This is a simple test.";
+    status = writer->append(test_data);
+    ASSERT_TRUE(status.ok()) << "Failed to write data: " << status.to_string();
+
+    // Close the writer
+    status = writer->close();
+    ASSERT_TRUE(status.ok()) << "Failed to close writer: " << 
status.to_string();
+
+    std::cout << "Successfully wrote " << test_data.size() << " bytes to S3" 
<< std::endl;
+
+    // Verify file exists
+    bool exists = false;
+    status = s3_fs_->exists(test_file, &exists);
+    ASSERT_TRUE(status.ok()) << "Failed to check file existence: " << 
status.to_string();
+    EXPECT_TRUE(exists) << "File should exist after writing";
+
+    // Verify file size
+    int64_t file_size = 0;
+    status = s3_fs_->file_size(test_file, &file_size);
+    ASSERT_TRUE(status.ok()) << "Failed to get file size: " << 
status.to_string();
+    EXPECT_EQ(file_size, test_data.size()) << "File size mismatch";
+
+    std::cout << "File size: " << file_size << " bytes" << std::endl;
+    std::cout << "Test completed successfully!" << std::endl;
+
+    // Cleanup test file
+    status = s3_fs_->delete_file(test_file);
+    ASSERT_TRUE(status.ok()) << "Failed to delete test file: " << 
status.to_string();
+}
+
+// Test: Large file with multipart upload
+TEST_F(S3FileSystemTest, MultipartUploadTest) {
+    ASSERT_NE(s3_fs_, nullptr);
+
+    std::string test_file = global_test_prefix_ + "large_file.dat";
+    std::cout << "Test file path: " << test_file << std::endl;
+
+    // Create file writer
+    io::FileWriterPtr writer;
+    Status status = s3_fs_->create_file(test_file, &writer);
+    ASSERT_TRUE(status.ok()) << "Failed to create file: " << 
status.to_string();
+    ASSERT_NE(writer, nullptr);
+
+    // Write large data to trigger multipart upload (>5MB for multipart)
+    size_t chunk_size = 1024 * 1024; // 1MB
+    size_t num_chunks = 6;           // 6MB total
+    std::string chunk_data(chunk_size, 'A');
+
+    for (size_t i = 0; i < num_chunks; ++i) {
+        // Vary the data pattern for each chunk
+        std::ranges::fill(chunk_data, 'A' + (i % 26));
+        status = writer->append(chunk_data);
+        ASSERT_TRUE(status.ok()) << "Failed to write chunk " << i << ": " << 
status.to_string();
+    }
+
+    // Close the writer to complete multipart upload
+    status = writer->close();
+    ASSERT_TRUE(status.ok()) << "Failed to close writer: " << 
status.to_string();
+
+    std::cout << "Successfully uploaded " << (chunk_size * num_chunks)
+              << " bytes using multipart upload" << std::endl;
+
+    // Verify file exists and has correct size
+    bool exists = false;
+    status = s3_fs_->exists(test_file, &exists);
+    ASSERT_TRUE(status.ok()) << "Failed to check file existence: " << 
status.to_string();
+    EXPECT_TRUE(exists) << "File should exist after multipart upload";
+
+    int64_t file_size = 0;
+    status = s3_fs_->file_size(test_file, &file_size);
+    ASSERT_TRUE(status.ok()) << "Failed to get file size: " << 
status.to_string();
+    EXPECT_EQ(file_size, chunk_size * num_chunks) << "File size mismatch";
+
+    std::cout << "Multipart upload test completed successfully!" << std::endl;
+
+    // Cleanup
+    status = s3_fs_->delete_file(test_file);
+    ASSERT_TRUE(status.ok()) << "Failed to delete test file: " << 
status.to_string();
+}
+
+// Test: List objects in directory
+TEST_F(S3FileSystemTest, ListObjectsTest) {
+    ASSERT_NE(s3_fs_, nullptr);
+
+    std::string test_dir = global_test_prefix_ + "list_test_dir/";
+    std::cout << "Test directory path: " << test_dir << std::endl;
+
+    // Create multiple test files
+    std::vector<std::string> test_files = {
+            test_dir + "file1.txt",
+            test_dir + "file2.txt",
+            test_dir + "subdir/file3.txt",
+    };
+
+    for (const auto& file_path : test_files) {
+        io::FileWriterPtr writer;
+        Status status = s3_fs_->create_file(file_path, &writer);
+        ASSERT_TRUE(status.ok()) << "Failed to create file: " << 
status.to_string();
+
+        std::string data = "Test data for " + file_path;
+        status = writer->append(data);
+        ASSERT_TRUE(status.ok()) << "Failed to write data: " << 
status.to_string();
+
+        status = writer->close();
+        ASSERT_TRUE(status.ok()) << "Failed to close writer: " << 
status.to_string();
+    }
+
+    // List files in directory
+    bool exists = false;
+    std::vector<io::FileInfo> files;
+    Status status = s3_fs_->list(test_dir, true, &files, &exists);
+    ASSERT_TRUE(status.ok()) << "Failed to list files: " << status.to_string();
+    EXPECT_TRUE(exists);
+
+    std::cout << "Found " << files.size() << " files" << std::endl;
+    for (const auto& file : files) {
+        std::cout << "  - " << file.file_name << " (" << file.file_size << " 
bytes)" << std::endl;
+    }
+
+    // Verify we got all files
+    EXPECT_GE(files.size(), test_files.size()) << "Should list all created files";
+
+    std::cout << "List objects test completed successfully!" << std::endl;
+
+    // Cleanup
+    status = s3_fs_->delete_directory(test_dir);
+    ASSERT_TRUE(status.ok()) << "Failed to delete test directory: " << 
status.to_string();
+}
+
+// Test: Batch delete files
+TEST_F(S3FileSystemTest, BatchDeleteTest) {
+    ASSERT_NE(s3_fs_, nullptr);
+
+    std::string test_dir = global_test_prefix_ + "batch_delete_test/";
+    std::cout << "Test directory path: " << test_dir << std::endl;
+
+    // Create multiple test files
+    std::vector<std::string> test_files;
+    for (int i = 0; i < 5; ++i) {
+        std::string file_path = test_dir + "file_" + std::to_string(i) + ".txt";
+        test_files.push_back(file_path);
+
+        io::FileWriterPtr writer;
+        Status status = s3_fs_->create_file(file_path, &writer);
+        ASSERT_TRUE(status.ok()) << "Failed to create file: " << 
status.to_string();
+
+        std::string data = "Test data " + std::to_string(i);
+        status = writer->append(data);
+        ASSERT_TRUE(status.ok()) << "Failed to write data: " << 
status.to_string();
+
+        status = writer->close();
+        ASSERT_TRUE(status.ok()) << "Failed to close writer: " << 
status.to_string();
+    }
+
+    // Verify all files exist
+    for (const auto& file_path : test_files) {
+        bool exists = false;
+        Status status = s3_fs_->exists(file_path, &exists);
+        ASSERT_TRUE(status.ok()) << "Failed to check file existence: " << 
status.to_string();
+        EXPECT_TRUE(exists) << "File should exist: " << file_path;
+    }
+
+    // Batch delete files
+    std::vector<io::Path> paths_to_delete;
+    for (const auto& file_path : test_files) {
+        paths_to_delete.emplace_back(file_path);
+    }
+
+    Status status = s3_fs_->batch_delete(paths_to_delete);
+    ASSERT_TRUE(status.ok()) << "Failed to batch delete files: " << 
status.to_string();
+
+    std::cout << "Successfully batch deleted " << paths_to_delete.size() << " 
files" << std::endl;
+
+    // Verify all files are deleted
+    for (const auto& file_path : test_files) {
+        bool exists = false;
+        status = s3_fs_->exists(file_path, &exists);
+        ASSERT_TRUE(status.ok()) << "Failed to check file existence: " << 
status.to_string();
+        EXPECT_FALSE(exists) << "File should not exist after deletion: " << 
file_path;
+    }
+
+    std::cout << "Batch delete test completed successfully!" << std::endl;
+}
+
+// Test: Delete directory recursively
+TEST_F(S3FileSystemTest, DeleteDirectoryRecursivelyTest) {
+    ASSERT_NE(s3_fs_, nullptr);
+
+    std::string test_dir = global_test_prefix_ + "recursive_delete_test/";
+    std::cout << "Test directory path: " << test_dir << std::endl;
+
+    // Create nested directory structure with files
+    std::vector<std::string> test_files = {
+            test_dir + "file1.txt",
+            test_dir + "file2.txt",
+            test_dir + "subdir1/file3.txt",
+            test_dir + "subdir1/file4.txt",
+            test_dir + "subdir2/nested/file5.txt",
+    };
+
+    for (const auto& file_path : test_files) {
+        io::FileWriterPtr writer;
+        Status status = s3_fs_->create_file(file_path, &writer);
+        ASSERT_TRUE(status.ok()) << "Failed to create file: " << 
status.to_string();
+
+        std::string data = "Test data for " + file_path;
+        status = writer->append(data);
+        ASSERT_TRUE(status.ok()) << "Failed to write data: " << 
status.to_string();
+
+        status = writer->close();
+        ASSERT_TRUE(status.ok()) << "Failed to close writer: " << 
status.to_string();
+    }
+
+    // Verify files exist
+    for (const auto& file_path : test_files) {
+        bool exists = false;
+        Status status = s3_fs_->exists(file_path, &exists);
+        ASSERT_TRUE(status.ok()) << "Failed to check file existence: " << 
status.to_string();
+        EXPECT_TRUE(exists) << "File should exist: " << file_path;
+    }
+
+    // Delete directory recursively
+    Status status = s3_fs_->delete_directory(test_dir);
+    ASSERT_TRUE(status.ok()) << "Failed to delete directory recursively: " << 
status.to_string();
+
+    std::cout << "Successfully deleted directory recursively" << std::endl;
+
+    // Verify all files are deleted
+    for (const auto& file_path : test_files) {
+        bool exists = false;
+        status = s3_fs_->exists(file_path, &exists);
+        ASSERT_TRUE(status.ok()) << "Failed to check file existence: " << 
status.to_string();
+        EXPECT_FALSE(exists) << "File should not exist after recursive 
deletion: " << file_path;
+    }
+
+    std::cout << "Delete directory recursively test completed successfully!" 
<< std::endl;
+}
+
+// Test: Upload and download local file
+TEST_F(S3FileSystemTest, UploadDownloadTest) {
+    ASSERT_NE(s3_fs_, nullptr);
+
+    // Create a temporary local file
+    std::string local_file = "/tmp/test_upload_file.txt";
+    std::string test_content = "This is test content for upload/download 
test.\nLine 2\nLine 3";
+
+    std::ofstream ofs(local_file);
+    ASSERT_TRUE(ofs.is_open()) << "Failed to create local file";
+    ofs << test_content;
+    ofs.close();
+
+    std::string remote_file = global_test_prefix_ + "uploaded_file.txt";
+    std::cout << "Remote file path: " << remote_file << std::endl;
+
+    // Upload local file to S3
+    Status status = s3_fs_->upload(local_file, remote_file);
+    ASSERT_TRUE(status.ok()) << "Failed to upload file: " << 
status.to_string();
+
+    std::cout << "Successfully uploaded file" << std::endl;
+
+    // Verify file exists on S3
+    bool exists = false;
+    status = s3_fs_->exists(remote_file, &exists);
+    ASSERT_TRUE(status.ok()) << "Failed to check file existence: " << 
status.to_string();
+    EXPECT_TRUE(exists) << "Uploaded file should exist";
+
+    // Download file from S3
+    std::string download_file = "/tmp/test_download_file.txt";
+    status = s3_fs_->download(remote_file, download_file);
+    ASSERT_TRUE(status.ok()) << "Failed to download file: " << 
status.to_string();
+
+    std::cout << "Successfully downloaded file" << std::endl;
+
+    // Verify downloaded content matches original
+    std::ifstream ifs(download_file);
+    ASSERT_TRUE(ifs.is_open()) << "Failed to open downloaded file";
+    std::string downloaded_content((std::istreambuf_iterator<char>(ifs)),
+                                   std::istreambuf_iterator<char>());
+    ifs.close();
+
+    EXPECT_EQ(test_content, downloaded_content) << "Downloaded content should match original";
+
+    std::cout << "Upload/download test completed successfully!" << std::endl;
+
+    // Cleanup
+    std::remove(local_file.c_str());
+    std::remove(download_file.c_str());
+    status = s3_fs_->delete_file(remote_file);
+    ASSERT_TRUE(status.ok()) << "Failed to delete remote file: " << 
status.to_string();
+}
+
+// Test: Open file and read content
+TEST_F(S3FileSystemTest, OpenFileAndReadTest) {
+    ASSERT_NE(s3_fs_, nullptr);
+
+    std::string test_file = global_test_prefix_ + "read_test_file.txt";
+    std::cout << "Test file path: " << test_file << std::endl;
+
+    // Create and write test file
+    io::FileWriterPtr writer;
+    Status status = s3_fs_->create_file(test_file, &writer);
+    ASSERT_TRUE(status.ok()) << "Failed to create file: " << 
status.to_string();
+
+    std::string test_data =
+            "Hello, S3 Read Test! This is line 1.\nThis is line 2.\nThis is 
line 3.";
+    status = writer->append(test_data);
+    ASSERT_TRUE(status.ok()) << "Failed to write data: " << status.to_string();
+
+    status = writer->close();
+    ASSERT_TRUE(status.ok()) << "Failed to close writer: " << 
status.to_string();
+
+    // Open file for reading
+    io::FileReaderSPtr reader;
+    status = s3_fs_->open_file(test_file, &reader);
+    ASSERT_TRUE(status.ok()) << "Failed to open file: " << status.to_string();
+    ASSERT_NE(reader, nullptr);
+
+    // Read entire file
+    size_t file_size = reader->size();
+    EXPECT_EQ(file_size, test_data.size()) << "File size should match written data";
+
+    std::vector<char> buffer(file_size);
+    size_t bytes_read = 0;
+    status = reader->read_at(0, Slice(buffer.data(), file_size), &bytes_read);
+    ASSERT_TRUE(status.ok()) << "Failed to read file: " << status.to_string();
+    EXPECT_EQ(bytes_read, file_size) << "Should read entire file";
+
+    std::string read_data(buffer.data(), bytes_read);
+    EXPECT_EQ(read_data, test_data) << "Read data should match written data";
+
+    std::cout << "Successfully read " << bytes_read << " bytes from S3" << 
std::endl;
+
+    // Test partial read
+    size_t offset = 7;
+    size_t read_size = 15;
+    std::vector<char> partial_buffer(read_size);
+    bytes_read = 0;
+    status = reader->read_at(offset, Slice(partial_buffer.data(), read_size), &bytes_read);
+    ASSERT_TRUE(status.ok()) << "Failed to read file partially: " << 
status.to_string();
+    EXPECT_EQ(bytes_read, read_size) << "Should read requested bytes";
+
+    std::string partial_data(partial_buffer.data(), bytes_read);
+    std::string expected_partial = test_data.substr(offset, read_size);
+    EXPECT_EQ(partial_data, expected_partial) << "Partial read data should match";
+
+    std::cout << "Open file and read test completed successfully!" << 
std::endl;
+
+    // Cleanup
+    status = s3_fs_->delete_file(test_file);
+    ASSERT_TRUE(status.ok()) << "Failed to delete test file: " << 
status.to_string();
+}
+
+// Test: Generate presigned URL
+TEST_F(S3FileSystemTest, GeneratePresignedUrlTest) {
+    ASSERT_NE(s3_fs_, nullptr);
+
+    std::string test_file = global_test_prefix_ + "presigned_url_test.txt";
+    std::cout << "Test file path: " << test_file << std::endl;
+
+    // Create test file
+    io::FileWriterPtr writer;
+    Status status = s3_fs_->create_file(test_file, &writer);
+    ASSERT_TRUE(status.ok()) << "Failed to create file: " << 
status.to_string();
+
+    std::string test_data = "Test data for presigned URL";
+    status = writer->append(test_data);
+    ASSERT_TRUE(status.ok()) << "Failed to write data: " << status.to_string();
+
+    status = writer->close();
+    ASSERT_TRUE(status.ok()) << "Failed to close writer: " << 
status.to_string();
+
+    // Generate presigned URL
+    int64_t expiration_secs = 3600; // 1 hour
+    std::string presigned_url = s3_fs_->generate_presigned_url(test_file, expiration_secs, false);
+
+    std::cout << "Generated presigned URL: " << presigned_url << std::endl;
+
+    // Verify URL is not empty and contains expected components
+    EXPECT_FALSE(presigned_url.empty()) << "Presigned URL should not be empty";
+    EXPECT_NE(presigned_url.find("http"), std::string::npos) << "URL should 
start with http";
+
+    // For public endpoint test (if using OSS)
+    if (config_->get_provider() == "OSS") {
+        std::string presigned_url_public =
+                s3_fs_->generate_presigned_url(test_file, expiration_secs, true);
+        std::cout << "Generated presigned URL (public): " << 
presigned_url_public << std::endl;
+        EXPECT_FALSE(presigned_url_public.empty()) << "Public presigned URL should not be empty";
+    }
+
+    std::cout << "Generate presigned URL test completed successfully!" << 
std::endl;
+
+    // Cleanup
+    status = s3_fs_->delete_file(test_file);
+    ASSERT_TRUE(status.ok()) << "Failed to delete test file: " << 
status.to_string();
+}
+
+// Test: Create directory (should be no-op but succeed)
+TEST_F(S3FileSystemTest, CreateDirectoryTest) {
+    ASSERT_NE(s3_fs_, nullptr);
+
+    std::string test_dir = global_test_prefix_ + "test_directory/";
+
+    // Create directory (should succeed but is essentially a no-op for S3)
+    Status status = s3_fs_->create_directory(test_dir);
+    ASSERT_TRUE(status.ok()) << "Failed to create directory: " << 
status.to_string();
+
+    std::cout << "Create directory test completed successfully!" << std::endl;
+}
+
+// Test: File size operation with head_object
+TEST_F(S3FileSystemTest, FileSizeHeadObjectTest) {
+    ASSERT_NE(s3_fs_, nullptr);
+
+    std::string test_file = global_test_prefix_ + "size_test_file.txt";
+
+    // Create file with known size
+    io::FileWriterPtr writer;
+    Status status = s3_fs_->create_file(test_file, &writer);
+    ASSERT_TRUE(status.ok()) << "Failed to create file: " << 
status.to_string();
+
+    std::string test_data = "This is a test string with known length: 12345";
+    size_t expected_size = test_data.size();
+
+    status = writer->append(test_data);
+    ASSERT_TRUE(status.ok()) << "Failed to write data: " << status.to_string();
+
+    status = writer->close();
+    ASSERT_TRUE(status.ok()) << "Failed to close writer: " << 
status.to_string();
+
+    // Get file size using head_object
+    int64_t file_size = 0;
+    status = s3_fs_->file_size(test_file, &file_size);
+    ASSERT_TRUE(status.ok()) << "Failed to get file size: " << 
status.to_string();
+    EXPECT_EQ(file_size, expected_size) << "File size should match written data";
+
+    std::cout << "File size (head_object) test completed successfully!" << 
std::endl;
+
+    // Cleanup
+    status = s3_fs_->delete_file(test_file);
+    ASSERT_TRUE(status.ok()) << "Failed to delete test file: " << 
status.to_string();
+}
+
+// Test: Exists check for non-existent file
+TEST_F(S3FileSystemTest, ExistsNonExistentFileTest) {
+    ASSERT_NE(s3_fs_, nullptr);
+
+    std::string non_existent_file = global_test_prefix_ + "non_existent_file_12345.txt";
+
+    bool exists = true;
+    Status status = s3_fs_->exists(non_existent_file, &exists);
+    ASSERT_TRUE(status.ok()) << "Failed to check file existence: " << 
status.to_string();
+    EXPECT_FALSE(exists) << "Non-existent file should not exist";
+
+    std::cout << "Exists non-existent file test completed successfully!" << 
std::endl;
+}
+
+// ==================== Rate Limiter Tests ====================
+
+// Test: S3 rate limiter for GET operations - open_file and read_at
+TEST_F(S3FileSystemTest, RateLimiterGetTest) {
+    ASSERT_NE(s3_fs_, nullptr);
+
+    // Save original config value
+    bool original_enable_rate_limiter = config::enable_s3_rate_limiter;
+
+    // Enable S3 rate limiter for this test
+    config::enable_s3_rate_limiter = true;
+
+    // Create a test file first
+    std::string test_file = global_test_prefix_ + "rate_limit_get_test.txt";
+    std::cout << "Test file path: " << test_file << std::endl;
+
+    io::FileWriterPtr writer;
+    Status status = s3_fs_->create_file(test_file, &writer);
+    ASSERT_TRUE(status.ok()) << "Failed to create file: " << 
status.to_string();
+
+    // Write some data (about 1KB)
+    std::string test_data(1024, 'A');
+    status = writer->append(test_data);
+    ASSERT_TRUE(status.ok()) << "Failed to write data: " << status.to_string();
+
+    status = writer->close();
+    ASSERT_TRUE(status.ok()) << "Failed to close writer: " << 
status.to_string();
+
+    // Set a very strict rate limit for GET operations to trigger an error
+    // max_speed: 10 tokens per second
+    // max_burst: 10 tokens (bucket size)
+    // limit: 11 (the first five open/read rounds fit within the limit, the 6th will fail)
+    int ret = reset_s3_rate_limiter(S3RateLimitType::GET, 10, 10, 11);
+    ASSERT_EQ(ret, 0) << "Failed to set rate limiter for GET operations";
+
+    std::cout << "Rate limiter set: limit 5 total requests for GET operations" 
<< std::endl;
+
+    // First 5 read operations should succeed
+    for (int i = 0; i < 5; ++i) {
+        io::FileReaderSPtr reader;
+        status = s3_fs_->open_file(test_file, &reader);
+        ASSERT_TRUE(status.ok()) << "Failed to open file on attempt " << i + 1 
<< ": "
+                                 << status.to_string();
+
+        std::vector<char> buffer(1024);
+        size_t bytes_read = 0;
+        status = reader->read_at(0, Slice(buffer.data(), 1024), &bytes_read);
+        ASSERT_TRUE(status.ok()) << "Failed to read file on attempt " << i + 1 
<< ": "
+                                 << status.to_string();
+        std::cout << "Read attempt " << i + 1 << " succeeded" << std::endl;
+    }
+
+    // 6th read operation should fail due to rate limit
+    io::FileReaderSPtr reader;
+    status = s3_fs_->open_file(test_file, &reader);
+    if (status.ok()) {
+        std::vector<char> buffer(1024);
+        size_t bytes_read = 0;
+        status = reader->read_at(0, Slice(buffer.data(), 1024), &bytes_read);
+    }
+
+    EXPECT_FALSE(status.ok()) << "6th read should fail due to rate limit";
+    std::cout << "6th read failed as expected: " << status.to_string() << 
std::endl;
+
+    // Reset rate limiter to default (no limit) to avoid affecting other tests
+    reset_s3_rate_limiter(S3RateLimitType::PUT, 10000, 10000, 0);
+    reset_s3_rate_limiter(S3RateLimitType::GET, 10000, 10000, 0);
+
+    // Restore original config
+    config::enable_s3_rate_limiter = original_enable_rate_limiter;
+
+    std::cout << "Rate limiter GET test completed successfully!" << std::endl;
+
+    // Cleanup
+    status = s3_fs_->delete_file(test_file);
+    ASSERT_TRUE(status.ok()) << "Failed to delete test file: " << 
status.to_string();
+}
+
+// Test: S3 rate limiter for PUT operations - trigger error with limit
+TEST_F(S3FileSystemTest, RateLimiterPutTest) {
+    ASSERT_NE(s3_fs_, nullptr);
+
+    // Save original config value
+    bool original_enable_rate_limiter = config::enable_s3_rate_limiter;
+
+    // Enable S3 rate limiter for this test
+    config::enable_s3_rate_limiter = true;
+
+    // Set a very strict rate limit for PUT operations to trigger an error
+    // max_speed: 10 tokens per second
+    // max_burst: 10 tokens (bucket size)
+    // limit: 2 (allow only 2 requests total, the 3rd will fail)
+    int ret = reset_s3_rate_limiter(S3RateLimitType::PUT, 10, 10, 2);
+    ASSERT_EQ(ret, 0) << "Failed to set rate limiter for PUT operations";
+
+    std::cout << "Rate limiter set: limit 2 total requests for PUT operations" 
<< std::endl;
+
+    // First two write operations should succeed
+    std::vector<std::string> test_files;
+    Status status;
+
+    for (int i = 0; i < 2; ++i) {
+        std::string test_file =
+                global_test_prefix_ + "rate_limit_put_test_" + 
std::to_string(i) + ".txt";
+        test_files.push_back(test_file);
+
+        io::FileWriterPtr writer;
+        status = s3_fs_->create_file(test_file, &writer);
+        ASSERT_TRUE(status.ok()) << "Failed to create file on attempt " << i + 
1 << ": "
+                                 << status.to_string();
+
+        std::string test_data = "Test data " + std::to_string(i);
+        status = writer->append(test_data);
+        ASSERT_TRUE(status.ok()) << "Failed to write data on attempt " << i + 
1 << ": "
+                                 << status.to_string();
+
+        status = writer->close();
+        ASSERT_TRUE(status.ok()) << "Failed to close writer on attempt " << i 
+ 1 << ": "
+                                 << status.to_string();
+
+        std::cout << "Write attempt " << i + 1 << " succeeded" << std::endl;
+    }
+
+    // Third write operation should fail due to rate limit
+    std::string test_file_fail = global_test_prefix_ + "rate_limit_put_test_fail.txt";
+    io::FileWriterPtr writer;
+    status = s3_fs_->create_file(test_file_fail, &writer);
+    if (status.ok()) {
+        std::string test_data = "This should fail";
+        status = writer->append(test_data);
+        if (status.ok()) {
+            status = writer->close();
+        }
+    }
+
+    EXPECT_FALSE(status.ok()) << "Third write should fail due to rate limit";
+    std::cout << "Third write failed as expected: " << status.to_string() << 
std::endl;
+
+    // Reset rate limiter to default (no limit) to avoid affecting other tests
+    reset_s3_rate_limiter(S3RateLimitType::PUT, 10000, 10000, 0);
+    reset_s3_rate_limiter(S3RateLimitType::GET, 10000, 10000, 0);
+
+    // Restore original config
+    config::enable_s3_rate_limiter = original_enable_rate_limiter;
+
+    std::cout << "Rate limiter PUT test completed successfully!" << std::endl;
+
+    // Cleanup successfully created files
+    for (const auto& file : test_files) {
+        status = s3_fs_->delete_file(file);
+        ASSERT_TRUE(status.ok()) << "Failed to delete test file: " << 
status.to_string();
+    }
+}
+
+// Test: S3 rate limiter for GET operations - download
+TEST_F(S3FileSystemTest, RateLimiterGetDownloadTest) {
+    ASSERT_NE(s3_fs_, nullptr);
+
+    // Save original config value
+    bool original_enable_rate_limiter = config::enable_s3_rate_limiter;
+
+    // Enable S3 rate limiter for this test
+    config::enable_s3_rate_limiter = true;
+
+    // Create test files first
+    std::vector<std::string> test_files;
+    for (int i = 0; i < 3; ++i) {
+        std::string test_file =
+                global_test_prefix_ + "rate_limit_download_test_" + 
std::to_string(i) + ".txt";
+        test_files.push_back(test_file);
+
+        io::FileWriterPtr writer;
+        Status status = s3_fs_->create_file(test_file, &writer);
+        ASSERT_TRUE(status.ok()) << "Failed to create file: " << 
status.to_string();
+
+        std::string test_data = "Download test data " + std::to_string(i);
+        status = writer->append(test_data);
+        ASSERT_TRUE(status.ok()) << "Failed to write data: " << 
status.to_string();
+
+        status = writer->close();
+        ASSERT_TRUE(status.ok()) << "Failed to close writer: " << 
status.to_string();
+    }
+
+    // Set rate limit for GET operations
+    // limit: 4 (the first two downloads fit within the limit, the 3rd will fail)
+    int ret = reset_s3_rate_limiter(S3RateLimitType::GET, 10, 10, 4);
+    ASSERT_EQ(ret, 0) << "Failed to set rate limiter for GET operations";
+
+    std::cout << "Rate limiter set: limit 2 total requests for GET operations 
(download)"
+              << std::endl;
+
+    // First two download operations should succeed
+    Status status;
+    for (int i = 0; i < 2; ++i) {
+        std::string local_file = "/tmp/rate_limit_download_" + 
std::to_string(i) + ".txt";
+        status = s3_fs_->download(test_files[i], local_file);
+        ASSERT_TRUE(status.ok()) << "Failed to download file on attempt " << i 
+ 1 << ": "
+                                 << status.to_string();
+        std::cout << "Download attempt " << i + 1 << " succeeded" << std::endl;
+        std::remove(local_file.c_str());
+    }
+
+    // Third download operation should fail due to rate limit
+    std::string local_file_fail = "/tmp/rate_limit_download_fail.txt";
+    status = s3_fs_->download(test_files[2], local_file_fail);
+
+    EXPECT_FALSE(status.ok()) << "Third download should fail due to rate 
limit";
+    std::cout << "Third download failed as expected: " << status.to_string() 
<< std::endl;
+
+    // Reset rate limiter
+    reset_s3_rate_limiter(S3RateLimitType::PUT, 10000, 10000, 0);
+    reset_s3_rate_limiter(S3RateLimitType::GET, 10000, 10000, 0);
+
+    // Restore original config
+    config::enable_s3_rate_limiter = original_enable_rate_limiter;
+
+    std::cout << "Rate limiter GET download test completed successfully!" << 
std::endl;
+
+    // Cleanup
+    for (const auto& file : test_files) {
+        status = s3_fs_->delete_file(file);
+        ASSERT_TRUE(status.ok()) << "Failed to delete test file: " << 
status.to_string();
+    }
+}
+
+// Test: S3 rate limiter for PUT operations - multipart upload
+TEST_F(S3FileSystemTest, RateLimiterPutMultipartTest) {
+    ASSERT_NE(s3_fs_, nullptr);
+
+    // Save original config value
+    bool original_enable_rate_limiter = config::enable_s3_rate_limiter;
+
+    // Enable S3 rate limiter for this test
+    config::enable_s3_rate_limiter = true;
+
+    // Set rate limit for PUT operations with higher limit for multipart
+    // Each chunk in multipart upload counts as one request
+    // limit: 3 (allow only 3 PUT requests, multipart with 6 chunks should fail)
+    int ret = reset_s3_rate_limiter(S3RateLimitType::PUT, 10, 10, 3);
+    ASSERT_EQ(ret, 0) << "Failed to set rate limiter for PUT operations";
+
+    std::cout << "Rate limiter set: limit 3 total requests for PUT operations 
(multipart)"
+              << std::endl;
+
+    std::string test_file = global_test_prefix_ + "rate_limit_multipart_test.dat";
+
+    // Create file writer
+    io::FileWriterPtr writer;
+    Status status = s3_fs_->create_file(test_file, &writer);
+    ASSERT_TRUE(status.ok()) << "Failed to create file: " << 
status.to_string();
+
+    // Write large data to trigger multipart upload (>5MB for multipart)
+    // This will trigger multiple PUT requests and should fail
+    size_t chunk_size = 1024 * 1024; // 1MB
+    size_t num_chunks = 6;           // 6MB total, will trigger multipart
+    std::string chunk_data(chunk_size, 'A');
+
+    bool failed = false;
+    for (size_t i = 0; i < num_chunks; ++i) {
+        std::ranges::fill(chunk_data, 'A' + (i % 26));
+        status = writer->append(chunk_data);
+        if (!status.ok()) {
+            failed = true;
+            std::cout << "Multipart upload failed at chunk " << i
+                      << " as expected: " << status.to_string() << std::endl;
+            break;
+        }
+    }
+
+    if (!failed) {
+        status = writer->close();
+        if (!status.ok()) {
+            failed = true;
+            std::cout << "Multipart upload failed at close as expected: " << 
status.to_string()
+                      << std::endl;
+        }
+    }
+
+    EXPECT_TRUE(failed) << "Multipart upload should fail due to rate limit";
+
+    // Reset rate limiter
+    reset_s3_rate_limiter(S3RateLimitType::PUT, 10000, 10000, 0);
+    reset_s3_rate_limiter(S3RateLimitType::GET, 10000, 10000, 0);
+
+    // Restore original config
+    config::enable_s3_rate_limiter = original_enable_rate_limiter;
+
+    std::cout << "Rate limiter PUT multipart test completed successfully!" << 
std::endl;
+
+    // Try to cleanup (may fail if file was not created)
+    std::ignore = s3_fs_->delete_file(test_file);
+}
+
+// Test: S3 rate limiter for PUT operations - upload
+TEST_F(S3FileSystemTest, RateLimiterPutUploadTest) {
+    ASSERT_NE(s3_fs_, nullptr);
+
+    // Save original config value
+    bool original_enable_rate_limiter = config::enable_s3_rate_limiter;
+
+    // Enable S3 rate limiter for this test
+    config::enable_s3_rate_limiter = true;
+
+    // Create local test files
+    std::vector<std::string> local_files;
+    std::vector<std::string> remote_files;
+
+    for (int i = 0; i < 3; ++i) {
+        std::string local_file = "/tmp/rate_limit_upload_" + std::to_string(i) 
+ ".txt";
+        local_files.push_back(local_file);
+
+        std::ofstream ofs(local_file);
+        ASSERT_TRUE(ofs.is_open()) << "Failed to create local file";
+        ofs << "Upload test data " << i;
+        ofs.close();
+
+        remote_files.push_back(global_test_prefix_ + "rate_limit_upload_" + std::to_string(i) +
+                               ".txt");
+    }
+
+    // Set rate limit for PUT operations
+    // limit: 2 (allow only 2 requests total, the 3rd will fail)
+    int ret = reset_s3_rate_limiter(S3RateLimitType::PUT, 10, 10, 2);
+    ASSERT_EQ(ret, 0) << "Failed to set rate limiter for PUT operations";
+
+    std::cout << "Rate limiter set: limit 2 total requests for PUT operations 
(upload)"
+              << std::endl;
+
+    // First two upload operations should succeed
+    Status status;
+    for (int i = 0; i < 2; ++i) {
+        status = s3_fs_->upload(local_files[i], remote_files[i]);
+        ASSERT_TRUE(status.ok()) << "Failed to upload file on attempt " << i + 
1 << ": "
+                                 << status.to_string();
+        std::cout << "Upload attempt " << i + 1 << " succeeded" << std::endl;
+    }
+
+    // Third upload operation should fail due to rate limit
+    status = s3_fs_->upload(local_files[2], remote_files[2]);
+
+    EXPECT_FALSE(status.ok()) << "Third upload should fail due to rate limit";
+    std::cout << "Third upload failed as expected: " << status.to_string() << 
std::endl;
+
+    // Reset rate limiter
+    reset_s3_rate_limiter(S3RateLimitType::PUT, 10000, 10000, 0);
+    reset_s3_rate_limiter(S3RateLimitType::GET, 10000, 10000, 0);
+
+    // Restore original config
+    config::enable_s3_rate_limiter = original_enable_rate_limiter;
+
+    std::cout << "Rate limiter PUT upload test completed successfully!" << 
std::endl;
+
+    // Cleanup local files
+    for (const auto& file : local_files) {
+        std::remove(file.c_str());
+    }
+
+    // Cleanup remote files (only first 2 should exist)
+    for (int i = 0; i < 2; ++i) {
+        status = s3_fs_->delete_file(remote_files[i]);
+        ASSERT_TRUE(status.ok()) << "Failed to delete remote file: " << 
status.to_string();
+    }
+}
+
+// Test: S3 rate limiter for GET operations - head_object (file_size/exists)
+TEST_F(S3FileSystemTest, RateLimiterGetHeadObjectTest) {
+    ASSERT_NE(s3_fs_, nullptr);
+
+    // Save original config value
+    bool original_enable_rate_limiter = config::enable_s3_rate_limiter;
+
+    // Enable S3 rate limiter for this test
+    config::enable_s3_rate_limiter = true;
+
+    // Create test files first
+    std::vector<std::string> test_files;
+    for (int i = 0; i < 3; ++i) {
+        std::string test_file =
+                global_test_prefix_ + "rate_limit_head_test_" + 
std::to_string(i) + ".txt";
+        test_files.push_back(test_file);
+
+        io::FileWriterPtr writer;
+        Status status = s3_fs_->create_file(test_file, &writer);
+        ASSERT_TRUE(status.ok()) << "Failed to create file: " << 
status.to_string();
+
+        std::string test_data = "Head object test data " + std::to_string(i);
+        status = writer->append(test_data);
+        ASSERT_TRUE(status.ok()) << "Failed to write data: " << 
status.to_string();
+
+        status = writer->close();
+        ASSERT_TRUE(status.ok()) << "Failed to close writer: " << 
status.to_string();
+    }
+
+    // Set rate limit for GET operations (head_object uses GET rate limiter)
+    // limit: 2 (allow only 2 requests total, the 3rd will fail)
+    int ret = reset_s3_rate_limiter(S3RateLimitType::GET, 10, 10, 2);
+    ASSERT_EQ(ret, 0) << "Failed to set rate limiter for GET operations";
+
+    std::cout << "Rate limiter set: limit 2 total requests for GET operations 
(head_object)"
+              << std::endl;
+
+    // First two head operations should succeed
+    Status status;
+    for (int i = 0; i < 2; ++i) {
+        int64_t file_size = 0;
+        status = s3_fs_->file_size(test_files[i], &file_size);
+        ASSERT_TRUE(status.ok()) << "Failed to get file size on attempt " << i 
+ 1 << ": "
+                                 << status.to_string();
+        std::cout << "Head object attempt " << i + 1 << " succeeded, size: " 
<< file_size
+                  << std::endl;
+    }
+
+    // Third head operation should fail due to rate limit
+    bool exists = false;
+    status = s3_fs_->exists(test_files[2], &exists);
+
+    EXPECT_FALSE(status.ok()) << "Third head object should fail due to rate 
limit";
+    std::cout << "Third head object failed as expected: " << 
status.to_string() << std::endl;
+
+    // Reset rate limiter
+    reset_s3_rate_limiter(S3RateLimitType::PUT, 10000, 10000, 0);
+    reset_s3_rate_limiter(S3RateLimitType::GET, 10000, 10000, 0);
+
+    // Restore original config
+    config::enable_s3_rate_limiter = original_enable_rate_limiter;
+
+    std::cout << "Rate limiter GET head object test completed successfully!" 
<< std::endl;
+
+    // Cleanup
+    for (const auto& file : test_files) {
+        status = s3_fs_->delete_file(file);
+        ASSERT_TRUE(status.ok()) << "Failed to delete test file: " << 
status.to_string();
+    }
+}
+
+// Test: S3 rate limiter for GET operations - list objects
+TEST_F(S3FileSystemTest, RateLimiterGetListTest) {
+    ASSERT_NE(s3_fs_, nullptr);
+
+    // Save original config value
+    bool original_enable_rate_limiter = config::enable_s3_rate_limiter;
+
+    // Enable S3 rate limiter for this test
+    config::enable_s3_rate_limiter = true;
+
+    // Create test directories with files
+    std::vector<std::string> test_dirs;
+    for (int i = 0; i < 3; ++i) {
+        std::string test_dir =
+                global_test_prefix_ + "rate_limit_list_test_" + 
std::to_string(i) + "/";
+        test_dirs.push_back(test_dir);
+
+        // Create a file in each directory
+        std::string test_file = test_dir + "file.txt";
+        io::FileWriterPtr writer;
+        Status status = s3_fs_->create_file(test_file, &writer);
+        ASSERT_TRUE(status.ok()) << "Failed to create file: " << 
status.to_string();
+
+        std::string test_data = "List test data " + std::to_string(i);
+        status = writer->append(test_data);
+        ASSERT_TRUE(status.ok()) << "Failed to write data: " << 
status.to_string();
+
+        status = writer->close();
+        ASSERT_TRUE(status.ok()) << "Failed to close writer: " << 
status.to_string();
+    }
+
+    // Set rate limit for GET operations (list uses GET rate limiter)
+    // limit: 2 (allow only 2 requests total, the 3rd will fail)
+    int ret = reset_s3_rate_limiter(S3RateLimitType::GET, 10, 10, 2);
+    ASSERT_EQ(ret, 0) << "Failed to set rate limiter for GET operations";
+
+    std::cout << "Rate limiter set: limit 2 total requests for GET operations 
(list)" << std::endl;
+
+    // First two list operations should succeed
+    Status status;
+    for (int i = 0; i < 2; ++i) {
+        bool exists = false;
+        std::vector<io::FileInfo> files;
+        status = s3_fs_->list(test_dirs[i], true, &files, &exists);
+        ASSERT_TRUE(status.ok()) << "Failed to list on attempt " << i + 1 << 
": "
+                                 << status.to_string();
+        std::cout << "List attempt " << i + 1 << " succeeded, found " << 
files.size() << " files"
+                  << std::endl;
+    }
+
+    // Third list operation should fail due to rate limit
+    bool exists = false;
+    std::vector<io::FileInfo> files;
+    status = s3_fs_->list(test_dirs[2], true, &files, &exists);
+
+    EXPECT_FALSE(status.ok()) << "Third list should fail due to rate limit";
+    std::cout << "Third list failed as expected: " << status.to_string() << 
std::endl;
+
+    // Reset rate limiter
+    reset_s3_rate_limiter(S3RateLimitType::PUT, 10000, 10000, 0);
+    reset_s3_rate_limiter(S3RateLimitType::GET, 10000, 10000, 0);
+
+    // Restore original config
+    config::enable_s3_rate_limiter = original_enable_rate_limiter;
+
+    std::cout << "Rate limiter GET list test completed successfully!" << 
std::endl;
+
+    // Cleanup
+    for (const auto& dir : test_dirs) {
+        status = s3_fs_->delete_directory(dir);
+        ASSERT_TRUE(status.ok()) << "Failed to delete test directory: " << 
status.to_string();
+    }
+}
+
+// Test: S3 rate limiter for PUT operations - delete_file (uses PUT rate limiter)
+TEST_F(S3FileSystemTest, RateLimiterPutDeleteTest) {
+    ASSERT_NE(s3_fs_, nullptr);
+
+    // Save original config value
+    bool original_enable_rate_limiter = config::enable_s3_rate_limiter;
+
+    // Enable S3 rate limiter for this test
+    config::enable_s3_rate_limiter = true;
+
+    // Create test files first (without rate limit)
+    std::vector<std::string> test_files;
+    for (int i = 0; i < 3; ++i) {
+        std::string test_file =
+                global_test_prefix_ + "rate_limit_delete_test_" + 
std::to_string(i) + ".txt";
+        test_files.push_back(test_file);
+
+        io::FileWriterPtr writer;
+        Status status = s3_fs_->create_file(test_file, &writer);
+        ASSERT_TRUE(status.ok()) << "Failed to create file: " << 
status.to_string();
+
+        std::string test_data = "Delete test data " + std::to_string(i);
+        status = writer->append(test_data);
+        ASSERT_TRUE(status.ok()) << "Failed to write data: " << 
status.to_string();
+
+        status = writer->close();
+        ASSERT_TRUE(status.ok()) << "Failed to close writer: " << 
status.to_string();
+    }
+
+    // Set rate limit for PUT operations (delete uses PUT rate limiter)
+    // limit: 2 (allow only 2 requests total, the 3rd will fail)
+    int ret = reset_s3_rate_limiter(S3RateLimitType::PUT, 10, 10, 2);
+    ASSERT_EQ(ret, 0) << "Failed to set rate limiter for PUT operations";
+
+    std::cout << "Rate limiter set: limit 2 total requests for PUT operations 
(delete)"
+              << std::endl;
+
+    // First two delete operations should succeed
+    Status status;
+    for (int i = 0; i < 2; ++i) {
+        status = s3_fs_->delete_file(test_files[i]);
+        ASSERT_TRUE(status.ok()) << "Failed to delete file on attempt " << i + 
1 << ": "
+                                 << status.to_string();
+        std::cout << "Delete attempt " << i + 1 << " succeeded" << std::endl;
+    }
+
+    // Third delete operation should fail due to rate limit
+    status = s3_fs_->delete_file(test_files[2]);
+
+    EXPECT_FALSE(status.ok()) << "Third delete should fail due to rate limit";
+    std::cout << "Third delete failed as expected: " << status.to_string() << 
std::endl;
+
+    // Reset rate limiter to clean up the remaining file
+    reset_s3_rate_limiter(S3RateLimitType::PUT, 10000, 10000, 0);
+    reset_s3_rate_limiter(S3RateLimitType::GET, 10000, 10000, 0);
+
+    // Restore original config
+    config::enable_s3_rate_limiter = original_enable_rate_limiter;
+
+    std::cout << "Rate limiter PUT delete test completed successfully!" << 
std::endl;
+
+    // Cleanup the file that failed to delete
+    status = s3_fs_->delete_file(test_files[2]);
+    ASSERT_TRUE(status.ok()) << "Failed to cleanup remaining file: " << 
status.to_string();
+}
+
+// Test: S3 rate limiter for PUT operations - batch_delete (uses PUT rate limiter)
+TEST_F(S3FileSystemTest, RateLimiterPutBatchDeleteTest) {
+    ASSERT_NE(s3_fs_, nullptr);
+
+    // Save original config value
+    bool original_enable_rate_limiter = config::enable_s3_rate_limiter;
+
+    // Enable S3 rate limiter for this test
+    config::enable_s3_rate_limiter = true;
+
+    // Create multiple batches of test files
+    std::vector<std::vector<std::string>> file_batches;
+
+    for (int batch = 0; batch < 3; ++batch) {
+        std::vector<std::string> batch_files;
+        for (int i = 0; i < 3; ++i) {
+            std::string test_file = global_test_prefix_ + "rate_limit_batch_delete_batch_" +
+                                    std::to_string(batch) + "_file_" + std::to_string(i) + ".txt";
+            batch_files.push_back(test_file);
+
+            io::FileWriterPtr writer;
+            Status status = s3_fs_->create_file(test_file, &writer);
+            ASSERT_TRUE(status.ok()) << "Failed to create file: " << 
status.to_string();
+
+            std::string test_data = "Batch delete test data";
+            status = writer->append(test_data);
+            ASSERT_TRUE(status.ok()) << "Failed to write data: " << 
status.to_string();
+
+            status = writer->close();
+            ASSERT_TRUE(status.ok()) << "Failed to close writer: " << 
status.to_string();
+        }
+        file_batches.push_back(batch_files);
+    }
+
+    // Set rate limit for PUT operations (batch_delete uses PUT rate limiter)
+    // limit: 2 (allow only 2 batch delete requests, the 3rd will fail)
+    int ret = reset_s3_rate_limiter(S3RateLimitType::PUT, 10, 10, 2);
+    ASSERT_EQ(ret, 0) << "Failed to set rate limiter for PUT operations";
+
+    std::cout << "Rate limiter set: limit 2 total requests for PUT operations 
(batch_delete)"
+              << std::endl;
+
+    // First two batch delete operations should succeed
+    Status status;
+    for (int batch = 0; batch < 2; ++batch) {
+        std::vector<io::Path> paths_to_delete;
+        for (const auto& file : file_batches[batch]) {
+            paths_to_delete.emplace_back(file);
+        }
+
+        status = s3_fs_->batch_delete(paths_to_delete);
+        ASSERT_TRUE(status.ok()) << "Failed to batch delete on attempt " << 
batch + 1 << ": "
+                                 << status.to_string();
+        std::cout << "Batch delete attempt " << batch + 1 << " succeeded, 
deleted "
+                  << paths_to_delete.size() << " files" << std::endl;
+    }
+
+    // Third batch delete operation should fail due to rate limit
+    std::vector<io::Path> paths_to_delete;
+    for (const auto& file : file_batches[2]) {
+        paths_to_delete.emplace_back(file);
+    }
+    status = s3_fs_->batch_delete(paths_to_delete);
+
+    EXPECT_FALSE(status.ok()) << "Third batch delete should fail due to rate 
limit";
+    std::cout << "Third batch delete failed as expected: " << 
status.to_string() << std::endl;
+
+    // Reset rate limiter to clean up remaining files
+    reset_s3_rate_limiter(S3RateLimitType::PUT, 10000, 10000, 0);
+    reset_s3_rate_limiter(S3RateLimitType::GET, 10000, 10000, 0);
+
+    // Restore original config
+    config::enable_s3_rate_limiter = original_enable_rate_limiter;
+
+    std::cout << "Rate limiter PUT batch delete test completed successfully!" 
<< std::endl;
+
+    // Cleanup remaining files from failed batch
+    status = s3_fs_->batch_delete(paths_to_delete);
+    ASSERT_TRUE(status.ok()) << "Failed to cleanup remaining files: " << 
status.to_string();
+}
+
+// Test: S3 rate limiter for GET operations - delete_directory with ListObjectsV2
+TEST_F(S3FileSystemTest, RateLimiterGetDeleteDirectoryListTest) {
+    ASSERT_NE(s3_fs_, nullptr);
+
+    // Save original config value
+    bool original_enable_rate_limiter = config::enable_s3_rate_limiter;
+
+    // Enable S3 rate limiter for this test
+    config::enable_s3_rate_limiter = true;
+
+    // Create multiple test directories with files
+    std::vector<std::string> test_dirs;
+    for (int i = 0; i < 3; ++i) {
+        std::string test_dir =
+                global_test_prefix_ + "rate_limit_delete_dir_list_" + std::to_string(i) + "/";
+        test_dirs.push_back(test_dir);
+
+        // Create multiple files in each directory
+        for (int j = 0; j < 5; ++j) {
+            std::string test_file = test_dir + "file_" + std::to_string(j) + ".txt";
+            io::FileWriterPtr writer;
+            Status status = s3_fs_->create_file(test_file, &writer);
+            ASSERT_TRUE(status.ok()) << "Failed to create file: " << 
status.to_string();
+
+            std::string test_data = "Delete directory test data";
+            status = writer->append(test_data);
+            ASSERT_TRUE(status.ok()) << "Failed to write data: " << 
status.to_string();
+
+            status = writer->close();
+            ASSERT_TRUE(status.ok()) << "Failed to close writer: " << 
status.to_string();
+        }
+    }
+
+    // Set rate limit for GET operations (ListObjectsV2 uses GET rate limiter)
+    // limit: 2 (allow only 2 ListObjectsV2 requests, the 3rd will fail)
+    int ret = reset_s3_rate_limiter(S3RateLimitType::GET, 10, 10, 2);
+    ASSERT_EQ(ret, 0) << "Failed to set rate limiter for GET operations";
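+    // Each directory holds only 5 files, so a single delete_directory should
+    // issue exactly one ListObjectsV2 request (assuming the default page size
+    // of 1000 keys); a cap of 2 therefore allows exactly 2 directory deletes.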
+
+    std::cout << "Rate limiter set: limit 2 total requests for GET operations 
(ListObjectsV2 in "
+                 "delete_directory)"
+              << std::endl;
+
+    // First two delete_directory operations should succeed
+    // Each delete_directory calls ListObjectsV2 (GET) and DeleteObjects (PUT)
+    Status status;
+    for (int i = 0; i < 2; ++i) {
+        status = s3_fs_->delete_directory(test_dirs[i]);
+        ASSERT_TRUE(status.ok()) << "Failed to delete directory on attempt " 
<< i + 1 << ": "
+                                 << status.to_string();
+        std::cout << "Delete directory attempt " << i + 1 << " succeeded" << 
std::endl;
+    }
+
+    // Third delete_directory operation should fail due to rate limit on ListObjectsV2
+    status = s3_fs_->delete_directory(test_dirs[2]);
+
+    EXPECT_FALSE(status.ok())
+            << "Third delete_directory should fail due to rate limit on 
ListObjectsV2";
+    std::cout << "Third delete_directory failed as expected (ListObjectsV2 
rate limit): "
+              << status.to_string() << std::endl;
+
+    // Reset rate limiter to clean up remaining directory
+    reset_s3_rate_limiter(S3RateLimitType::PUT, 10000, 10000, 0);
+    reset_s3_rate_limiter(S3RateLimitType::GET, 10000, 10000, 0);
+
+    // Restore original config
+    config::enable_s3_rate_limiter = original_enable_rate_limiter;
+
+    std::cout << "Rate limiter GET delete_directory (ListObjectsV2) test 
completed successfully!"
+              << std::endl;
+
+    // Cleanup remaining directory
+    status = s3_fs_->delete_directory(test_dirs[2]);
+    ASSERT_TRUE(status.ok()) << "Failed to cleanup remaining directory: " << 
status.to_string();
+}
+
+// Test: S3 rate limiter for PUT operations - multipart upload with UploadPart failure
+TEST_F(S3FileSystemTest, RateLimiterPutMultipartUploadPartFailureTest) {
+    ASSERT_NE(s3_fs_, nullptr);
+
+    // Save original config value
+    bool original_enable_rate_limiter = config::enable_s3_rate_limiter;
+
+    // Enable S3 rate limiter for this test
+    config::enable_s3_rate_limiter = true;
+
+    // Set a very strict rate limit for PUT operations
+    // limit: 1 (allow only 1 PUT request: CreateMultipartUpload will succeed,
+    // but UploadPart will fail)
+    int ret = reset_s3_rate_limiter(S3RateLimitType::PUT, 10, 10, 1);
+    ASSERT_EQ(ret, 0) << "Failed to set rate limiter for PUT operations";
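+    // Assumption: create_file() itself issues no request, so the first PUT the
+    // limiter sees is CreateMultipartUpload and the second is UploadPart.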
+
+    std::cout << "Rate limiter set: limit 1 total request for PUT operations 
(UploadPart will fail)"
+              << std::endl;
+
+    std::string test_file = global_test_prefix_ + "rate_limit_upload_part_test.dat";
+
+    // Create file writer
+    io::FileWriterPtr writer;
+    Status status = s3_fs_->create_file(test_file, &writer);
+    ASSERT_TRUE(status.ok()) << "Failed to create file: " << 
status.to_string();
+
+    // Write large data to trigger multipart upload (>5MB for multipart)
+    // This will trigger CreateMultipartUpload (1st PUT, succeeds) and
+    // UploadPart (2nd PUT, should fail)
+    size_t chunk_size = 1024 * 1024; // 1MB
+    size_t num_chunks = 6;           // 6MB total, will trigger multipart
+    std::string chunk_data(chunk_size, 'A');
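+    // The multipart cut-over point is assumed to follow the configured S3
+    // write buffer size (commonly 5MB), so 6MB should force at least one
+    // UploadPart call on top of CreateMultipartUpload.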
+
+    std::cout << "Writing " << (chunk_size * num_chunks) << " bytes to trigger 
multipart upload"
+              << std::endl;
+
+    bool write_failed = false;
+    for (size_t i = 0; i < num_chunks; ++i) {
+        std::ranges::fill(chunk_data, 'A' + (i % 26));
+        status = writer->append(chunk_data);
+        if (!status.ok()) {
+            write_failed = true;
+            std::cout << "Write failed at chunk " << i << " as expected: " << 
status.to_string()
+                      << std::endl;
+            break;
+        }
+    }
+
+    if (!write_failed) {
+        // If write succeeded, close should fail
+        status = writer->close();
+        if (!status.ok()) {
+            write_failed = true;
+            std::cout << "Close failed as expected (UploadPart rate limit): " 
<< status.to_string()
+                      << std::endl;
+        }
+    }
+
+    EXPECT_TRUE(write_failed) << "Multipart upload should fail due to 
UploadPart rate limit";
+
+    // Verify the error message contains information about UploadPart failure
+    if (write_failed) {
+        std::string error_msg = status.to_string();
+        // The error should be about upload failure
+        std::cout << "Verified UploadPart failure with error: " << error_msg 
<< std::endl;
+    }
+
+    // Reset rate limiter
+    reset_s3_rate_limiter(S3RateLimitType::PUT, 10000, 10000, 0);
+    reset_s3_rate_limiter(S3RateLimitType::GET, 10000, 10000, 0);
+
+    // Restore original config
+    config::enable_s3_rate_limiter = original_enable_rate_limiter;
+
+    std::cout << "Rate limiter PUT multipart UploadPart failure test completed 
successfully!"
+              << std::endl;
+
+    // Try to cleanup (may fail if file was not created)
+    std::ignore = s3_fs_->delete_file(test_file);
+}
+
+// Test: S3 DeleteObjects with partial failure - some objects fail to delete
+TEST_F(S3FileSystemTest, DeleteDirectoryPartialFailureTest) {
+    ASSERT_NE(s3_fs_, nullptr);
+
+    std::string test_dir = global_test_prefix_ + "delete_partial_failure_test/";
+    std::cout << "Test directory path: " << test_dir << std::endl;
+
+    // Create multiple test files
+    std::vector<std::string> test_files;
+    for (int i = 0; i < 5; ++i) {
+        std::string test_file = test_dir + "file_" + std::to_string(i) + ".txt";
+        test_files.push_back(test_file);
+
+        io::FileWriterPtr writer;
+        Status status = s3_fs_->create_file(test_file, &writer);
+        ASSERT_TRUE(status.ok()) << "Failed to create file: " << 
status.to_string();
+
+        std::string test_data = "Test data " + std::to_string(i);
+        status = writer->append(test_data);
+        ASSERT_TRUE(status.ok()) << "Failed to write data: " << 
status.to_string();
+
+        status = writer->close();
+        ASSERT_TRUE(status.ok()) << "Failed to close writer: " << 
status.to_string();
+    }
+
+    // Set up sync point to simulate partial delete failure
+    // When DeleteObjects is called, we'll inject errors into the result
+    SyncPoint::get_instance()->set_call_back(
+            "s3_obj_storage_client::delete_objects_recursively", 
[](std::vector<std::any>&& args) {
+                // The callback receives delete_outcome as argument
+                // delete_outcome is of type: Aws::Utils::Outcome<DeleteObjectsResult, S3Error>*
+                using DeleteObjectsOutcome =
+                        Aws::Utils::Outcome<Aws::S3::Model::DeleteObjectsResult, Aws::S3::S3Error>;
+                auto* delete_outcome = std::any_cast<DeleteObjectsOutcome*>(args[0]);
+
+                // Create a mock error for one of the objects
+                Aws::S3::Model::Error error;
+                error.SetKey("file_1.txt");
+                error.SetCode("AccessDenied");
+                error.SetMessage("Simulated partial delete failure");
+
+                // Get the mutable result and add error
+                // Note: GetResult() only exposes a const reference, so const_cast
+                // is used to mutate the result in place
+                auto& result = const_cast<Aws::S3::Model::DeleteObjectsResult&>(
+                        delete_outcome->GetResult());
+                Aws::Vector<Aws::S3::Model::Error> errors;
+                errors.push_back(error);
+                result.SetErrors(errors);
+            });
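+    // Once processing is enabled below, the production code hands this lambda
+    // a pointer to the live outcome at its sync point, so the injected error
+    // is visible to the GetErrors() check that follows the DeleteObjects call.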
+
+    // Enable sync point processing
+    SyncPoint::get_instance()->enable_processing();
+
+    // Try to delete directory - should fail due to partial delete failure
+    Status status = s3_fs_->delete_directory(test_dir);
+
+    // The delete should fail because some objects failed to delete
+    EXPECT_FALSE(status.ok()) << "Delete directory should fail due to partial 
delete failure";
+    std::cout << "Delete directory failed as expected with partial failure: " 
<< status.to_string()
+              << std::endl;
+
+    // Verify error message contains information about the failed object
+    std::string error_msg = status.to_string();
+    std::cout << "Error message: " << error_msg << std::endl;
+
+    // Disable sync point processing and clear callbacks
+    SyncPoint::get_instance()->disable_processing();
+    SyncPoint::get_instance()->clear_all_call_backs();
+
+    std::cout << "Delete directory partial failure test completed 
successfully!" << std::endl;
+
+    // Cleanup - now without sync point, it should succeed
+    status = s3_fs_->delete_directory(test_dir);
+    ASSERT_TRUE(status.ok()) << "Failed to cleanup test directory: " << 
status.to_string();
+}
+
+// Test: S3 batch_delete (delete_objects) with partial failure - some objects fail to delete
+TEST_F(S3FileSystemTest, BatchDeletePartialFailureTest) {
+    ASSERT_NE(s3_fs_, nullptr);
+
+    std::string test_dir = global_test_prefix_ + "batch_delete_partial_failure_test/";
+    std::cout << "Test directory path: " << test_dir << std::endl;
+
+    // Create multiple test files
+    std::vector<std::string> test_files;
+    for (int i = 0; i < 5; ++i) {
+        std::string test_file = test_dir + "file_" + std::to_string(i) + ".txt";
+        test_files.push_back(test_file);
+
+        io::FileWriterPtr writer;
+        Status status = s3_fs_->create_file(test_file, &writer);
+        ASSERT_TRUE(status.ok()) << "Failed to create file: " << 
status.to_string();
+
+        std::string test_data = "Batch delete test data " + std::to_string(i);
+        status = writer->append(test_data);
+        ASSERT_TRUE(status.ok()) << "Failed to write data: " << 
status.to_string();
+
+        status = writer->close();
+        ASSERT_TRUE(status.ok()) << "Failed to close writer: " << 
status.to_string();
+    }
+
+    // Verify all files exist
+    for (const auto& file_path : test_files) {
+        bool exists = false;
+        Status status = s3_fs_->exists(file_path, &exists);
+        ASSERT_TRUE(status.ok()) << "Failed to check file existence: " << 
status.to_string();
+        EXPECT_TRUE(exists) << "File should exist: " << file_path;
+    }
+
+    // Set up sync point to simulate partial delete failure in batch_delete
+    // When DeleteObjects is called via batch_delete, we'll inject errors into the result
+    SyncPoint::get_instance()->set_call_back(
+            "s3_obj_storage_client::delete_objects", 
[](std::vector<std::any>&& args) {
+                // The callback receives delete_outcome as argument
+                // delete_outcome is of type: Aws::Utils::Outcome<DeleteObjectsResult, S3Error>*
+                using DeleteObjectsOutcome =
+                        Aws::Utils::Outcome<Aws::S3::Model::DeleteObjectsResult, Aws::S3::S3Error>;
+                auto* delete_outcome = std::any_cast<DeleteObjectsOutcome*>(args[0]);
+
+                // Create mock errors for some of the objects
+                Aws::S3::Model::Error error1;
+                error1.SetKey("file_2.txt");
+                error1.SetCode("AccessDenied");
+                error1.SetMessage("Simulated batch delete partial failure for file_2");
+
+                Aws::S3::Model::Error error2;
+                error2.SetKey("file_4.txt");
+                error2.SetCode("InternalError");
+                error2.SetMessage("Simulated batch delete partial failure for file_4");
+
+                // Get the mutable result and add errors
+                auto& result = const_cast<Aws::S3::Model::DeleteObjectsResult&>(
+                        delete_outcome->GetResult());
+                Aws::Vector<Aws::S3::Model::Error> errors;
+                errors.push_back(error1);
+                errors.push_back(error2);
+                result.SetErrors(errors);
+            });
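+    // The injected keys are bare file names rather than full object keys; the
+    // assertion below only requires that a failing key appears in the status
+    // message, not that it matches the uploaded paths exactly.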
+
+    // Enable sync point processing
+    SyncPoint::get_instance()->enable_processing();
+
+    // Try to batch delete files - should fail due to partial delete failure
+    std::vector<io::Path> paths_to_delete;
+    for (const auto& file_path : test_files) {
+        paths_to_delete.emplace_back(file_path);
+    }
+
+    Status status = s3_fs_->batch_delete(paths_to_delete);
+
+    // The batch_delete should fail because some objects failed to delete
+    EXPECT_FALSE(status.ok()) << "Batch delete should fail due to partial 
delete failure";
+    std::cout << "Batch delete failed as expected with partial failure: " << 
status.to_string()
+              << std::endl;
+
+    // Verify error message contains information about the failed object
+    std::string error_msg = status.to_string();
+    std::cout << "Error message: " << error_msg << std::endl;
+    // The error should mention one of the failed files
+    EXPECT_TRUE(error_msg.find("file_2") != std::string::npos ||
+                error_msg.find("file_4") != std::string::npos)
+            << "Error message should mention failed file";
+
+    // Disable sync point processing and clear callbacks
+    SyncPoint::get_instance()->disable_processing();
+    SyncPoint::get_instance()->clear_all_call_backs();
+
+    std::cout << "Batch delete partial failure test completed successfully!" 
<< std::endl;
+
+    // Cleanup - now without sync point, it should succeed
+    status = s3_fs_->batch_delete(paths_to_delete);
+    ASSERT_TRUE(status.ok()) << "Failed to cleanup test files: " << 
status.to_string();
+}
+
+// Test: S3 get_object with incomplete read - bytes_read != size_return
+TEST_F(S3FileSystemTest, GetObjectIncompleteReadTest) {
+    ASSERT_NE(s3_fs_, nullptr);
+
+    std::string test_file = global_test_prefix_ + "incomplete_read_test.txt";
+    std::cout << "Test file path: " << test_file << std::endl;
+
+    // Create a test file with known content
+    io::FileWriterPtr writer;
+    Status status = s3_fs_->create_file(test_file, &writer);
+    ASSERT_TRUE(status.ok()) << "Failed to create file: " << 
status.to_string();
+
+    std::string test_data =
+            "This is test data for incomplete read simulation. "
+            "The content should be long enough to test partial reads.";
+    status = writer->append(test_data);
+    ASSERT_TRUE(status.ok()) << "Failed to write data: " << status.to_string();
+
+    status = writer->close();
+    ASSERT_TRUE(status.ok()) << "Failed to close writer: " << 
status.to_string();
+
+    std::cout << "Created test file with " << test_data.size() << " bytes" << 
std::endl;
+
+    // Open file for reading
+    io::FileReaderSPtr reader;
+    status = s3_fs_->open_file(test_file, &reader);
+    ASSERT_TRUE(status.ok()) << "Failed to open file: " << status.to_string();
+    ASSERT_NE(reader, nullptr);
+
+    // Set up sync point to simulate incomplete read
+    // When get_object is called, we'll modify size_return to be less than bytes_read
+    SyncPoint::get_instance()->set_call_back(
+            "s3_obj_storage_client::get_object", [](std::vector<std::any>&& 
args) {
+                // The callback receives size_return as argument
+                auto* size_return = std::any_cast<size_t*>(args[0]);
+
+                // Simulate incomplete read by reducing the returned size
+                // For example, if we requested 50 bytes but only got 30
+                size_t original_size = *size_return;
+                *size_return = original_size / 2; // Return only half of what was requested
+
+                std::cout << "SyncPoint: Modified size_return from " << 
original_size << " to "
+                          << *size_return << std::endl;
+            });
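+    // The object itself is downloaded intact; only the size reported back
+    // through size_return is halved, which should trip the check that the
+    // reported size matches the bytes actually read.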
+
+    // Enable sync point processing
+    SyncPoint::get_instance()->enable_processing();
+
+    // Try to read from file - should fail due to incomplete read
+    size_t read_size = 50;
+    std::vector<char> buffer(read_size);
+    size_t bytes_read = 0;
+    status = reader->read_at(0, Slice(buffer.data(), read_size), &bytes_read);
+
+    // The read should fail because size_return != bytes_read
+    EXPECT_FALSE(status.ok()) << "Read should fail due to incomplete read";
+    std::cout << "Read failed as expected with incomplete read: " << 
status.to_string()
+              << std::endl;
+
+    // Verify error message contains information about the mismatch
+    std::string error_msg = status.to_string();
+    std::cout << "Error message: " << error_msg << std::endl;
+    EXPECT_TRUE(error_msg.find("bytes read") != std::string::npos ||
+                error_msg.find("bytes req") != std::string::npos)
+            << "Error message should mention byte count mismatch";
+
+    // Disable sync point processing and clear callbacks
+    SyncPoint::get_instance()->disable_processing();
+    SyncPoint::get_instance()->clear_all_call_backs();
+
+    std::cout << "Get object incomplete read test completed successfully!" << 
std::endl;
+
+    // Cleanup
+    status = s3_fs_->delete_file(test_file);
+    ASSERT_TRUE(status.ok()) << "Failed to cleanup test file: " << 
status.to_string();
+}
+
+// Test: S3 rate limiter for PUT operations - delete_directory with DeleteObjects
+TEST_F(S3FileSystemTest, RateLimiterPutDeleteDirectoryDeleteObjectsTest) {
+    ASSERT_NE(s3_fs_, nullptr);
+
+    // Save original config value
+    bool original_enable_rate_limiter = config::enable_s3_rate_limiter;
+
+    // Enable S3 rate limiter for this test
+    config::enable_s3_rate_limiter = true;
+
+    // Create multiple test directories with files
+    std::vector<std::string> test_dirs;
+    for (int i = 0; i < 3; ++i) {
+        std::string test_dir =
+                global_test_prefix_ + "rate_limit_delete_dir_objects_" + std::to_string(i) + "/";
+        test_dirs.push_back(test_dir);
+
+        // Create multiple files in each directory
+        for (int j = 0; j < 5; ++j) {
+            std::string test_file = test_dir + "file_" + std::to_string(j) + ".txt";
+            io::FileWriterPtr writer;
+            Status status = s3_fs_->create_file(test_file, &writer);
+            ASSERT_TRUE(status.ok()) << "Failed to create file: " << 
status.to_string();
+
+            std::string test_data = "Delete directory test data";
+            status = writer->append(test_data);
+            ASSERT_TRUE(status.ok()) << "Failed to write data: " << 
status.to_string();
+
+            status = writer->close();
+            ASSERT_TRUE(status.ok()) << "Failed to close writer: " << 
status.to_string();
+        }
+    }
+
+    // Set rate limit for PUT operations (DeleteObjects uses PUT rate limiter)
+    // limit: 2 (allow only 2 DeleteObjects requests, the 3rd will fail)
+    // Note: the files above were created before this reset, so their PUT
+    // requests do not count against the limit of 2
+    int ret = reset_s3_rate_limiter(S3RateLimitType::PUT, 10, 10, 2);
+    ASSERT_EQ(ret, 0) << "Failed to set rate limiter for PUT operations";
+
+    std::cout << "Rate limiter set: limit 2 total requests for PUT operations 
(DeleteObjects in "
+                 "delete_directory)"
+              << std::endl;
+
+    // First two delete_directory operations should succeed
+    // Each delete_directory calls ListObjectsV2 (GET) and DeleteObjects (PUT)
+    Status status;
+    for (int i = 0; i < 2; ++i) {
+        status = s3_fs_->delete_directory(test_dirs[i]);
+        ASSERT_TRUE(status.ok()) << "Failed to delete directory on attempt " 
<< i + 1 << ": "
+                                 << status.to_string();
+        std::cout << "Delete directory attempt " << i + 1 << " succeeded" << 
std::endl;
+    }
+
+    // Third delete_directory operation should fail due to rate limit on DeleteObjects
+    status = s3_fs_->delete_directory(test_dirs[2]);
+
+    EXPECT_FALSE(status.ok())
+            << "Third delete_directory should fail due to rate limit on 
DeleteObjects";
+    std::cout << "Third delete_directory failed as expected (DeleteObjects 
rate limit): "
+              << status.to_string() << std::endl;
+
+    // Reset rate limiter to clean up remaining directory
+    reset_s3_rate_limiter(S3RateLimitType::PUT, 10000, 10000, 0);
+    reset_s3_rate_limiter(S3RateLimitType::GET, 10000, 10000, 0);
+
+    // Restore original config
+    config::enable_s3_rate_limiter = original_enable_rate_limiter;
+
+    std::cout << "Rate limiter PUT delete_directory (DeleteObjects) test 
completed successfully!"
+              << std::endl;
+
+    // Cleanup remaining directory
+    status = s3_fs_->delete_directory(test_dirs[2]);
+    ASSERT_TRUE(status.ok()) << "Failed to cleanup remaining directory: " << 
status.to_string();
+}
+
+// Test: S3 CreateMultipartUpload failure - simulates error when initiating multipart upload
+TEST_F(S3FileSystemTest, CreateMultipartUploadFailureTest) {
+    ASSERT_NE(s3_fs_, nullptr);
+
+    std::string test_file = global_test_prefix_ + "create_multipart_upload_failure_test.dat";
+    std::cout << "Test file path: " << test_file << std::endl;
+
+    // Set up sync point to simulate CreateMultipartUpload failure
+    SyncPoint::get_instance()->set_call_back("s3_file_writer::_open", 
[](std::vector<std::any>&&
+                                                                               
  args) {
+        // The callback receives create_multipart_upload_outcome as argument
+        using CreateMultipartUploadOutcome =
+                Aws::Utils::Outcome<Aws::S3::Model::CreateMultipartUploadResult, Aws::S3::S3Error>;
+        auto* outcome = std::any_cast<CreateMultipartUploadOutcome*>(args[0]);
+
+        // Create a mock S3 error to simulate CreateMultipartUpload failure
+        Aws::S3::S3Error error;
+        error.SetResponseCode(Aws::Http::HttpResponseCode::FORBIDDEN);
+        error.SetExceptionName("AccessDenied");
+        error.SetMessage("Simulated CreateMultipartUpload failure - Access Denied");
+        error.SetRequestId("test-request-id-12345");
+
+        // Replace the successful outcome with a failure outcome
+        *outcome = CreateMultipartUploadOutcome(error);
+
+        std::cout << "SyncPoint: Injected CreateMultipartUpload failure" << 
std::endl;
+    });
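+    // The lambda above swaps the real outcome for a synthetic AccessDenied
+    // error, so the writer should observe the failure exactly as if S3 itself
+    // had rejected the CreateMultipartUpload request.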
+
+    // Enable sync point processing
+    SyncPoint::get_instance()->enable_processing();
+
+    // Try to create a large file that will trigger multipart upload
+    io::FileWriterPtr writer;
+    Status status = s3_fs_->create_file(test_file, &writer);
+    ASSERT_TRUE(status.ok()) << "Failed to create file writer: " << 
status.to_string();
+
+    // Write large data to trigger multipart upload (>5MB for multipart)
+    size_t chunk_size = 1024 * 1024; // 1MB
+    size_t num_chunks = 6;           // 6MB total, will trigger multipart
+    std::string chunk_data(chunk_size, 'A');
+
+    std::cout << "Writing " << (chunk_size * num_chunks)
+              << " bytes to trigger CreateMultipartUpload" << std::endl;
+
+    bool write_failed = false;
+    for (size_t i = 0; i < num_chunks; ++i) {
+        std::ranges::fill(chunk_data, 'A' + (i % 26));
+        status = writer->append(chunk_data);
+        if (!status.ok()) {
+            write_failed = true;
+            std::cout << "Write failed at chunk " << i << " as expected: " << 
status.to_string()
+                      << std::endl;
+            break;
+        }
+    }
+
+    if (!write_failed) {
+        // If write succeeded, close should fail
+        status = writer->close();
+        if (!status.ok()) {
+            write_failed = true;
+            std::cout << "Close failed as expected (CreateMultipartUpload 
failure): "
+                      << status.to_string() << std::endl;
+        }
+    }
+
+    EXPECT_TRUE(write_failed)
+            << "Multipart upload should fail due to CreateMultipartUpload 
failure";
+
+    // Verify the error message contains information about CreateMultipartUpload failure
+    if (write_failed) {
+        std::string error_msg = status.to_string();
+        std::cout << "Verified CreateMultipartUpload failure with error: " << 
error_msg
+                  << std::endl;
+
+        // Check for expected error indicators
+        bool has_expected_error = error_msg.find("CreateMultipartUpload") != std::string::npos ||
+                                  error_msg.find("AccessDenied") != std::string::npos ||
+                                  error_msg.find("Access Denied") != std::string::npos;
+
+        EXPECT_TRUE(has_expected_error)
+                << "Error message should mention CreateMultipartUpload or 
AccessDenied failure";
+    }
+
+    // Disable sync point processing and clear callbacks
+    SyncPoint::get_instance()->disable_processing();
+    SyncPoint::get_instance()->clear_all_call_backs();
+
+    std::cout << "CreateMultipartUpload failure test completed successfully!" 
<< std::endl;
+
+    // Try to cleanup (file should not exist since upload failed)
+    std::ignore = s3_fs_->delete_file(test_file);
+}
+
+} // namespace doris
diff --git a/run-be-ut.sh b/run-be-ut.sh
index 3d550b2a41c..1b600e7259f 100755
--- a/run-be-ut.sh
+++ b/run-be-ut.sh
@@ -181,7 +181,7 @@ if [[ "_${DENABLE_CLANG_COVERAGE}" == "_ON" ]]; then
     echo "export DORIS_TOOLCHAIN=clang" >>custom_env.sh
 fi
 
-if [[ -n "${DISABLE_BUILD_AZURE}" ]]; then
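+# Only an explicit DISABLE_BUILD_AZURE=ON (case-insensitive) disables the
+# Azure build now; previously any non-empty value did.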
+if [[ "$(echo "${DISABLE_BUILD_AZURE}" | tr '[:lower:]' '[:upper:]')" == "ON" 
]]; then
     BUILD_AZURE='OFF'
 fi
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
